Движение по AruCo-меткам
Этот код реализует систему управления ровером, которая взаимодействует с сервером через протокол Socket.IO, управляет движением, поворотами и освещением ровера, а также анализирует изображения с камеры для обнаружения меток ArUco.
import time
import signal
import cv2
import numpy as np
import socketio
import asyncio
import aiofiles
from datetime import datetime
server = 'http://192.168.10.115:3006/' # адресс вашего ровера
# Создаем экземпляр асинхронного клиента Socket.IO
sio = socketio.AsyncClient()
diameter_of_wheel = 0.24  # Диаметр колеса в метрах
circumference = 3.14159 * diameter_of_wheel  # Окружность колеса
async def write_report(message): # Функция создания автоматического отчета
    filename = f'Отчет_Фамилия_Имя.txt'
    async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
        await file.write(message + "\n")
# Определяем обработчик события "connect"
@sio.event
async def connect():
    await write_report('Подключено к серверу Socket.IO')
async def on_move(result):
    await write_report(f'Command done: {result}')
async def perform_movement(distance_cm):
    max_speed = 0.5  # Максимальная скорость м/с (примерная, можно изменить)
    distance_m = distance_cm / 100.0  # Переводим сантиметры в метры
    time_to_travel = distance_m / max_speed
    start_time = time.time()
    while (time.time() - start_time) < time_to_travel:
        await sio.emit('command', {
        'id': 1,
        'type': "move",
        'value': {  
            'x': 0,
            'y': 1,
            'sensitivity': 1,
        },
        }, namespace='/vehicles', callback=on_move)
        await asyncio.sleep(0.2)
    movement_message = f"Команда движения отправлена, ожидайте {time_to_travel:.2f} секунд."
    await write_report(movement_message)
    print(movement_message)
    await asyncio.sleep(time_to_travel)
    await asyncio.sleep(1)
    completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
    await write_report(completion_message)
    print(completion_message)
async def on_spin(result):
    await write_report(f'Spin command done: {result}')
async def perform_spin_angle(angle, clockwise=True):
    FULL_ROTATION_TIME = 1.1
    ACCELERATION_FACTOR = 0.0005
    rotation_time = (angle / 360) * FULL_ROTATION_TIME
    total_time = rotation_time + (rotation_time * ACCELERATION_FACTOR)
    start_time = time.time()
    while (time.time() - start_time) < total_time:
        await sio.emit('command', {
            'id': 1,
            'type': "spin",
            'value': {
                'state': True,
                'direction': clockwise
            },
        }, namespace='/vehicles', callback=on_spin)
        await asyncio.sleep(0.2)
    
    spin_message = f"Начат поворот на {angle} градусов, {'по часовой стрелке' if clockwise else 'против часовой стрелки'}"
    await write_report(spin_message)
    print(spin_message)
    completion_spin_message = f"Поворот на {angle} градусов завершён."
    await write_report(completion_spin_message)
    print(completion_spin_message)
    
    
async def on_lights(result):
    await write_report(f'Lights command done: {result}')
async def control_lights(on: bool):
    # Команда для управления светом
    lights_command = {
        'id': 1,
        'type': 'lights',
        'value': on
    }
    # Отправляем команду на сервер
    await sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)
    # Записываем сообщение в файл отчета и выводим его в консоль
    action = "включены" if on else "выключены"
    message = f"Команда на включение света {action} отправлена."
    await write_report(message)
    print(message)
# Функция для нахождения ближайшей к заданным координатам метки
def find_closest_marker(markers, target_x, target_y):
    closest_marker = None
    min_distance = float('inf')
    for marker in markers:
        center_x = marker['center'][0]
        center_y = marker['center'][1]
        distance = np.sqrt((center_x - target_x)**2 + (center_y - target_y)**2)
        if distance < min_distance:
            closest_marker = marker
            min_distance = distance
    return closest_marker
# Функция для нахождения угла поворота камеры
def calculate_rotation_angle(marker_center, target_x):
    marker_x = marker_center[0]
    delta_x = target_x - marker_x
    focal_length = 1000  # Пример фокусного расстояния камеры
    angle_rad = np.arctan(delta_x / focal_length)
    angle_deg = np.degrees(angle_rad)
    return angle_deg
