ФАЙЛ "VIDEO_PROCESSING.PY"

Файл video_processing.py представляет собой мощный инструмент для работы с сетевыми соединениями и асинхронной обработкой данных. Он позволяет взаимодействовать с серверами для получения данных маркеров ArUco, данных RealSense и детекций YOLO, что необходимо для эффективной обработки информации и управления робототехникой. Каждый класс выполняет конкретные сетевые операции, обеспечивая необходимую функциональность для разработки автономных систем.

ИНИЦИАЛИЗАЦИЯ БИБЛИОТЕК

import asyncio
import json
import os
import re
import socket
import subprocess
import sys
import threading
import time
import cv2
import cv2.aruco as aruco
import logging
from ultralytics import YOLO

logging.basicConfig(level=logging.INFO)

# Инициализация модели YOLO
model = YOLO("yolov8n")
  1. asyncio: Этот модуль предоставляет поддержку асинхронного ввода-вывода, циклов событий, сопрограмм и задач. Он используется для написания однопоточного параллельного кода с использованием сопрограмм, мультиплексирования доступа к вводу-выводу через сокеты и другие ресурсы, запуска сетевых клиентов и серверов, а также других связанных примитивов.

  2. json: Этот модуль предоставляет функции для кодирования объектов Python в формат JSON и декодирования JSON-строк в объекты Python. Он широко используется для обмена данными между приложениями.

  3. os: Этот модуль предоставляет способ использования функциональности, зависящей от операционной системы. Он включает функции для взаимодействия с файловой системой, управления переменными окружения и контроля процессов.

  4. re: Этот модуль предоставляет поддержку регулярных выражений. Он используется для поиска, сопоставления и манипулирования текстом на основе шаблонов.

  5. socket: Этот модуль предоставляет низкоуровневые сетевые интерфейсы, позволяющие осуществлять сетевое взаимодействие через сокеты.

  6. subprocess: Этот модуль позволяет порождать новые процессы, подключаться к их каналам ввода/вывода/ошибок и получать их коды возврата.

  7. sys: Этот модуль предоставляет доступ к некоторым переменным, используемым или поддерживаемым интерпретатором, а также к функциям, которые сильно взаимодействуют с интерпретатором.

  8. threading: Этот модуль строится на низкоуровневых возможностях потоков, чтобы сделать работу с потоками еще проще и более питоничной.

  9. time: Этот модуль предоставляет различные функции, связанные со временем.

  10. cv2: Это библиотека OpenCV, которая используется для задач компьютерного зрения, таких как обработка изображений и видео, обнаружение объектов и отслеживание.

  11. cv2.aruco: Это подмодуль OpenCV, который предоставляет инструменты для работы с маркерами Aruco, часто используемыми в робототехнике и дополненной реальности.

  12. logging: Этот модуль предоставляет гибкую систему логирования для приложений и библиотек.

  13. ultralytics.YOLO: Это, вероятно, модуль из библиотеки Ultralytics, который предоставляет реализацию модели YOLO (You Only Look Once) для обнаружения объектов в реальном времени.

ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_aruco

class Connect_aruco: 
    """
    Класс для установления соединения с сервером и обработки данных ArUco.

    Атрибуты:
    - ports (int): Порт, используемый для соединения.
    - data_aruco (str): Данные, полученные от сервера.
    - host (str): IP-адрес сервера.
    - port (int): Порт сервера.
    - server_socket (socket.socket): Сокет сервера.
    - socket_thread (threading.Thread): Поток для асинхронной задачи.

    Методы:
    - __init__(self, IP, PORT, rtsp): Конструктор класса.
    - stop(self): Останавливает соединение и поток.
    - async_task(self): Асинхронная задача для обработки данных.
    - aruco_detect(self, timeout=5): Асинхронно обрабатывает данные от сервера.
    """
    def __init__(self,IP,PORT,rtsp): 
        """
        Конструктор класса.

        Параметры:
        - IP (str): IP-адрес сервера.
        - PORT (int): Порт сервера.
        - rtsp (str): RTSP адрес потока.

        Описание:
        Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс ArUco.
        """
        self.ports=0 
        self.data_aruco=None 
        self.host = IP 
        self.port = PORT 
        port_data = f'{self.host}:{self.port},{rtsp}' # создаем переменную для работы с аргументами между файлов 
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 
        self.server_socket.bind((self.host, self.port)) 
        self.server_socket.listen(5) 
        self.socket_thread = threading.Thread(target=self.async_task, daemon=True) 
        self.socket_thread.start() 
        sys.argv.append(port_data) # объявляем метод через sys 
        commands = f'python3.11 aruco.py {" ".join(sys.argv[1:])}'  
        processes = subprocess.Popen(commands, shell=True)
        time.sleep(3) 

Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных. Эти параметры передаются при создании объекта класса, чтобы затем получать и обрабатывать данные с сервера.

МЕТОД ОСТАНОВКИ (в классе Connect_aruco)

    def stop(self): 
        """
        Останавливает соединение и поток.

        Описание:
        Завершает процесс ArUco, используя PID, и ожидает завершения потока.
        """
        command = f'netstat -a -n -o | find "{self.ports}"' 
        output = subprocess.check_output(command, shell=True, text=True) 
        pattern = r'ESTABLISHED\s+(\d+)' 
        pids = re.findall(pattern, output) 
        sys='taskkill /F /PID '+str(pids[1]) 
        os.system(sys) 
        self.socket_thread.join() 

Метод stop завершает соединение и остановливает все запущенные процессы и потоки.

МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_aruco)

    def async_task(self): 
        """
        Асинхронная задача для обработки данных.

        Описание:
        Принимает соединения от клиентов и обрабатывает полученные данные.
        """
        try:
            while True: 
                client_socket, addr = self.server_socket.accept() 
                print(f"Connected by {addr}") 
                self.ports=addr[1] 
    
                try: 
                    while True: 
                        data = client_socket.recv(1024).decode('utf-8') 
                        if not data: 
                            break 
                        if not data=='None': 
                            # print(f"Получены данные: {data}") 
                            self.data_aruco=data 
                        else: 
                            # print("Даты нет") 
                            self.data_aruco="" 
                finally: 
                    client_socket.close() 
                    print("Connection closed.") 
        except Exception:
            pass

Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.

МЕТОД ОБНАРУЖЕНИЯ ARUCO (в классе Connect_aruco)

    async def aruco_detect(self,timeout=5):
        """
        Асинхронно обрабатывает данные от сервера.

        Параметры:
        - timeout (int): Время ожидания в секундах.

        Возвращает:
        - str: Данные, полученные от сервера, связанные с маркерами ArUco.
        """ 
        start_time = asyncio.get_event_loop().time() 
        while True: 
            if self.data_aruco[:4] != "null" and self.data_aruco != "":
                return(self.data_aruco)
            else:
                self.data_aruco=""
            await asyncio.sleep(0.1)  # Краткая пауза между итерациями обработки кадров 
 
            if (asyncio.get_event_loop().time() - start_time) > timeout: 
                logging.info("Detection timeout - proceeding with next commands.") 
                break 

Метод aruco_detect обрабатывает данные от сервера, связанные с маркерами ArUco, асинхронно, возвращая их после ожидания.

ИНИЦИАЛИЗАЦИЯ КЛАССА Realsense

class Realsense:
    """
    Класс для работы с сетевыми операциями и получением данных от сервера.

    Атрибуты:
    - buffer_size (int): Размер буфера для приема данных.
    - client_socket (socket.socket): Клиентский сокет для соединения с сервером.

    Методы:
    - __init__(self, IP, PORT): Конструктор класса.
    - receive_data(self, sock): Получение данных от сервера.
    - give_mass(self): Возвращает данные, полученные от сервера.
    """
    def __init__(self,IP,PORT):
        """
        Конструктор класса.

        Параметры:
        - IP (str): IP-адрес сервера.
        - PORT (int): Порт сервера.

        Описание:
        Инициализирует клиентский сокет и устанавливает соединение с сервером.
        """
        host = IP
        port = PORT
        self.buffer_size = 10000

        # Инициализация клиентского сокета
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((host, port))
        self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. Эти параметры передаются при создании объекта класса для получения данных.

МЕТОД ПОЛУЧЕНИЯ ДАННЫХ (в классе Realsense)

    def receive_data(self,sock):
        """
        Получение данных от сервера.

        Параметры:
        - sock (socket.socket): Сокет, через который происходит прием данных.

        Возвращает:
        - str: Данные, полученные от сервера в виде строки.
        """
        sock.sendall(f"{0}".encode('utf-8'))
        data = sock.recv(self.buffer_size).decode('utf-8')
        return data

Метод receive_data принимает данные от сервера через сокет и возвращает их в виде строки.

МЕТОД ВОЗВРАТА ДАННЫХ (в классе Realsense)

    def give_mass(self):
        """
        Возвращает данные, полученные от сервера.

        Возвращает:
        - any: Данные, полученные от сервера, преобразованные из строки в объект Python.
        """
        try:
            # Получение данных от сервера
            data = self.receive_data(self.client_socket)
            return eval(data)
        except Exception as e:
            print(f"Ошибка: {e}")

Метод give_mass получает данные от сервера и возвращает их в виде объекта.

ИНИЦИАЛИЗАЦИЯ КЛАССА Connect_yolo

class Connect_yolo: 
    """
    Класс для установления соединения с YOLO сервером и обработки данных.

    Атрибуты:
    - ports (int): Порт, используемый для соединения.
    - data_yolo (str): Данные, полученные от YOLO сервера.
    - host (str): IP-адрес сервера.
    - port (int): Порт сервера.
    - server_socket (socket.socket): Сокет сервера.
    - socket_thread (threading.Thread): Поток для асинхронной задачи.

    Методы:
    - __init__(self, IP, PORT, rtsp): Конструктор класса.
    - stop(self): Останавливает соединение и поток.
    - async_task(self): Асинхронная задача для обработки данных.
    - return_port(self): Возвращает используемый порт.
    - yolo_detect(self, timeout=5): Асинхронно обрабатывает данные от YOLO сервера.
    """
    def __init__(self,IP,PORT,rtsp): 
        """
        Конструктор класса.

        Параметры:
        - IP (str): IP-адрес сервера.
        - PORT (int): Порт сервера.
        - rtsp (str): RTSP адрес потока.

        Описание:
        Инициализирует сокет сервера, создает поток для асинхронной задачи и запускает процесс YOLO.
        """
        self.ports=0 
        self.data_yolo=None 
        self.host = IP 
        self.port = PORT 
        port_data = f'{self.host}:{self.port},{rtsp}' # создаем переменную для работы с аргументами между файлов 
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 
        self.server_socket.bind((self.host, self.port)) 
        self.server_socket.listen(5) 
        self.socket_thread = threading.Thread(target=self.async_task, daemon=True) 
        self.socket_thread.start() 
        sys.argv.append(port_data) # объявляем метод через sys 
        commands = f'python3.11 yolo.py {" ".join(sys.argv[1:])}'  
        processes = subprocess.Popen(commands, shell=True)
        time.sleep(10) 

Здесь IP и PORT — это параметры, которые используются для установления соединения с сервером. RTSP — это адрес потока, используемый для обработки данных YOLO. Эти параметры передаются при создании объекта класса для получения и обработки данных с сервера.

МЕТОД ОСТАНОВКИ (в классе Connect_yolo)

    def stop(self): 
        """
        Останавливает соединение и поток.

        Описание:
        Завершает процесс YOLO, используя PID, и ожидает завершения потока.
        """
        command = f'netstat -a -n -o | find "{self.ports}"' 
        output = subprocess.check_output(command, shell=True, text=True) 
        pattern = r'ESTABLISHED\s+(\d+)' 
        pids = re.findall(pattern, output) 
        sys='taskkill /F /PID '+str(pids[1]) 
        os.system(sys) 
        self.socket_thread.join() 

Метод stop завершает соединение и останавливает все запущенные процессы и потоки.

МЕТОД АСИНХРОННОЙ ЗАДАЧИ (в классе Connect_yolo)

    def async_task(self): 
        """
        Асинхронная задача для обработки данных.

        Описание:
        Принимает соединения от клиентов и обрабатывает полученные данные.
        """
        try:
            while True: 
                client_socket, addr = self.server_socket.accept() 
                print(f"Connected by {addr}") 
                self.ports=addr[1] 
    
                try: 
                    while True: 
                        data = client_socket.recv(1024).decode('utf-8') 
                        if not data: 
                            break 
                        if not data=='null': 
                            # print(f"Получены данные: {data}") 
                            self.data_yolo=data 
                        else: 
                            # print("Даты нет") 
                            self.data_yolo=None 
                finally: 
                    client_socket.close() 
                    print("Connection closed.") 
        except Exception:
            pass

Метод async_task обрабатывает входящие соединения и принимает данные от клиентов асинхронно.

МЕТОД ОБНАРУЖЕНИЯ YOLO (в классе Connect_yolo)

    async def yolo_detect(self,timeout=5): 
        """
        Асинхронно обрабатывает данные от YOLO сервера.

        Параметры:
        - timeout (int): Время ожидания в секундах.

        Возвращает:
        - tuple: Кортеж с классами и ограничивающими рамками, обнаруженными YOLO.
        """
        start_time = asyncio.get_event_loop().time() 
        while True: 
            if not self.data_yolo==None: 
                data = json.loads(self.data_yolo) 
                # Извлечение массивов из словаря 
                boxes = data['boxes'] 
                config = data['conf'] 
                classes = data['class'] 
                return (classes, boxes) 
            await asyncio.sleep(0.1)  # Краткая пауза между итерациями обработки кадров 
 
            if (asyncio.get_event_loop().time() - start_time) > timeout: 
                logging.info("Detection timeout - proceeding with next commands.") 
                break 

Метод yolo_detect обрабатывает данные от YOLO сервера асинхронно, возвращая их после ожидания.

Процесс подготовки кода:

  1. Создайте файл и назовите его video_processing.py

Файл video_processing.py в окне иерархии
  1. Поэтапно копируйте код в файл

  1. Скачайте недостающие библиотеки: в данном случае не предустановлены следующие библиотеки: cv2 и ultralytics. Если эти библиотеки у вас не установлены, выполните следующие действия:

    1. Откройте терминал VS Code (Terminal -> New Termial или комбинацией Ctrl + Shift + `)

    2. В терминале поочередно введите и запустите следующие команды: pip install ultralytics; pip install opencv-python

Last updated