Файл video_processing.py представляет собой мощный инструмент для работы с сетевыми соединениями и асинхронной обработкой данных. Он позволяет взаимодействовать с серверами для получения данных маркеров ArUco, данных RealSense и детекций YOLO, что необходимо для эффективной обработки информации и управления робототехникой. Каждый класс выполняет конкретные сетевые операции, обеспечивая необходимую функциональность для разработки автономных систем.
asyncio: Этот модуль предоставляет поддержку асинхронного ввода-вывода, циклов событий, сопрограмм и задач. Он используется для написания однопоточного параллельного кода с использованием сопрограмм, мультиплексирования доступа к вводу-выводу через сокеты и другие ресурсы, запуска сетевых клиентов и серверов, а также других связанных примитивов.
json: Этот модуль предоставляет функции для кодирования объектов Python в формат JSON и декодирования JSON-строк в объекты Python. Он широко используется для обмена данными между приложениями.
os: Этот модуль предоставляет способ использования функциональности, зависящей от операционной системы. Он включает функции для взаимодействия с файловой системой, управления переменными окружения и контроля процессов.
re: Этот модуль предоставляет поддержку регулярных выражений. Он используется для поиска, сопоставления и манипулирования текстом на основе шаблонов.
socket: Этот модуль предоставляет низкоуровневые сетевые интерфейсы, позволяющие осуществлять сетевое взаимодействие через сокеты.
subprocess: Этот модуль позволяет порождать новые процессы, подключаться к их каналам ввода/вывода/ошибок и получать их коды возврата.
sys: Этот модуль предоставляет доступ к некоторым переменным, используемым или поддерживаемым интерпретатором, а также к функциям, которые сильно взаимодействуют с интерпретатором.
threading: Этот модуль строится на низкоуровневых возможностях потоков, чтобы сделать работу с потоками еще проще и более питоничной.
time: Этот модуль предоставляет различные функции, связанные со временем.
cv2: Это библиотека OpenCV, которая используется для задач компьютерного зрения, таких как обработка изображений и видео, обнаружение объектов и отслеживание.
cv2.aruco: Это подмодуль OpenCV, который предоставляет инструменты для работы с маркерами Aruco, часто используемыми в робототехнике и дополненной реальности.
logging: Этот модуль предоставляет гибкую систему логирования для приложений и библиотек.
ultralytics.YOLO: Это, вероятно, модуль из библиотеки Ultralytics, который предоставляет реализацию модели YOLO (You Only Look Once) для обнаружения объектов в реальном времени.
ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_aruco
classConnect_aruco: """ Класс для установления соединения с сервером и обработки данных ArUco. Атрибуты: - ports (int): Порт, используемый для соединения. - data_aruco (str): Данные, полученные от сервера. - host (str): IP-адрес сервера. - port (int): Порт сервера. - server_socket (socket.socket): Сокет сервера. - socket_thread (threading.Thread): Поток для асинхронной задачи. Методы: - __init__(self, IP, PORT, rtsp): Конструктор класса. - stop(self): Останавливает соединение и поток. - async_task(self): Асинхронная задача для обработки данных. - aruco_detect(self, timeout=5): Асинхронно обрабатывает данные от сервера. """def__init__(self,IP,PORT,rtsp): """ Конструктор класса. Параметры: - IP (str): IP-адрес сервера. - PORT (int): Порт сервера. - rtsp (str): RTSP адрес потока. Описание: Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс ArUco. """ self.ports=0 self.data_aruco=None self.host = IP self.port = PORT port_data = f'{self.host}:{self.port},{rtsp}'# создаем переменную для работы с аргументами между файлов self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) self.socket_thread = threading.Thread(target=self.async_task, daemon=True) self.socket_thread.start() sys.argv.append(port_data)# объявляем метод через sys commands = f'python3.11 aruco.py {" ".join(sys.argv[1:])}' processes = subprocess.Popen(commands, shell=True) time.sleep(3)
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных. Эти параметры передаются при создании объекта класса, чтобы затем получать и обрабатывать данные с сервера.
МЕТОД ОСТАНОВКИ (в классе Connect_aruco)
defstop(self): """ Останавливает соединение и поток. Описание: Завершает процесс ArUco, используя PID, и ожидает завершения потока. """ command = f'netstat -a -n -o | find "{self.ports}"' output = subprocess.check_output(command, shell=True, text=True) pattern =r'ESTABLISHED\s+(\d+)' pids = re.findall(pattern, output) sys='taskkill /F /PID '+str(pids[1]) os.system(sys) self.socket_thread.join()
Метод stop завершает соединение и остановливает все запущенные процессы и потоки.
МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_aruco)
defasync_task(self): """ Асинхронная задача для обработки данных. Описание: Принимает соединения от клиентов и обрабатывает полученные данные. """try:whileTrue: client_socket, addr = self.server_socket.accept()print(f"Connected by {addr}") self.ports=addr[1]try:whileTrue: data = client_socket.recv(1024).decode('utf-8')ifnot data:breakifnot data=='None':# print(f"Получены данные: {data}") self.data_aruco=data else:# print("Даты нет") self.data_aruco=""finally: client_socket.close()print("Connection closed.")exceptException:pass
Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.
МЕТОД ОБНАРУЖЕНИЯ ARUCO (в классе Connect_aruco)
asyncdefaruco_detect(self,timeout=5):""" Асинхронно обрабатывает данные от сервера. Параметры: - timeout (int): Время ожидания в секундах. Возвращает: - str: Данные, полученные от сервера, связанные с маркерами ArUco. """ start_time = asyncio.get_event_loop().time()whileTrue:if self.data_aruco[:4]!="null"and self.data_aruco !="":return(self.data_aruco)else: self.data_aruco=""await asyncio.sleep(0.1)# Краткая пауза между итерациями обработки кадров if (asyncio.get_event_loop().time()- start_time) > timeout: logging.info("Detection timeout - proceeding with next commands.")break
Метод aruco_detect обрабатывает данные от сервера, связанные с маркерами ArUco, асинхронно, возвращая их после ожидания.
ИНИЦИАЛИЗАЦИЯ КЛАССА Realsense
classRealsense:""" Класс для работы с сетевыми операциями и получением данных от сервера. Атрибуты: - buffer_size (int): Размер буфера для приема данных. - client_socket (socket.socket): Клиентский сокет для соединения с сервером. Методы: - __init__(self, IP, PORT): Конструктор класса. - receive_data(self, sock): Получение данных от сервера. - give_mass(self): Возвращает данные, полученные от сервера. """def__init__(self,IP,PORT):""" Конструктор класса. Параметры: - IP (str): IP-адрес сервера. - PORT (int): Порт сервера. Описание: Инициализирует клиентский сокет и устанавливает соединение с сервером. """ host = IP port = PORT self.buffer_size =10000# Инициализация клиентского сокета self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client_socket.connect((host, port)) self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. Эти параметры передаются при создании объекта класса для получения данных.
МЕТОД ПОЛУЧЕНИЯ ДАННЫХ (в классе Realsense)
defreceive_data(self,sock):""" Получение данных от сервера. Параметры: - sock (socket.socket): Сокет, через который происходит прием данных. Возвращает: - str: Данные, полученные от сервера в виде строки. """ sock.sendall(f"{0}".encode('utf-8')) data = sock.recv(self.buffer_size).decode('utf-8')return data
Метод receive_data принимает данные от сервера через сокет и возвращает их в виде строки.
МЕТОД ВОЗВРАТА ДАННЫХ (в классе Realsense)
defgive_mass(self):""" Возвращает данные, полученные от сервера. Возвращает: - any: Данные, полученные от сервера, преобразованные из строки в объект Python. """try:# Получение данных от сервера data = self.receive_data(self.client_socket)returneval(data)exceptExceptionas e:print(f"Ошибка: {e}")
Метод give_mass получает данные от сервера и возвращает их в виде объекта.
ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_yolo
classConnect_yolo: """ Класс для установления соединения с YOLO сервером и обработки данных. Атрибуты: - ports (int): Порт, используемый для соединения. - data_yolo (str): Данные, полученные от YOLO сервера. - host (str): IP-адрес сервера. - port (int): Порт сервера. - server_socket (socket.socket): Сокет сервера. - socket_thread (threading.Thread): Поток для асинхронной задачи. Методы: - __init__(self, IP, PORT, rtsp): Конструктор класса. - stop(self): Останавливает соединение и поток. - async_task(self): Асинхронная задача для обработки данных. - return_port(self): Возвращает используемый порт. - yolo_detect(self, timeout=5): Асинхронно обрабатывает данные от YOLO сервера. """def__init__(self,IP,PORT,rtsp): """ Конструктор класса. Параметры: - IP (str): IP-адрес сервера. - PORT (int): Порт сервера. - rtsp (str): RTSP адрес потока. Описание: Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс YOLO. """ self.ports=0 self.data_yolo=None self.host = IP self.port = PORT port_data = f'{self.host}:{self.port},{rtsp}'# создаем переменную для работы с аргументами между файлов self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) self.socket_thread = threading.Thread(target=self.async_task, daemon=True) self.socket_thread.start() sys.argv.append(port_data)# объявляем метод через sys commands = f'python3.11 yolo.py {" ".join(sys.argv[1:])}' processes = subprocess.Popen(commands, shell=True) time.sleep(10)
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных YOLO. Эти параметры передаются при создании объекта класса для получения и обработки данных с сервера.
МЕТОД ОСТАНОВКИ (в классе Connect_yolo)
defstop(self): """ Останавливает соединение и поток. Описание: Завершает процесс YOLO, используя PID, и ожидает завершения потока. """ command = f'netstat -a -n -o | find "{self.ports}"' output = subprocess.check_output(command, shell=True, text=True) pattern =r'ESTABLISHED\s+(\d+)' pids = re.findall(pattern, output) sys='taskkill /F /PID '+str(pids[1]) os.system(sys) self.socket_thread.join()
Метод stop завершает соединение и останавливает все запущенные процессы и потоки.
МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_yolo)
defasync_task(self): """ Асинхронная задача для обработки данных. Описание: Принимает соединения от клиентов и обрабатывает полученные данные. """try:whileTrue: client_socket, addr = self.server_socket.accept()print(f"Connected by {addr}") self.ports=addr[1]try:whileTrue: data = client_socket.recv(1024).decode('utf-8')ifnot data:breakifnot data=='null':# print(f"Получены данные: {data}") self.data_yolo=data else:# print("Даты нет") self.data_yolo=Nonefinally: client_socket.close()print("Connection closed.")exceptException:pass
Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.
МЕТОД ОБНАРУЖЕНИЯ YOLO (в классе Connect_yolo)
asyncdefyolo_detect(self,timeout=5): """ Асинхронно обрабатывает данные от YOLO сервера. Параметры: - timeout (int): Время ожидания в секундах. Возвращает: - tuple: Кортеж с классами и ограничивающими рамками, обнаруженными YOLO. """ start_time = asyncio.get_event_loop().time()whileTrue:ifnot self.data_yolo==None: data = json.loads(self.data_yolo)# Извлечение массивов из словаря boxes = data['boxes'] config = data['conf'] classes = data['class']return (classes, boxes) await asyncio.sleep(0.1)# Краткая пауза между итерациями обработки кадров if (asyncio.get_event_loop().time()- start_time) > timeout: logging.info("Detection timeout - proceeding with next commands.")break
Метод yolo_detect обрабатывает данные от YOLO сервера асинхронно, возвращая их после ожидания.
Процесс подготовки кода:
Создайте файл и назовите его video_processing.py
Поэтапно копируйте код в файл
Скачайте недостающие библиотеки: в данном случае не предустановлены следующие библиотеки: cv2 и ultralytics. Если эти библиотеки у вас не установлены, выполните следующие действия:
Откройте терминал VS Code (Terminal -> New Termial или комбинацией Ctrl + Shift + `)
В терминале поочередно введите и запустите следующие команды: pip install ultralytics; pip install opencv-python