def calculate_rotation_angles(marker_center, target_x):
    marker_x = marker_center
    delta_x = target_x - marker_x
    focal_length = 1000  # Пример фокусного расстояния камеры
    angle_rad = np.arctan(delta_x / focal_length)
    angle_deg = np.degrees(angle_rad)
    return angle_deg
def aruco_detect(frame,closest_id):
    # Преобразование в оттенки серого и поиск меток
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
        parameters = cv2.aruco.DetectorParameters()
        corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
        if ids is not None:
            markers = []
            mass_id_mark=[]
            for i, marker_id in enumerate(ids):
                marker_center = np.mean(corners[i][0], axis=0)
                mass_id_mark.append([marker_id[0],corners[i][0]])
                markers.append({'id': marker_id[0], 'center': marker_center})
            # Находим ближайшую метку к x=360, y=10
            target_x = 960
            target_y = 1000
            closest_marker = find_closest_marker(markers, target_x, target_y)
            if closest_marker is not None:
                # closest_id = closest_marker['id']
                # closest_center = closest_marker['center']
                # Выводим информацию о найденной метке
                cv2.aruco.drawDetectedMarkers(frame, corners, ids)
                # Находим индекс первого элемента массива, у которого первый элемент равен closest_id
                y1=0
                y2=0
                ps=int(closest_id)
                for sub_arr in mass_id_mark:
                    # Проверяем, если первый элемент массива равен 5
                    if sub_arr[0] == ps:
                        mass=sub_arr[1] 
                        y1=int(mass[2][1])
                        y2=int(mass[3][1])  
                return y1,y2
        return 0,0
  
async def main():
    cap = cv2.VideoCapture("rtsp://localhost:8554/web")
    await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
    cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO)
    cv2.resizeWindow('Frame',640,640)
    # while True:
        # Считывание кадра
    for i in range(1):
        _,frame=cap.read()
    
        # Преобразование в оттенки серого и поиск меток
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
        parameters = cv2.aruco.DetectorParameters()
        corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
        if ids is not None:
            markers = []
            mass_id_mark=[]
            for i, marker_id in enumerate(ids):
                marker_center = np.mean(corners[i][0], axis=0)
                markers.append({'id': marker_id[0], 'center': marker_center})
            # Находим ближайшую метку к x=360, y=10
            target_x = 960
            target_y = 1000
            closest_marker = find_closest_marker(markers, target_x, target_y)
            if closest_marker is not None:
                closest_id = closest_marker['id']
                closest_center = closest_marker['center']
                await control_lights(True)
                # Выводим информацию о найденной метке
                cv2.aruco.drawDetectedMarkers(frame, corners, ids)
                # Вычисляем угол поворота камеры
                angle_deg = calculate_rotation_angle(closest_center, target_x)
                print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")
                if angle_deg<0:
                    await perform_spin_angle(int(-angle_deg+10),True)
                elif angle_deg>0:
                    await perform_spin_angle(int(angle_deg+10),False)
                time.sleep(2)
                y_start = closest_center[1]
                y_end = 1920
                # Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации)
                pixel_to_cm_ratio = 0.017
                # Вычисление расстояния в сантиметрах
                distance_cm = abs(y_end - y_start) * pixel_to_cm_ratio
                await perform_movement(distance_cm)
                ret, frame = cap.read()
                y1,y2=aruco_detect(frame,closest_id.T)
                if y1==0 and y2==0:
                    print("aruco is not detected")
                else:
                    angle_virav = calculate_rotation_angles(y1,y2)
                    print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")
    # Освобождение ресурсов
    cv2.destroyAllWindows()
# Запускаем асинхронный цикл main()
asyncio.run(main())Теперь разберем подробно этот код:
ИНИЦИАЛИЗАЦИЯ БИБЛИОТЕК
import time
import signal
import cv2
import numpy as np
import socketio
import asyncio
import aiofiles
from datetime import datetime- time: Стандартная библиотека Python для работы со временем. 
- signal: Стандартная библиотека Python для работы с сигналами ОС. 
- cv2 (OpenCV) Библиотека с открытым исходным кодом для компьютерного зрения. 
- numpy: Библиотека для работы с многомерными массивами и числовыми вычислениями. 
- socketio: Библиотека для общения по протоколу WebSocket и Socket.IO. 
- asyncio: Стандартная библиотека Python для написания асинхронного кода. 
- aiofiles: Библиотека для асинхронной работы с файлами. 
- datename: Стандартная библиотека Python для работы с датой и временем. 
ЭКЗЕМПЛЯР АСИНХРОННОГО КЛИЕНТА SOCKETIO
sio = socketio.AsyncClient()
diameter_of_wheel = 0.24  # Диаметр колеса в метрах
circumference = 3.14159 * diameter_of_wheel # Окружность колесаОПИСАНИЕ ФУНКЦИЙ КОДА
- write_report(message) 
async def write_report(message): # Функция создания автоматического отчета
    filename = f'Отчет_Фамилия_Имя.txt'
    async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
        await file.write(message + "\n")- Описание: Асинхронная функция для записи сообщений в файл отчета. 
- Аргументы: - message: строка с сообщением, которое нужно записать.
 
- Действие: Открывает или создает файл отчета с именем - Отчет_Фамилия_Имя.txtи записывает в него сообщение в асинхронном режиме. Сообщение записывается в конце файла.
- connect() 
@sio.event
async def connect():
    await write_report('Подключено к серверу Socket.IO')- Описание: Асинхронный обработчик события - connect, вызывается при успешном подключении к серверу через Socket.IO.
- Действие: Записывает сообщение о подключении к серверу в отчет с помощью - write_report().
- on_move(result) 
async def on_move(result):
    await write_report(f'Command done: {result}')- Описание: Обработчик обратного вызова, который вызывается после выполнения команды движения. 
- Аргументы: - result: результат выполнения команды, переданный сервером.
 
- Действие: Записывает результат команды движения в отчет с помощью - write_report().
- perform_movement(distance_cm) 
async def perform_movement(distance_cm):
    max_speed = 0.5  # Максимальная скорость м/с (примерная, можно изменить)
    distance_m = distance_cm / 100.0  # Переводим сантиметры в метры
    time_to_travel = distance_m / max_speed
    start_time = time.time()
    while (time.time() - start_time) < time_to_travel:
        await sio.emit('command', {
        'id': 1,
        'type': "move",
        'value': {  
            'x': 0,
            'y': 1,
            'sensitivity': 1,
        },
        }, namespace='/vehicles', callback=on_move)
        await asyncio.sleep(0.2)
    movement_message = f"Команда движения отправлена, ожидайте {time_to_travel:.2f} секунд."
    await write_report(movement_message)
    print(movement_message)
    await asyncio.sleep(time_to_travel)
    await asyncio.sleep(1)
    completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
    await write_report(completion_message)
    print(completion_message)- Описание: Асинхронная функция, отправляющая команду на движение ровера. 
- Аргументы: - distance_cm: расстояние, которое ровер должен пройти в сантиметрах.
 
- Действие: - Вычисляет время для движения на заданное расстояние с максимальной скоростью. 
- В цикле отправляет команды движения через - sio.emit()с типом команды- "move".
- Ждет завершения движения (по времени). 
- Записывает результаты команд в отчет и выводит их на экран. 
 
- on_spin(result) 
async def on_spin(result):
    await write_report(f'Spin command done: {result}')- Описание: Обработчик обратного вызова для команды поворота. 
- Аргументы: - result: результат выполнения команды поворота, переданный сервером.
 
- Действие: Записывает результат команды поворота в отчет. 
- perform_spin_angle(angle, clockwise=True) 
async def perform_spin_angle(angle, clockwise=True):
    FULL_ROTATION_TIME = 1.1
    ACCELERATION_FACTOR = 0.0005
    rotation_time = (angle / 360) * FULL_ROTATION_TIME
    total_time = rotation_time + (rotation_time * ACCELERATION_FACTOR)
    start_time = time.time()
    while (time.time() - start_time) < total_time:
        await sio.emit('command', {
            'id': 1,
            'type': "spin",
            'value': {
                'state': True,
                'direction': clockwise
            },
        }, namespace='/vehicles', callback=on_spin)
        await asyncio.sleep(0.2)
    
    spin_message = f"Начат поворот на {angle} градусов, {'по часовой стрелке' if clockwise else 'против часовой стрелки'}"
    await write_report(spin_message)
    print(spin_message)
    completion_spin_message = f"Поворот на {angle} градусов завершён."
    await write_report(completion_spin_message)
    print(completion_spin_message)- Описание: Асинхронная функция, которая поворачивает ровер на заданный угол. 
- Аргументы: - angle: угол поворота в градусах.
- clockwise: направление поворота, по умолчанию — по часовой стрелке.
 
- Действие: - Рассчитывает время, необходимое для поворота на заданный угол. 
- В цикле отправляет команду поворота через - sio.emit()с типом команды- "spin".
- Записывает сообщение о начале и завершении поворота в отчет и выводит их на экран. 
 
async def on_lights(result):
    await write_report(f'Lights command done: {result}')- Описание: Обработчик обратного вызова для команды управления светом. 
- Аргументы: - result: результат выполнения команды, переданный сервером.
 
- Действие: Записывает результат команды управления светом в отчет. 
async def control_lights(on: bool):
    # Команда для управления светом
    lights_command = {
        'id': 1,
        'type': 'lights',
        'value': on
    }
    # Отправляем команду на сервер
    await sio.emit('command', lights_command, namespace='/vehicles', callback=on_lights)
    # Записываем сообщение в файл отчета и выводим его в консоль
    action = "включены" if on else "выключены"
    message = f"Команда на включение света {action} отправлена."
    await write_report(message)
    print(message)- Описание: Асинхронная функция для управления светом ровера. 
- Аргументы: - on: флаг включения света (- Trueдля включения,- Falseдля выключения).
 
- Действие: - Создает команду для включения или выключения света. 
- Отправляет команду через - sio.emit()с типом команды- "lights".
- Записывает сообщение о включении/выключении света в отчет и выводит его на экран. 
 
def find_closest_marker(markers, target_x, target_y):
    closest_marker = None
    min_distance = float('inf')
    for marker in markers:
        center_x = marker['center'][0]
        center_y = marker['center'][1]
        distance = np.sqrt((center_x - target_x)**2 + (center_y - target_y)**2)
        if distance < min_distance:
            closest_marker = marker
            min_distance = distance
    return closest_marker- Описание: Функция для поиска ближайшей метки ArUco к заданным координатам. 
- Аргументы: - markers: список меток с их центрами.
- target_x,- target_y: координаты целевой точки.
 
- Действие: - Перебирает все метки и вычисляет расстояние до целевой точки. 
- Возвращает метку, которая находится ближе всего к целевой точке. 
 
- calculate_rotation_angle(marker_center, target_x) 
def calculate_rotation_angle(marker_center, target_x):
    marker_x = marker_center[0]
    delta_x = target_x - marker_x
    focal_length = 1000  # Пример фокусного расстояния камеры
    angle_rad = np.arctan(delta_x / focal_length)
    angle_deg = np.degrees(angle_rad)
    return angle_deg- Описание: Функция для вычисления угла поворота камеры для наведения на метку. 
- Аргументы: - marker_center: координаты центра метки.
- target_x: целевая горизонтальная координата.
 
- Действие: - Вычисляет горизонтальное смещение метки относительно целевой координаты. 
- Рассчитывает угол поворота в градусах с помощью арктангенса и возвращает его. 
 
- calculate_rotation_angles(marker_center, target_x) 
def calculate_rotation_angles(marker_center, target_x):
    marker_x = marker_center
    delta_x = target_x - marker_x
    focal_length = 1000  # Пример фокусного расстояния камеры
    angle_rad = np.arctan(delta_x / focal_length)
    angle_deg = np.degrees(angle_rad)
    return angle_deg- Описание: Аналогична функции - calculate_rotation_angle, но принимает только одну координату центра метки.
- Аргументы: - marker_center: координата по оси X центра метки.
- target_x: целевая горизонтальная координата.
 
- Действие: - Вычисляет угол поворота, аналогично предыдущей функции, и возвращает результат. 
 
- aruco_detect(frame, closest_id) 
def aruco_detect(frame,closest_id):
    # Преобразование в оттенки серого и поиск меток
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
        parameters = cv2.aruco.DetectorParameters()
        corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
        if ids is not None:
            markers = []
            mass_id_mark=[]
            for i, marker_id in enumerate(ids):
                marker_center = np.mean(corners[i][0], axis=0)
                mass_id_mark.append([marker_id[0],corners[i][0]])
                markers.append({'id': marker_id[0], 'center': marker_center})
            # Находим ближайшую метку к x=360, y=10
            target_x = 960
            target_y = 1000
            closest_marker = find_closest_marker(markers, target_x, target_y)
            if closest_marker is not None:
                # closest_id = closest_marker['id']
                # closest_center = closest_marker['center']
                # Выводим информацию о найденной метке
                cv2.aruco.drawDetectedMarkers(frame, corners, ids)
                # Находим индекс первого элемента массива, у которого первый элемент равен closest_id
                y1=0
                y2=0
                ps=int(closest_id)
                for sub_arr in mass_id_mark:
                    # Проверяем, если первый элемент массива равен 5
                    if sub_arr[0] == ps:
                        mass=sub_arr[1] 
                        y1=int(mass[2][1])
                        y2=int(mass[3][1])  
                return y1,y2
        return 0,0- Описание: Функция для обнаружения меток ArUco на изображении и нахождения вертикальных координат метки. 
