Файл video_processing.py представляет собой мощный инструмент для работы с сетевыми соединениями и асинхронной обработкой данных. Он позволяет взаимодействовать с серверами для получения данных маркеров ArUco, данных RealSense и детекций YOLO, что необходимо для эффективной обработки информации и управления робототехникой. Каждый класс выполняет конкретные сетевые операции, обеспечивая необходимую функциональность для разработки автономных систем.
asyncio: Этот модуль предоставляет поддержку асинхронного ввода-вывода, циклов событий, сопрограмм и задач. Он используется для написания однопоточного параллельного кода с использованием сопрограмм, мультиплексирования доступа к вводу-выводу через сокеты и другие ресурсы, запуска сетевых клиентов и серверов, а также других связанных примитивов.
json: Этот модуль предоставляет функции для кодирования объектов Python в формат JSON и декодирования JSON-строк в объекты Python. Он широко используется для обмена данными между приложениями.
os: Этот модуль предоставляет способ использования функциональности, зависящей от операционной системы. Он включает функции для взаимодействия с файловой системой, управления переменными окружения и контроля процессов.
re: Этот модуль предоставляет поддержку регулярных выражений. Он используется для поиска, сопоставления и манипулирования текстом на основе шаблонов.
socket: Этот модуль предоставляет низкоуровневые сетевые интерфейсы, позволяющие осуществлять сетевое взаимодействие через сокеты.
subprocess: Этот модуль позволяет порождать новые процессы, подключаться к их каналам ввода/вывода/ошибок и получать их коды возврата.
sys: Этот модуль предоставляет доступ к некоторым переменным, используемым или поддерживаемым интерпретатором, а также к функциям, которые сильно взаимодействуют с интерпретатором.
threading: Этот модуль строится на низкоуровневых возможностях потоков, чтобы сделать работу с потоками еще проще и более питоничной.
time: Этот модуль предоставляет различные функции, связанные со временем.
cv2: Это библиотека OpenCV, которая используется для задач компьютерного зрения, таких как обработка изображений и видео, обнаружение объектов и отслеживание.
cv2.aruco: Это подмодуль OpenCV, который предоставляет инструменты для работы с маркерами Aruco, часто используемыми в робототехнике и дополненной реальности.
logging: Этот модуль предоставляет гибкую систему логирования для приложений и библиотек.
ultralytics.YOLO: Это, вероятно, модуль из библиотеки Ultralytics, который предоставляет реализацию модели YOLO (You Only Look Once) для обнаружения объектов в реальном времени.
ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_aruco
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных. Эти параметры передаются при создании объекта класса, чтобы затем получать и обрабатывать данные с сервера.
МЕТОД ОСТАНОВКИ (в классе Connect_aruco)
Метод stop завершает соединение и остановливает все запущенные процессы и потоки.
МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_aruco)
Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.
МЕТОД ОБНАРУЖЕНИЯ ARUCO (в классе Connect_aruco)
Метод aruco_detect обрабатывает данные от сервера, связанные с маркерами ArUco, асинхронно, возвращая их после ожидания.
ИНИЦИАЛИЗАЦИЯ КЛАССА Realsense
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. Эти параметры передаются при создании объекта класса для получения данных.
МЕТОД ПОЛУЧЕНИЯ ДАННЫХ (в классе Realsense)
Метод receive_data принимает данные от сервера через сокет и возвращает их в виде строки.
МЕТОД ВОЗВРАТА ДАННЫХ (в классе Realsense)
Метод give_mass получает данные от сервера и возвращает их в виде объекта.
ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_yolo
Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных YOLO. Эти параметры передаются при создании объекта класса для получения и обработки данных с сервера.
МЕТОД ОСТАНОВКИ (в классе Connect_yolo)
Метод stop завершает соединение и останавливает все запущенные процессы и потоки.
МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_yolo)
Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.
МЕТОД ОБНАРУЖЕНИЯ YOLO (в классе Connect_yolo)
Метод yolo_detect обрабатывает данные от YOLO сервера асинхронно, возвращая их после ожидания.
Процесс подготовки кода:
Создайте файл и назовите его video_processing.py
Файл video_processing.py в окне иерархии
Поэтапно копируйте код в файл
Скачайте недостающие библиотеки: в данном случае не предустановлены следующие библиотеки: cv2 и ultralytics. Если эти библиотеки у вас не установлены, выполните следующие действия:
Откройте терминал VS Code (Terminal -> New Termial или комбинацией Ctrl + Shift + `)
В терминале поочередно введите и запустите следующие команды: pip install ultralytics; pip install opencv-python
class Connect_aruco:
"""
Класс для установления соединения с сервером и обработки данных ArUco.
Атрибуты:
- ports (int): Порт, используемый для соединения.
- data_aruco (str): Данные, полученные от сервера.
- host (str): IP-адрес сервера.
- port (int): Порт сервера.
- server_socket (socket.socket): Сокет сервера.
- socket_thread (threading.Thread): Поток для асинхронной задачи.
Методы:
- __init__(self, IP, PORT, rtsp): Конструктор класса.
- stop(self): Останавливает соединение и поток.
- async_task(self): Асинхронная задача для обработки данных.
- aruco_detect(self, timeout=5): Асинхронно обрабатывает данные от сервера.
"""
def __init__(self,IP,PORT,rtsp):
"""
Конструктор класса.
Параметры:
- IP (str): IP-адрес сервера.
- PORT (int): Порт сервера.
- rtsp (str): RTSP адрес потока.
Описание:
Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс ArUco.
"""
self.ports=0
self.data_aruco=None
self.host = IP
self.port = PORT
port_data = f'{self.host}:{self.port},{rtsp}' # создаем переменную для работы с аргументами между файлов
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.socket_thread = threading.Thread(target=self.async_task, daemon=True)
self.socket_thread.start()
sys.argv.append(port_data) # объявляем метод через sys
commands = f'python3.11 aruco.py {" ".join(sys.argv[1:])}'
processes = subprocess.Popen(commands, shell=True)
time.sleep(3)
def stop(self):
"""
Останавливает соединение и поток.
Описание:
Завершает процесс ArUco, используя PID, и ожидает завершения потока.
"""
command = f'netstat -a -n -o | find "{self.ports}"'
output = subprocess.check_output(command, shell=True, text=True)
pattern = r'ESTABLISHED\s+(\d+)'
pids = re.findall(pattern, output)
sys='taskkill /F /PID '+str(pids[1])
os.system(sys)
self.socket_thread.join()
def async_task(self):
"""
Асинхронная задача для обработки данных.
Описание:
Принимает соединения от клиентов и обрабатывает полученные данные.
"""
try:
while True:
client_socket, addr = self.server_socket.accept()
print(f"Connected by {addr}")
self.ports=addr[1]
try:
while True:
data = client_socket.recv(1024).decode('utf-8')
if not data:
break
if not data=='None':
# print(f"Получены данные: {data}")
self.data_aruco=data
else:
# print("Даты нет")
self.data_aruco=""
finally:
client_socket.close()
print("Connection closed.")
except Exception:
pass
async def aruco_detect(self,timeout=5):
"""
Асинхронно обрабатывает данные от сервера.
Параметры:
- timeout (int): Время ожидания в секундах.
Возвращает:
- str: Данные, полученные от сервера, связанные с маркерами ArUco.
"""
start_time = asyncio.get_event_loop().time()
while True:
if self.data_aruco[:4] != "null" and self.data_aruco != "":
return(self.data_aruco)
else:
self.data_aruco=""
await asyncio.sleep(0.1) # Краткая пауза между итерациями обработки кадров
if (asyncio.get_event_loop().time() - start_time) > timeout:
logging.info("Detection timeout - proceeding with next commands.")
break
class Realsense:
"""
Класс для работы с сетевыми операциями и получением данных от сервера.
Атрибуты:
- buffer_size (int): Размер буфера для приема данных.
- client_socket (socket.socket): Клиентский сокет для соединения с сервером.
Методы:
- __init__(self, IP, PORT): Конструктор класса.
- receive_data(self, sock): Получение данных от сервера.
- give_mass(self): Возвращает данные, полученные от сервера.
"""
def __init__(self,IP,PORT):
"""
Конструктор класса.
Параметры:
- IP (str): IP-адрес сервера.
- PORT (int): Порт сервера.
Описание:
Инициализирует клиентский сокет и устанавливает соединение с сервером.
"""
host = IP
port = PORT
self.buffer_size = 10000
# Инициализация клиентского сокета
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((host, port))
self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
def receive_data(self,sock):
"""
Получение данных от сервера.
Параметры:
- sock (socket.socket): Сокет, через который происходит прием данных.
Возвращает:
- str: Данные, полученные от сервера в виде строки.
"""
sock.sendall(f"{0}".encode('utf-8'))
data = sock.recv(self.buffer_size).decode('utf-8')
return data
def give_mass(self):
"""
Возвращает данные, полученные от сервера.
Возвращает:
- any: Данные, полученные от сервера, преобразованные из строки в объект Python.
"""
try:
# Получение данных от сервера
data = self.receive_data(self.client_socket)
return eval(data)
except Exception as e:
print(f"Ошибка: {e}")
class Connect_yolo:
"""
Класс для установления соединения с YOLO сервером и обработки данных.
Атрибуты:
- ports (int): Порт, используемый для соединения.
- data_yolo (str): Данные, полученные от YOLO сервера.
- host (str): IP-адрес сервера.
- port (int): Порт сервера.
- server_socket (socket.socket): Сокет сервера.
- socket_thread (threading.Thread): Поток для асинхронной задачи.
Методы:
- __init__(self, IP, PORT, rtsp): Конструктор класса.
- stop(self): Останавливает соединение и поток.
- async_task(self): Асинхронная задача для обработки данных.
- return_port(self): Возвращает используемый порт.
- yolo_detect(self, timeout=5): Асинхронно обрабатывает данные от YOLO сервера.
"""
def __init__(self,IP,PORT,rtsp):
"""
Конструктор класса.
Параметры:
- IP (str): IP-адрес сервера.
- PORT (int): Порт сервера.
- rtsp (str): RTSP адрес потока.
Описание:
Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс YOLO.
"""
self.ports=0
self.data_yolo=None
self.host = IP
self.port = PORT
port_data = f'{self.host}:{self.port},{rtsp}' # создаем переменную для работы с аргументами между файлов
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.socket_thread = threading.Thread(target=self.async_task, daemon=True)
self.socket_thread.start()
sys.argv.append(port_data) # объявляем метод через sys
commands = f'python3.11 yolo.py {" ".join(sys.argv[1:])}'
processes = subprocess.Popen(commands, shell=True)
time.sleep(10)
def stop(self):
"""
Останавливает соединение и поток.
Описание:
Завершает процесс YOLO, используя PID, и ожидает завершения потока.
"""
command = f'netstat -a -n -o | find "{self.ports}"'
output = subprocess.check_output(command, shell=True, text=True)
pattern = r'ESTABLISHED\s+(\d+)'
pids = re.findall(pattern, output)
sys='taskkill /F /PID '+str(pids[1])
os.system(sys)
self.socket_thread.join()
def async_task(self):
"""
Асинхронная задача для обработки данных.
Описание:
Принимает соединения от клиентов и обрабатывает полученные данные.
"""
try:
while True:
client_socket, addr = self.server_socket.accept()
print(f"Connected by {addr}")
self.ports=addr[1]
try:
while True:
data = client_socket.recv(1024).decode('utf-8')
if not data:
break
if not data=='null':
# print(f"Получены данные: {data}")
self.data_yolo=data
else:
# print("Даты нет")
self.data_yolo=None
finally:
client_socket.close()
print("Connection closed.")
except Exception:
pass
async def yolo_detect(self,timeout=5):
"""
Асинхронно обрабатывает данные от YOLO сервера.
Параметры:
- timeout (int): Время ожидания в секундах.
Возвращает:
- tuple: Кортеж с классами и ограничивающими рамками, обнаруженными YOLO.
"""
start_time = asyncio.get_event_loop().time()
while True:
if not self.data_yolo==None:
data = json.loads(self.data_yolo)
# Извлечение массивов из словаря
boxes = data['boxes']
config = data['conf']
classes = data['class']
return (classes, boxes)
await asyncio.sleep(0.1) # Краткая пауза между итерациями обработки кадров
if (asyncio.get_event_loop().time() - start_time) > timeout:
logging.info("Detection timeout - proceeding with next commands.")
break