Движение по AruCo-меткам
Этот код реализует систему управления ровером, которая взаимодействует с сервером через протокол Socket.IO, управляет движением, поворотами и освещением ровера, а также анализирует изображения с камеры для обнаружения меток ArUco.
import time
import signal
import cv2
import numpy as np
import socketio
import asyncio
import aiofiles
from datetime import datetime
server = 'http://192.168.10.115:3006/' # адресс вашего ровера
# Создаем экземпляр асинхронного клиента Socket.IO
sio = socketio.AsyncClient()
diameter_of_wheel = 0.24 # Диаметр колеса в метрах
circumference = 3.14159 * diameter_of_wheel # Окружность колеса
async def write_report(message): # Функция создания автоматического отчета
filename = f'Отчет_Фамилия_Имя.txt'
async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
await file.write(message + "\n")
# Определяем обработчик события "connect"
@sio.event
async def connect():
await write_report('Подключено к серверу Socket.IO')
async def on_move(result):
await write_report(f'Command done: {result}')
async def perform_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с (примерная, можно изменить)
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': "move",
'value': {
'x': 0,
'y': 1,
'sensitivity': 1,
},
}, namespace='/vehicles', callback=on_move)
await asyncio.sleep(0.2)
movement_message = f"Команда движения отправлена, ожидайте {time_to_travel:.2f} секунд."
await write_report(movement_message)
print(movement_message)
await asyncio.sleep(time_to_travel)
await asyncio.sleep(1)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def on_spin(result):
await write_report(f'Spin command done: {result}')
async def perform_spin_angle(angle, clockwise=True):
FULL_ROTATION_TIME = 1.1
ACCELERATION_FACTOR = 0.0005
rotation_time = (angle / 360) * FULL_ROTATION_TIME
total_time = rotation_time + (rotation_time * ACCELERATION_FACTOR)
start_time = time.time()
while (time.time() - start_time) < total_time:
await sio.emit('command', {
'id': 1,
'type': "spin",
'value': {
'state': True,
'direction': clockwise
},
}, namespace='/vehicles', callback=on_spin)
await asyncio.sleep(0.2)
spin_message = f"Начат поворот на {angle} градусов, {'по часовой стрелке' if clockwise else 'против часовой стрелки'}"
await write_report(spin_message)
print(spin_message)
completion_spin_message = f"Поворот на {angle} градусов завершён."
await write_report(completion_spin_message)
print(completion_spin_message)
async def on_lights(result):
await write_report(f'Lights command done: {result}')
async def control_lights(on: bool):
# Команда для управления светом
lights_command = {
'id': 1,
'type': 'lights',
'value': on
}
# Отправляем команду на сервер
await sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)
# Записываем сообщение в файл отчета и выводим его в консоль
action = "включены" if on else "выключены"
message = f"Команда на включение света {action} отправлена."
await write_report(message)
print(message)
# Функция для нахождения ближайшей к заданным координатам метки
def find_closest_marker(markers, target_x, target_y):
closest_marker = None
min_distance = float('inf')
for marker in markers:
center_x = marker['center'][0]
center_y = marker['center'][1]
distance = np.sqrt((center_x - target_x)**2 + (center_y - target_y)**2)
if distance < min_distance:
closest_marker = marker
min_distance = distance
return closest_marker
# Функция для нахождения угла поворота камеры
def calculate_rotation_angle(marker_center, target_x):
marker_x = marker_center[0]
delta_x = target_x - marker_x
focal_length = 1000 # Пример фокусного расстояния камеры
angle_rad = np.arctan(delta_x / focal_length)
angle_deg = np.degrees(angle_rad)
return angle_deg
def calculate_rotation_angles(marker_center, target_x):
marker_x = marker_center
delta_x = target_x - marker_x
focal_length = 1000 # Пример фокусного расстояния камеры
angle_rad = np.arctan(delta_x / focal_length)
angle_deg = np.degrees(angle_rad)
return angle_deg
def aruco_detect(frame,closest_id):
# Преобразование в оттенки серого и поиск меток
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
parameters = cv2.aruco.DetectorParameters()
corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
if ids is not None:
markers = []
mass_id_mark=[]
for i, marker_id in enumerate(ids):
marker_center = np.mean(corners[i][0], axis=0)
mass_id_mark.append([marker_id[0],corners[i][0]])
markers.append({'id': marker_id[0], 'center': marker_center})
# Находим ближайшую метку к x=360, y=10
target_x = 960
target_y = 1000
closest_marker = find_closest_marker(markers, target_x, target_y)
if closest_marker is not None:
# closest_id = closest_marker['id']
# closest_center = closest_marker['center']
# Выводим информацию о найденной метке
cv2.aruco.drawDetectedMarkers(frame, corners, ids)
# Находим индекс первого элемента массива, у которого первый элемент равен closest_id
y1=0
y2=0
ps=int(closest_id)
for sub_arr in mass_id_mark:
# Проверяем, если первый элемент массива равен 5
if sub_arr[0] == ps:
mass=sub_arr[1]
y1=int(mass[2][1])
y2=int(mass[3][1])
return y1,y2
return 0,0
async def main():
cap = cv2.VideoCapture("rtsp://localhost:8554/web")
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO)
cv2.resizeWindow('Frame',640,640)
# while True:
# Считывание кадра
for i in range(1):
_,frame=cap.read()
# Преобразование в оттенки серого и поиск меток
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
parameters = cv2.aruco.DetectorParameters()
corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
if ids is not None:
markers = []
mass_id_mark=[]
for i, marker_id in enumerate(ids):
marker_center = np.mean(corners[i][0], axis=0)
markers.append({'id': marker_id[0], 'center': marker_center})
# Находим ближайшую метку к x=360, y=10
target_x = 960
target_y = 1000
closest_marker = find_closest_marker(markers, target_x, target_y)
if closest_marker is not None:
closest_id = closest_marker['id']
closest_center = closest_marker['center']
await control_lights(True)
# Выводим информацию о найденной метке
cv2.aruco.drawDetectedMarkers(frame, corners, ids)
# Вычисляем угол поворота камеры
angle_deg = calculate_rotation_angle(closest_center, target_x)
print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")
if angle_deg<0:
await perform_spin_angle(int(-angle_deg+10),True)
elif angle_deg>0:
await perform_spin_angle(int(angle_deg+10),False)
time.sleep(2)
y_start = closest_center[1]
y_end = 1920
# Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации)
pixel_to_cm_ratio = 0.017
# Вычисление расстояния в сантиметрах
distance_cm = abs(y_end - y_start) * pixel_to_cm_ratio
await perform_movement(distance_cm)
ret, frame = cap.read()
y1,y2=aruco_detect(frame,closest_id.T)
if y1==0 and y2==0:
print("aruco is not detected")
else:
angle_virav = calculate_rotation_angles(y1,y2)
print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")
# Освобождение ресурсов
cv2.destroyAllWindows()
# Запускаем асинхронный цикл main()
asyncio.run(main())
Теперь разберем подробно этот код:
ИНИЦИАЛИЗАЦИЯ БИБЛИОТЕК
import time
import signal
import cv2
import numpy as np
import socketio
import asyncio
import aiofiles
from datetime import datetime
time: Стандартная библиотека Python для работы со временем.
signal: Стандартная библиотека Python для работы с сигналами ОС.
cv2 (OpenCV) Библиотека с открытым исходным кодом для компьютерного зрения.
numpy: Библиотека для работы с многомерными массивами и числовыми вычислениями.
socketio: Библиотека для общения по протоколу WebSocket и Socket.IO.
asyncio: Стандартная библиотека Python для написания асинхронного кода.
aiofiles: Библиотека для асинхронной работы с файлами.
datename: Стандартная библиотека Python для работы с датой и временем.
ЭКЗЕМПЛЯР АСИНХРОННОГО КЛИЕНТА SOCKETIO
sio = socketio.AsyncClient()
diameter_of_wheel = 0.24 # Диаметр колеса в метрах
circumference = 3.14159 * diameter_of_wheel # Окружность колеса
ОПИСАНИЕ ФУНКЦИЙ КОДА
write_report(message)
async def write_report(message): # Функция создания автоматического отчета
filename = f'Отчет_Фамилия_Имя.txt'
async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
await file.write(message + "\n")
Описание: Асинхронная функция для записи сообщений в файл отчета.
Аргументы:
message
: строка с сообщением, которое нужно записать.
Действие: Открывает или создает файл отчета с именем
Отчет_Фамилия_Имя.txt
и записывает в него сообщение в асинхронном режиме. Сообщение записывается в конце файла.
connect()
@sio.event
async def connect():
await write_report('Подключено к серверу Socket.IO')
Описание: Асинхронный обработчик события
connect
, вызывается при успешном подключении к серверу через Socket.IO.Действие: Записывает сообщение о подключении к серверу в отчет с помощью
write_report()
.
on_move(result)
async def on_move(result):
await write_report(f'Command done: {result}')
Описание: Обработчик обратного вызова, который вызывается после выполнения команды движения.
Аргументы:
result
: результат выполнения команды, переданный сервером.
Действие: Записывает результат команды движения в отчет с помощью
write_report()
.
perform_movement(distance_cm)
async def perform_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с (примерная, можно изменить)
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': "move",
'value': {
'x': 0,
'y': 1,
'sensitivity': 1,
},
}, namespace='/vehicles', callback=on_move)
await asyncio.sleep(0.2)
movement_message = f"Команда движения отправлена, ожидайте {time_to_travel:.2f} секунд."
await write_report(movement_message)
print(movement_message)
await asyncio.sleep(time_to_travel)
await asyncio.sleep(1)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
Описание: Асинхронная функция, отправляющая команду на движение ровера.
Аргументы:
distance_cm
: расстояние, которое ровер должен пройти в сантиметрах.
Действие:
Вычисляет время для движения на заданное расстояние с максимальной скоростью.
В цикле отправляет команды движения через
sio.emit()
с типом команды"move"
.Ждет завершения движения (по времени).
Записывает результаты команд в отчет и выводит их на экран.
on_spin(result)
async def on_spin(result):
await write_report(f'Spin command done: {result}')
Описание: Обработчик обратного вызова для команды поворота.
Аргументы:
result
: результат выполнения команды поворота, переданный сервером.
Действие: Записывает результат команды поворота в отчет.
perform_spin_angle(angle, clockwise=True)
async def perform_spin_angle(angle, clockwise=True):
FULL_ROTATION_TIME = 1.1
ACCELERATION_FACTOR = 0.0005
rotation_time = (angle / 360) * FULL_ROTATION_TIME
total_time = rotation_time + (rotation_time * ACCELERATION_FACTOR)
start_time = time.time()
while (time.time() - start_time) < total_time:
await sio.emit('command', {
'id': 1,
'type': "spin",
'value': {
'state': True,
'direction': clockwise
},
}, namespace='/vehicles', callback=on_spin)
await asyncio.sleep(0.2)
spin_message = f"Начат поворот на {angle} градусов, {'по часовой стрелке' if clockwise else 'против часовой стрелки'}"
await write_report(spin_message)
print(spin_message)
completion_spin_message = f"Поворот на {angle} градусов завершён."
await write_report(completion_spin_message)
print(completion_spin_message)
Описание: Асинхронная функция, которая поворачивает ровер на заданный угол.
Аргументы:
angle
: угол поворота в градусах.clockwise
: направление поворота, по умолчанию — по часовой стрелке.
Действие:
Рассчитывает время, необходимое для поворота на заданный угол.
В цикле отправляет команду поворота через
sio.emit()
с типом команды"spin"
.Записывает сообщение о начале и завершении поворота в отчет и выводит их на экран.
async def on_lights(result):
await write_report(f'Lights command done: {result}')
Описание: Обработчик обратного вызова для команды управления светом.
Аргументы:
result
: результат выполнения команды, переданный сервером.
Действие: Записывает результат команды управления светом в отчет.
async def control_lights(on: bool):
# Команда для управления светом
lights_command = {
'id': 1,
'type': 'lights',
'value': on
}
# Отправляем команду на сервер
await sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)
# Записываем сообщение в файл отчета и выводим его в консоль
action = "включены" if on else "выключены"
message = f"Команда на включение света {action} отправлена."
await write_report(message)
print(message)
Описание: Асинхронная функция для управления светом ровера.
Аргументы:
on
: флаг включения света (True
для включения,False
для выключения).
Действие:
Создает команду для включения или выключения света.
Отправляет команду через
sio.emit()
с типом команды"lights"
.Записывает сообщение о включении/выключении света в отчет и выводит его на экран.
def find_closest_marker(markers, target_x, target_y):
closest_marker = None
min_distance = float('inf')
for marker in markers:
center_x = marker['center'][0]
center_y = marker['center'][1]
distance = np.sqrt((center_x - target_x)**2 + (center_y - target_y)**2)
if distance < min_distance:
closest_marker = marker
min_distance = distance
return closest_marker
Описание: Функция для поиска ближайшей метки ArUco к заданным координатам.
Аргументы:
markers
: список меток с их центрами.target_x
,target_y
: координаты целевой точки.
Действие:
Перебирает все метки и вычисляет расстояние до целевой точки.
Возвращает метку, которая находится ближе всего к целевой точке.
calculate_rotation_angle(marker_center, target_x)
def calculate_rotation_angle(marker_center, target_x):
marker_x = marker_center[0]
delta_x = target_x - marker_x
focal_length = 1000 # Пример фокусного расстояния камеры
angle_rad = np.arctan(delta_x / focal_length)
angle_deg = np.degrees(angle_rad)
return angle_deg
Описание: Функция для вычисления угла поворота камеры для наведения на метку.
Аргументы:
marker_center
: координаты центра метки.target_x
: целевая горизонтальная координата.
Действие:
Вычисляет горизонтальное смещение метки относительно целевой координаты.
Рассчитывает угол поворота в градусах с помощью арктангенса и возвращает его.
calculate_rotation_angles(marker_center, target_x)
def calculate_rotation_angles(marker_center, target_x):
marker_x = marker_center
delta_x = target_x - marker_x
focal_length = 1000 # Пример фокусного расстояния камеры
angle_rad = np.arctan(delta_x / focal_length)
angle_deg = np.degrees(angle_rad)
return angle_deg
Описание: Аналогична функции
calculate_rotation_angle
, но принимает только одну координату центра метки.Аргументы:
marker_center
: координата по оси X центра метки.target_x
: целевая горизонтальная координата.
Действие:
Вычисляет угол поворота, аналогично предыдущей функции, и возвращает результат.
aruco_detect(frame, closest_id)
def aruco_detect(frame,closest_id):
# Преобразование в оттенки серого и поиск меток
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
parameters = cv2.aruco.DetectorParameters()
corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
if ids is not None:
markers = []
mass_id_mark=[]
for i, marker_id in enumerate(ids):
marker_center = np.mean(corners[i][0], axis=0)
mass_id_mark.append([marker_id[0],corners[i][0]])
markers.append({'id': marker_id[0], 'center': marker_center})
# Находим ближайшую метку к x=360, y=10
target_x = 960
target_y = 1000
closest_marker = find_closest_marker(markers, target_x, target_y)
if closest_marker is not None:
# closest_id = closest_marker['id']
# closest_center = closest_marker['center']
# Выводим информацию о найденной метке
cv2.aruco.drawDetectedMarkers(frame, corners, ids)
# Находим индекс первого элемента массива, у которого первый элемент равен closest_id
y1=0
y2=0
ps=int(closest_id)
for sub_arr in mass_id_mark:
# Проверяем, если первый элемент массива равен 5
if sub_arr[0] == ps:
mass=sub_arr[1]
y1=int(mass[2][1])
y2=int(mass[3][1])
return y1,y2
return 0,0
Описание: Функция для обнаружения меток ArUco на изображении и нахождения вертикальных координат метки.
Аргументы:
frame
: кадр изображения, на котором выполняется детекция.closest_id
: идентификатор метки, которую нужно отслеживать.
Действие:
Преобразует кадр в оттенки серого и выполняет обнаружение меток ArUco.
Находит ближайшую метку.
Если метка найдена, возвращает координаты её сторон по вертикальной оси.
main()
async def main():
cap = cv2.VideoCapture("rtsp://localhost:8554/web")
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO)
cv2.resizeWindow('Frame',640,640)
# while True:
# Считывание кадра
for i in range(1):
_,frame=cap.read()
# Преобразование в оттенки серого и поиск меток
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
parameters = cv2.aruco.DetectorParameters()
corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
if ids is not None:
markers = []
mass_id_mark=[]
for i, marker_id in enumerate(ids):
marker_center = np.mean(corners[i][0], axis=0)
markers.append({'id': marker_id[0], 'center': marker_center})
# Находим ближайшую метку к x=360, y=10
target_x = 960
target_y = 1000
closest_marker = find_closest_marker(markers, target_x, target_y)
if closest_marker is not None:
closest_id = closest_marker['id']
closest_center = closest_marker['center']
await control_lights(True)
# Выводим информацию о найденной метке
cv2.aruco.drawDetectedMarkers(frame, corners, ids)
# Вычисляем угол поворота камеры
angle_deg = calculate_rotation_angle(closest_center, target_x)
print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")
if angle_deg<0:
await perform_spin_angle(int(-angle_deg+10),True)
elif angle_deg>0:
await perform_spin_angle(int(angle_deg+10),False)
time.sleep(2)
y_start = closest_center[1]
y_end = 1920
# Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации)
pixel_to_cm_ratio = 0.017
# Вычисление расстояния в сантиметрах
distance_cm = abs(y_end - y_start) * pixel_to_cm_ratio
await perform_movement(distance_cm)
ret, frame = cap.read()
y1,y2=aruco_detect(frame,closest_id.T)
if y1==0 and y2==0:
print("aruco is not detected")
else:
angle_virav = calculate_rotation_angles(y1,y2)
print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")
# Освобождение ресурсов
cv2.destroyAllWindows()
# Запускаем асинхронный цикл main()
asyncio.run(main())
Описание: Основная функция для управления ровером на основе детекции меток ArUco с камеры.
Действие:
Открывает видеопоток RTSP с камеры и подключается к серверу.
Обрабатывает кадры видео и находит ближайшую метку ArUco.
Выполняет поворот и движение ровера в зависимости от угла и расстояния до метки.
Повторно проверяет положение метки после движения и корректирует угол, если нужно.
РЕЗУЛЬТАТ РАБОТЫ КОДА И ЗАПОЛНЕНИЕ ОТЧЕТА
ПРИМЕЧАНИЕ: Как вы могли заметить, в данном коде есть две функции main: main и main2. main нужен для вызова базовых движений ровера, а main2 для выполнения кода работы с AruCo маркерами.
Когда вы запустите код, ровер начнет ехать примерно по такой траектории к ближайшему AruCo маркеру:
Параллельно движениям ровера в отчет записываются данные о том, какие движения делал ровер:

Last updated