- Аргументы: - frame: кадр изображения, на котором выполняется детекция.
- closest_id: идентификатор метки, которую нужно отслеживать.
 
- Действие: - Преобразует кадр в оттенки серого и выполняет обнаружение меток ArUco. 
- Находит ближайшую метку. 
- Если метка найдена, возвращает координаты её сторон по вертикальной оси. 
 
- main() 
async def main():
    cap = cv2.VideoCapture("rtsp://localhost:8554/web")
    await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'], auth={"token": '000c2287-011a-4228-844e-e31z44b06d65'})
    cv2.namedWindow('Frame',cv2.WINDOW_KEEPRATIO)
    cv2.resizeWindow('Frame',640,640)
    # while True:
        # Считывание кадра
    for i in range(1):
        _,frame=cap.read()
    
        # Преобразование в оттенки серого и поиск меток
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
        parameters = cv2.aruco.DetectorParameters()
        corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
        if ids is not None:
            markers = []
            mass_id_mark=[]
            for i, marker_id in enumerate(ids):
                marker_center = np.mean(corners[i][0], axis=0)
                markers.append({'id': marker_id[0], 'center': marker_center})
            # Находим ближайшую метку к x=360, y=10
            target_x = 960
            target_y = 1000
            closest_marker = find_closest_marker(markers, target_x, target_y)
            if closest_marker is not None:
                closest_id = closest_marker['id']
                closest_center = closest_marker['center']
                await control_lights(True)
                # Выводим информацию о найденной метке
                cv2.aruco.drawDetectedMarkers(frame, corners, ids)
                # Вычисляем угол поворота камеры
                angle_deg = calculate_rotation_angle(closest_center, target_x)
                print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_deg} degrees")
                if angle_deg<0:
                    await perform_spin_angle(int(-angle_deg+10),True)
                elif angle_deg>0:
                    await perform_spin_angle(int(angle_deg+10),False)
                time.sleep(2)
                y_start = closest_center[1]
                y_end = 1920
                # Коэффициент пропорциональности (должен быть настраиваемым в зависимости от вашей конкретной ситуации)
                pixel_to_cm_ratio = 0.017
                # Вычисление расстояния в сантиметрах
                distance_cm = abs(y_end - y_start) * pixel_to_cm_ratio
                await perform_movement(distance_cm)
                ret, frame = cap.read()
                y1,y2=aruco_detect(frame,closest_id.T)
                if y1==0 and y2==0:
                    print("aruco is not detected")
                else:
                    angle_virav = calculate_rotation_angles(y1,y2)
                    print(f"Closest marker ID: {closest_id}, Rotation angle needed: {angle_virav} degrees")
    # Освобождение ресурсов
    cv2.destroyAllWindows()
# Запускаем асинхронный цикл main()
asyncio.run(main())- Описание: Основная функция для управления ровером на основе детекции меток ArUco с камеры. 
- Действие: - Открывает видеопоток RTSP с камеры и подключается к серверу. 
- Обрабатывает кадры видео и находит ближайшую метку ArUco. 
- Выполняет поворот и движение ровера в зависимости от угла и расстояния до метки. 
- Повторно проверяет положение метки после движения и корректирует угол, если нужно. 
 
РЕЗУЛЬТАТ РАБОТЫ КОДА И ЗАПОЛНЕНИЕ ОТЧЕТА
ПРИМЕЧАНИЕ: Как вы могли заметить, в данном коде есть две функции main: main и main2. main нужен для вызова базовых движений ровера, а main2 для выполнения кода работы с AruCo маркерами.
Когда вы запустите код, ровер начнет ехать примерно по такой траектории к ближайшему AruCo маркеру:
Параллельно движениям ровера в отчет записываются данные о том, какие движения делал ровер:

Последнее обновление
