Этот код реализует систему управления ровером, которая взаимодействует с сервером через протокол Socket.IO, управляет движением, поворотами и освещением ровера, а также анализирует изображения с камеры для обнаружения меток ArUco.
import timeimport signalimport cv2import numpy as npimport socketioimport asyncioimport aiofilesfrom datetime import datetimeserver ='http://192.168.10.115:3006/'# адресс вашего ровера# Создаем экземпляр асинхронного клиента Socket.IOsio = socketio.AsyncClient()diameter_of_wheel =0.24# Диаметр колеса в метрахcircumference =3.14159* diameter_of_wheel # Окружность колесаasyncdefwrite_report(message): # Функция создания автоматического отчета filename = f'Отчет_Фамилия_Имя.txt'asyncwith aiofiles.open(filename, mode='a', encoding='utf-8')as file:await file.write(message +"\n")# Определяем обработчик события "connect"@sio.eventasyncdefconnect():awaitwrite_report('Подключено к серверу Socket.IO')asyncdefon_move(result):awaitwrite_report(f'Command done: {result}')asyncdefperform_movement(distance_cm): max_speed =0.5# Максимальная скорость м/с (примерная, можно изменить) distance_m = distance_cm /100.0# Переводим сантиметры в метры time_to_travel = distance_m / max_speed start_time = time.time()while (time.time()- start_time) < time_to_travel:await sio.emit('command', {'id': 1,'type': "move",'value': { 'x': 0,'y': 1,'sensitivity': 1, }, }, namespace='/vehicles', callback=on_move)await asyncio.sleep(0.2) movement_message = f"Команда движения отправлена, ожидайте {time_to_travel:.2f} секунд."awaitwrite_report(movement_message)print(movement_message)await asyncio.sleep(time_to_travel)await asyncio.sleep(1) completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."awaitwrite_report(completion_message)print(completion_message)asyncdefon_spin(result):awaitwrite_report(f'Spin command done: {result}')asyncdefperform_spin_angle(angle,clockwise=True): FULL_ROTATION_TIME =1.1 ACCELERATION_FACTOR =0.0005 rotation_time = (angle /360) * FULL_ROTATION_TIME total_time = rotation_time + (rotation_time * ACCELERATION_FACTOR) start_time = time.time()while (time.time()- start_time) < total_time:await sio.emit('command', {'id': 1,'type': "spin",'value': {'state': True,'direction': clockwise }, }, namespace='/vehicles', callback=on_spin)await asyncio.sleep(0.2) spin_message = f"Начат поворот на {angle} градусов, {'по часовой стрелке' if clockwise else 'против часовой стрелки'}"
awaitwrite_report(spin_message)print(spin_message) completion_spin_message = f"Поворот на {angle} градусов завершён."awaitwrite_report(completion_spin_message)print(completion_spin_message)asyncdefon_lights(result):awaitwrite_report(f'Lights command done: {result}')asyncdefcontrol_lights(on:bool):# Команда для управления светом lights_command ={'id':1,'type':'lights','value': on}# Отправляем команду на серверawait sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)# Записываем сообщение в файл отчета и выводим его в консоль action ="включены"if on else"выключены" message = f"Команда на включение света {action} отправлена."awaitwrite_report(message)print(message)asyncdefmain(): # Код выполнения подключения и выполнения команд движения Ровера Контакт await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
# for i in range(4):# await perform_movement(20)awaitperform_spin_angle(90, clockwise=False)# await perform_spin_angle(0, clockwise=False)# await control_lights(True)# await asyncio.sleep(0.1)# await control_lights(False)# await asyncio.sleep(0.5)# await control_lights(True)# await asyncio.sleep(0.5)# await control_lights(False)# await asyncio.sleep(0.5)# await control_lights(True)# await asyncio.sleep(0.5)# await control_lights(False)# await asyncio.sleep(0.5)await sio.disconnect()# Закрываем соединение после завершения всех операций# Функция для нахождения ближайшей к заданным координатам меткиdeffind_closest_marker(markers,target_x,target_y): closest_marker =None min_distance =float('inf')for marker in markers: center_x = marker['center'][0] center_y = marker['center'][1] distance = np.sqrt((center_x - target_x)**2+ (center_y - target_y)**2)if distance < min_distance: closest_marker = marker min_distance = distancereturn closest_marker# Функция для нахождения угла поворота камерыdefcalculate_rotation_angle(marker_center,target_x): marker_x = marker_center[0] delta_x = target_x - marker_x focal_length =1000# Пример фокусного расстояния камеры angle_rad = np.arctan(delta_x / focal_length) angle_deg = np.degrees(angle_rad)return angle_degdefcalculate_rotation_angles(marker_center,target_x): marker_x = marker_center delta_x = target_x - marker_x focal_length =1000# Пример фокусного расстояния камеры angle_rad = np.arctan(delta_x / focal_length) angle_deg = np.degrees(angle_rad)return angle_degdefaruco_detect(frame,closest_id):# Преобразование в оттенки серого и поиск меток gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100) parameters = cv2.aruco.DetectorParameters() corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)if ids isnotNone: markers = [] mass_id_mark=[]for i, marker_id inenumerate(ids): marker_center = np.mean(corners[i][0], axis=0) mass_id_mark.append([marker_id[0],corners[i][0]]) markers.append({'id': marker_id[0], 'center': marker_center})# Находим ближайшую метку к x=360, y=10 target_x =960 target_y =1000 closest_marker =find_closest_marker(markers, target_x, target_y)if closest_marker isnotNone:# closest_id = closest_marker['id']# closest_center = closest_marker['center']# Выводим информацию о найденной метке cv2.aruco.drawDetectedMarkers(frame, corners, ids)# Находим индекс первого элемента массива, у которого первый элемент равен closest_id y1=0 y2=0 ps=int(closest_id)for sub_arr in mass_id_mark:# Проверяем, если первый элемент массива равен 5if sub_arr[0]== ps: mass=sub_arr[1] y1=int(mass[2][1]) y2=int(mass[3][1])return y1,y2return0,0asyncdefmain2(): cap = cv2.VideoCapture("rtsp://localhost:8554/web") await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO) cv2.resizeWindow('Frame',640,640)# while True:# Считывание кадраfor i inrange(1): _,frame=cap.read()# Преобразование в оттенки серого и поиск меток gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100) parameters = cv2.aruco.DetectorParameters() corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)if ids isnotNone: markers = [] mass_id_mark=[]for i, marker_id inenumerate(ids): marker_center = np.mean(corners[i][0], axis=0) markers.append({'id': marker_id[0], 'center': marker_center})# Находим ближайшую метку к x=360, y=10 target_x =960 target_y =1000 closest_marker =find_closest_marker(markers, target_x, target_y)if closest_marker isnotNone: closest_id = closest_marker['id'] closest_center = closest_marker['center']awaitcontrol_lights(True)# Выводим информацию о найденной метке cv2.aruco.drawDetectedMarkers(frame, corners, ids)# Вычисляем угол поворота камеры angle_deg =calculate_rotation_angle(closest_center, target_x)print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")if angle_deg<0:awaitperform_spin_angle(int(-angle_deg+10),True)elif angle_deg>0:awaitperform_spin_angle(int(angle_deg+10),False) time.sleep(2) y_start = closest_center[1] y_end =1920# Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации) pixel_to_cm_ratio =0.017# Вычисление расстояния в сантиметрах distance_cm =abs(y_end - y_start)* pixel_to_cm_ratioawaitperform_movement(distance_cm) ret, frame = cap.read() y1,y2=aruco_detect(frame,closest_id.T)if y1==0and y2==0:print("aruco is not detected")else: angle_virav =calculate_rotation_angles(y1,y2)print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")# Освобождение ресурсов cv2.destroyAllWindows()# Запускаем асинхронный цикл main()asyncio.run(main2())
asyncdefwrite_report(message): # Функция создания автоматического отчета filename = f'Отчет_Фамилия_Имя.txt'asyncwith aiofiles.open(filename, mode='a', encoding='utf-8')as file:await file.write(message +"\n")
Описание: Асинхронная функция для записи сообщений в файл отчета.
Аргументы:
message: строка с сообщением, которое нужно записать.
Действие: Открывает или создает файл отчета с именем Отчет_Фамилия_Имя.txt и записывает в него сообщение в асинхронном режиме. Сообщение записывается в конце файла.
connect()
@sio.eventasyncdefconnect():awaitwrite_report('Подключено к серверу Socket.IO')
Описание: Асинхронный обработчик события connect, вызывается при успешном подключении к серверу через Socket.IO.
Действие: Записывает сообщение о подключении к серверу в отчет с помощью write_report().
Описание: Обработчик обратного вызова для команды управления светом.
Аргументы:
result: результат выполнения команды, переданный сервером.
Действие: Записывает результат команды управления светом в отчет.
control_lights(on: bool)
asyncdefcontrol_lights(on:bool):# Команда для управления светом lights_command ={'id':1,'type':'lights','value': on}# Отправляем команду на серверawait sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)# Записываем сообщение в файл отчета и выводим его в консоль action ="включены"if on else"выключены" message = f"Команда на включение света {action} отправлена."awaitwrite_report(message)print(message)
Описание: Асинхронная функция для управления светом ровера.
Аргументы:
on: флаг включения света (True для включения, False для выключения).
Действие:
Создает команду для включения или выключения света.
Отправляет команду через sio.emit() с типом команды "lights".
Записывает сообщение о включении/выключении света в отчет и выводит его на экран.
main()
asyncdefmain(): # Код выполнения подключения и выполнения команд движения Ровера Контакт await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
# for i in range(4):# await perform_movement(20)awaitperform_spin_angle(90, clockwise=False)# await perform_spin_angle(0, clockwise=False)# await control_lights(True)# await asyncio.sleep(0.1)# await control_lights(False)# await asyncio.sleep(0.5)# await control_lights(True)# await asyncio.sleep(0.5)# await control_lights(False)# await asyncio.sleep(0.5)# await control_lights(True)# await asyncio.sleep(0.5)# await control_lights(False)# await asyncio.sleep(0.5)await sio.disconnect()# Закрываем соединение после завершения всех операций
Описание: Основная асинхронная функция, которая подключается к серверу и выполняет команды управления ровером.
Действие:
Подключается к серверу через Socket.IO.
(Отключенные строки) Выполняет серию команд движения и управления светом.
Выполняет команду поворота ровера на 90 градусов против часовой стрелки.
Описание: Аналогична функции calculate_rotation_angle, но принимает только одну координату центра метки.
Аргументы:
marker_center: координата по оси X центра метки.
target_x: целевая горизонтальная координата.
Действие:
Вычисляет угол поворота, аналогично предыдущей функции, и возвращает результат.
aruco_detect(frame, closest_id)
defaruco_detect(frame,closest_id):# Преобразование в оттенки серого и поиск меток gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100) parameters = cv2.aruco.DetectorParameters() corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)if ids isnotNone: markers = [] mass_id_mark=[]for i, marker_id inenumerate(ids): marker_center = np.mean(corners[i][0], axis=0) mass_id_mark.append([marker_id[0],corners[i][0]]) markers.append({'id': marker_id[0], 'center': marker_center})# Находим ближайшую метку к x=360, y=10 target_x =960 target_y =1000 closest_marker =find_closest_marker(markers, target_x, target_y)if closest_marker isnotNone:# closest_id = closest_marker['id']# closest_center = closest_marker['center']# Выводим информацию о найденной метке cv2.aruco.drawDetectedMarkers(frame, corners, ids)# Находим индекс первого элемента массива, у которого первый элемент равен closest_id y1=0 y2=0 ps=int(closest_id)for sub_arr in mass_id_mark:# Проверяем, если первый элемент массива равен 5if sub_arr[0]== ps: mass=sub_arr[1] y1=int(mass[2][1]) y2=int(mass[3][1])return y1,y2return0,0
Описание: Функция для обнаружения меток ArUco на изображении и нахождения вертикальных координат метки.
Аргументы:
frame: кадр изображения, на котором выполняется детекция.
closest_id: идентификатор метки, которую нужно отслеживать.
Действие:
Преобразует кадр в оттенки серого и выполняет обнаружение меток ArUco.
Находит ближайшую метку.
Если метка найдена, возвращает координаты её сторон по вертикальной оси.
main2()
asyncdefmain2(): cap = cv2.VideoCapture("rtsp://localhost:8554/web") await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO) cv2.resizeWindow('Frame',640,640)# while True:# Считывание кадраfor i inrange(1): _,frame=cap.read()# Преобразование в оттенки серого и поиск меток gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100) parameters = cv2.aruco.DetectorParameters() corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)if ids isnotNone: markers = [] mass_id_mark=[]for i, marker_id inenumerate(ids): marker_center = np.mean(corners[i][0], axis=0) markers.append({'id': marker_id[0], 'center': marker_center})# Находим ближайшую метку к x=360, y=10 target_x =960 target_y =1000 closest_marker =find_closest_marker(markers, target_x, target_y)if closest_marker isnotNone: closest_id = closest_marker['id'] closest_center = closest_marker['center']awaitcontrol_lights(True)# Выводим информацию о найденной метке cv2.aruco.drawDetectedMarkers(frame, corners, ids)# Вычисляем угол поворота камеры angle_deg =calculate_rotation_angle(closest_center, target_x)print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")if angle_deg<0:awaitperform_spin_angle(int(-angle_deg+10),True)elif angle_deg>0:awaitperform_spin_angle(int(angle_deg+10),False) time.sleep(2) y_start = closest_center[1] y_end =1920# Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации) pixel_to_cm_ratio =0.017# Вычисление расстояния в сантиметрах distance_cm =abs(y_end - y_start)* pixel_to_cm_ratioawaitperform_movement(distance_cm) ret, frame = cap.read() y1,y2=aruco_detect(frame,closest_id.T)if y1==0and y2==0:print("aruco is not detected")else: angle_virav =calculate_rotation_angles(y1,y2)print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")# Освобождение ресурсов cv2.destroyAllWindows()# Запускаем асинхронный цикл main()asyncio.run(main2())
Описание: Основная функция для управления ровером на основе детекции меток ArUco с камеры.
Действие:
Открывает видеопоток RTSP с камеры и подключается к серверу.
Обрабатывает кадры видео и находит ближайшую метку ArUco.
Выполняет поворот и движение ровера в зависимости от угла и расстояния до метки.
Повторно проверяет положение метки после движения и корректирует угол, если нужно.
РЕЗУЛЬТАТ РАБОТЫ КОДА И ЗАПОЛНЕНИЕ ОТЧЕТА
ПРИМЕЧАНИЕ: Как вы могли заметить, в данном коде есть две функции main: main и main2. main нужен для вызова базовых движений ровера, а main2 для выполнения кода работы с AruCo маркерами.
Когда вы запустите код, ровер начнет ехать примерно по такой траектории к ближайшему AruCo маркеру:
Параллельно движениям ровера в отчет записываются данные о том, какие движения делал ровер: