Движение с использованием жестами
# Импорт библиотек
import asyncio
import cv2
import numpy as np
import mediapipe as mp
import socketio
import aiofiles
import time
import threading
from collections import deque
server = 'http://192.168.10.115:3006/' # адрес вашего сервера
sio = socketio.AsyncClient()
# Замените 'rtsp://username:password@ip_address:port/stream' на ваш RTSP поток
rtsp_url = 'rtsp://username:password@ip_address:port/stream'
cap = cv2.VideoCapture(rtsp_url)
# Установка разрешения видеопотока
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
mpHands = mp.solutions.hands
hands = mpHands.Hands(False)
npDraw = mp.solutions.drawing_utils
# Очередь для хранения координат пальцев
finger_positions = deque(maxlen=5) # Можно настроить длину очереди
# Переменные для управления жестами
last_gesture_time = 0
gesture_cooldown = 1.5 # Время ожидания между жестами в секундах
async def write_report(message):
filename = 'Отчет_Фамилия_Имя.txt'
async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
await file.write(message + "\n")
@sio.event
async def connect():
await write_report('Подключено к серверу Socket.IO')
print("Подключено к серверу Socket.IO")
async def control_lights(on: bool):
lights_command = {
'id': 1,
'type': 'lights',
'value': on
}
await sio.emit('command', lights_command, namespace='/vehicles')
action = "включены" if on else "выключены"
message = f"Команда на включение света {action} отправлена."
await write_report(message)
async def perform_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': 0.15,
'left': 0.15
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def backward_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': -0.25,
'left': -0.25
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def stop_movement():
await sio.emit('command', {
'id': 1,
'type': "move",
'value': {
'x': 0,
'y': 0,
'sensitivity': 1,
},
}, namespace='/vehicles')
stop_message = "Команда остановки движения отправлена."
await write_report(stop_message)
print(stop_message)
def apply_median_filter(landmarks):
"""Применяет медианный фильтр к координатам рук."""
# Преобразуем landmarks в массив numpy для удобства
landmarks_array = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])
if len(finger_positions) == finger_positions.maxlen:
filtered_landmarks = np.median(finger_positions, axis=0)
else:
filtered_landmarks = landmarks_array
finger_positions.append(landmarks_array)
return filtered_landmarks
def are_fingers_raised_for_activation(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
return (
index_finger_tip[1] < index_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1]
)
def is_index_finger_raised(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
return index_finger_tip[1] < index_finger_base[1]
def is_thumb_raised(landmarks):
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return thumb_tip[1] < thumb_base[1]
def are_fingers_raised_for_backward(landmarks):
"""Проверяет, подняты ли мизинец, безымянный и средний пальцы, а указательный и большой опущены."""
pinky_tip = landmarks[mpHands.HandLandmark.PINKY_TIP.value]
pinky_base = landmarks[mpHands.HandLandmark.PINKY_MCP.value]
ring_finger_tip = landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value]
ring_finger_base = landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return (
pinky_tip[1] < pinky_base[1] and
ring_finger_tip[1] < ring_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1] and
index_finger_tip[1] > index_finger_base[1] and # Указательный палец опущен
thumb_tip[1] > thumb_base[1] # Большой палец опущен
)
def is_hand_open(landmarks):
"""Проверяет, подняты ли все пальцы для жеста раскрытой ладони."""
fingers_raised = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] < landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] < landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_raised = np.array(fingers_raised, dtype=bool)
return np.all(fingers_raised)
def is_hand_closed(landmarks):
"""Проверяет, закрыта ли ладонь (все пальцы опущены)."""
fingers_lowered = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] > landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] > landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_lowered = np.array(fingers_lowered, dtype=bool)
return np.all(fingers_lowered)
def process_video_stream(loop):
global cap
global last_gesture_time # добавляем глобальную переменную
pTime = time.time()
lights_on = False
activation_done = False
activation_start_time = None
thumb_raised_message_shown = False
backward_movement_active = False
while True:
success, img = cap.read()
if not success:
print("Не удалось получить кадр из потока.")
break
img = cv2.flip(img, 1)
imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
results = hands.process(imgRGB)
if results.multi_hand_landmarks:
for handLms in results.multi_hand_landmarks:
npDraw.draw_landmarks(img, handLms, mpHands.HAND_CONNECTIONS)
# Применение медианного фильтра к координатам рук
filtered_landmarks = apply_median_filter(handLms.landmark)
current_time = time.time()
if not activation_done and are_fingers_raised_for_activation(filtered_landmarks):
if activation_start_time is None:
activation_start_time = time.time()
elif time.time() - activation_start_time >= 5:
activation_done = True
print("Жест активации выполнен. Теперь доступны другие жесты.")
else:
activation_start_time = None
if activation_done:
# Проверка на жест раскрытой ладони
if is_hand_open(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(backward_movement(30), loop)
last_gesture_time = current_time # обновляем время последнего жеста
print("Жест раскрытой ладони обнаружен! Игнорирование других команд.")
continue # Игнорируем другие команды
if is_index_finger_raised(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
if not lights_on:
asyncio.run_coroutine_threadsafe(control_lights(True), loop)
lights_on = True
last_gesture_time = current_time # обновляем время последнего жеста
print("Фары включены.")
else:
if lights_on and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(control_lights(False), loop)
lights_on = False
last_gesture_time = current_time # обновляем время последнего жеста
print("Фары выключены.")
# Проверка для движения назад
if are_fingers_raised_for_backward(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
if not backward_movement_active:
asyncio.run_coroutine_threadsafe(perform_movement(30), loop)
backward_movement_active = True
last_gesture_time = current_time # обновляем время последнего жеста
print("Мизинец, безымянный и средний пальцы подняты! Движение назад начато.")
else:
if backward_movement_active:
asyncio.run_coroutine_threadsafe(stop_movement(), loop)
backward_movement_active = False
print("Движение назад остановлено.")
# Проверка на жест закрытой ладони
if is_hand_closed(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(stop_movement(), loop)
last_gesture_time = current_time # обновляем время последнего жеста
print("Жест закрытой ладони обнаружен! Остановка движения.")
cTime = time.time()
fps = 1 / (cTime - pTime) if pTime > 0 else 0
pTime = cTime
cv2.putText(img, str(int(fps)), (10, 30), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
cv2.imshow('RTSP Stream', img)
if cv2.waitKey(20) == 27: # выход по ESC
break
cap.release()
cv2.destroyAllWindows()
async def sensor(loop):
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'])
# Основной цикл для обработки событий Socket.IO
while True:
await asyncio.sleep(1) # Держим основной поток активным
def main():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Запуск потока для обработки видеопотока
video_thread = threading.Thread(target=process_video_stream, args=(loop,))
video_thread.start()
# Запуск асинхронной функции для подключения к серверу
loop.run_until_complete(sensor(loop))
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Объяснение кода
Импорт библиотек
import asyncio
import cv2
import numpy as np
import mediapipe as mp
import socketio
import aiofiles
import time
import threading
from collections import deque
asyncio:
Это стандартная библиотека Python, предназначенная для написания асинхронного кода. Она предоставляет инструменты для работы с асинхронными операциями, такими как корутины, события и циклы событий. Это особенно полезно для ввода-вывода (I/O) операций, таких как сетевые запросы или работа с файлами, позволяя программе продолжать выполнение, пока ожидаются результаты.
time:
Стандартная библиотека Python, которая предоставляет функции для работы со временем. Она позволяет выполнять такие операции, как получение текущего времени, задержка выполнения программы (например, с помощью
time.sleep()
), и преобразование времени в различные форматы.
threading:
Это стандартная библиотека Python для создания и управления потоками. Она позволяет выполнять несколько операций одновременно (параллельно), что может быть полезно для повышения производительности программ, особенно в задачах, связанных с I/O или длительными вычислениями.
collections:
Стандартная библиотека Python, которая предоставляет специализированные контейнеры данных, такие как
deque
(двусторонняя очередь),Counter
(для подсчета хешируемых объектов),defaultdict
(словарь с значениями по умолчанию) и другие. Эти структуры данных могут быть более эффективными или удобными, чем стандартные списки или словари.
cv2 (OpenCV):
OpenCV (Open Source Computer Vision Library) - это библиотека для компьютерного зрения и обработки изображений. Она предоставляет множество функций для работы с изображениями и видео, включая распознавание объектов, фильтрацию, трансформации и анализ изображений.
numpy:
NumPy - это библиотека для работы с многомерными массивами и матрицами, а также для выполнения математических операций над ними. Она является основой для многих других библиотек в Python для научных вычислений и анализа данных, таких как SciPy и Pandas.
mediapipe:
MediaPipe - это библиотека для построения мультимодальных (видео, аудио, текст) приложений, в основном для задач компьютерного зрения и машинного обучения. Она предоставляет готовые решения для распознавания лиц, жестов, поз и других задач, что делает ее удобной для разработки приложений с использованием технологий машинного обучения.
socketio:
Socket.IO - это библиотека для работы с веб-сокетами, которая позволяет устанавливать двунаправленное взаимодействие между клиентом и сервером в реальном времени. Она широко используется для создания чат-приложений, уведомлений и других приложений, требующих мгновенной передачи данных.
aiofiles:
Aiofiles - это библиотека для асинхронного чтения и записи файлов. Она позволяет выполнять операции с файлами без блокировки основного потока, что делает ее полезной в асинхронных приложениях, где важна производительность и отзывчивость.
Из этих библиотек не предустановлены в python: opencv, mediapipe, aiofiles и socketio. Подробнее об установке в разделе "Библиотеки"
Инициализация переменных и инициализация сервера
server = 'http://192.168.10.115:3006/' # адрес вашего сервера
sio = socketio.AsyncClient()
rtsp_url = 'rtsp://username:password@ip_address:port/stream'
cap = cv2.VideoCapture(rtsp_url)
# Установка разрешения видеопотока
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
mpHands = mp.solutions.hands
hands = mpHands.Hands(False)
npDraw = mp.solutions.drawing_utils
# Очередь для хранения координат пальцев
finger_positions = deque(maxlen=5) # Можно настроить длину очереди
# Переменные для управления жестами
last_gesture_time = 0
gesture_cooldown = 1.5 # Время ожидания между жестами в секундах
1. Подключение к серверу
server = 'http://192.168.10.115:3006/' # адрес вашего сервера
Здесь задается адрес сервера, с которым будет взаимодействовать клиент.
2. Создание асинхронного клиента Socket.IO
sio = socketio.AsyncClient()
Создается экземпляр асинхронного клиента Socket.IO. Этот клиент будет использоваться для установления соединения с сервером и отправки/получения сообщений в реальном времени.
3. Настройка видеопотока
rtsp_url = 'rtsp://username:password@ip_address:port/stream'
cap = cv2.VideoCapture(rtsp_url)
Создается объект
VideoCapture
из библиотеки OpenCV, который используется для захвата видео с камеры. В данном случае используется rtsp-поток камеры вашего ровера.
4. Установка разрешения видеопотока
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
Эти строки устанавливают разрешение видеопотока на 640x480 пикселей. Это может помочь снизить нагрузку на процессор и улучшить производительность, особенно если не требуется высокое разрешение.
5. Инициализация MediaPipe для распознавания рук
mpHands = mp.solutions.hands
hands = mpHands.Hands(False)
npDraw = mp.solutions.drawing_utils
Здесь используется библиотека MediaPipe для распознавания рук.
mpHands
инициализирует решение для распознавания рук.hands = mpHands.Hands(False)
создает объект для распознавания рук, гдеFalse
указывает, что не нужно использовать информацию о вероятности (это может ускорить процесс).npDraw = mp.solutions.drawing_utils
предоставляет инструменты для рисования на изображениях, например, для отображения распознанных ключевых точек.
6. Очередь для хранения координат пальцев
finger_positions = deque(maxlen=5) # Можно настроить длину очереди
Создается двусторонняя очередь (deque) для хранения координат пальцев. Параметр
maxlen=5
ограничивает длину очереди до 5 элементов. Это может быть полезно для хранения последних координат пальцев, чтобы отслеживать их движение или выполнять анализ.
7. Переменные для управления жестами
last_gesture_time = 0
gesture_cooldown = 1.5 # Время ожидания между жестами в секундах
last_gesture_time
хранит время последнего зарегистрированного жеста. Эта переменная может использоваться для ограничения частоты распознавания жестов.gesture_cooldown = 1.5
определяет время ожидания между распознаванием жестов в секундах. Это предотвращает слишком частое срабатывание одного и того же жеста, что может быть полезно для улучшения пользовательского опыта.
Команды движения и записи отчета
async def write_report(message):
filename = 'Отчет_Фамилия_Имя.txt'
async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
await file.write(message + "\n")
@sio.event
async def connect():
await write_report('Подключено к серверу Socket.IO')
print("Подключено к серверу Socket.IO")
async def control_lights(on: bool):
lights_command = {
'id': 1,
'type': 'lights',
'value': on
}
await sio.emit('command', lights_command, namespace='/vehicles')
action = "включены" if on else "выключены"
message = f"Команда на включение света {action} отправлена."
await write_report(message)
async def perform_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': 0.15,
'left': 0.15
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def backward_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': -0.25,
'left': -0.25
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def stop_movement():
await sio.emit('command', {
'id': 1,
'type': "move",
'value': {
'x': 0,
'y': 0,
'sensitivity': 1,
},
}, namespace='/vehicles')
stop_message = "Команда остановки движения отправлена."
await write_report(stop_message)
print(stop_message)
Функция write_report отвечает за запись проделанных ровером команд.
Функция connect - это обработчик события подключения к серверу.
Функция control_lights вызывается асинхронно и управляет включением и выключением фар (где True - это включить, а False - это выключить).
Функция perform_movement вызывается асинхронно и управляет движением ровера вперед (значение задается в сантиметрах (пример: await perform_movement(10)
- проехать вперед на 10 сантиметров)).
Функция backward_movement вызывается асинхронно и управляет движением назад (значение задается в сантиметрах (пример: await backward_movement(10) - проехать назад на 10 сантиметров)).
Функция stop_movement вызывается асинхронно и управляет остановкой ровера (пример: await stop_movement() - остановка ровера).
Функции фильтрации и инициализации медиан пальцев
1. apply_median_filter(landmarks)
apply_median_filter(landmarks)
def apply_median_filter(landmarks):
"""Применяет медианный фильтр к координатам рук."""
# Преобразуем landmarks в массив numpy для удобства
landmarks_array = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])
if len(finger_positions) == finger_positions.maxlen:
filtered_landmarks = np.median(finger_positions, axis=0)
else:
filtered_landmarks = landmarks_array
finger_positions.append(landmarks_array)
return filtered_landmarks
Описание:
Цель: Эта функция применяет медианный фильтр к координатам рук, чтобы сгладить данные и устранить шум.
Входные данные:
landmarks
— это список координат (объектов), представляющих положение ключевых точек на руке.Преобразование: Координаты преобразуются в массив NumPy для удобного вычисления.
Фильтрация: Если очередь
finger_positions
заполнена до максимальной длины, функция вычисляет медиану по всем сохраненным позициям пальцев, чтобы получить сглаженные координаты. В противном случае возвращаются текущие координаты.Сохранение: Текущие координаты добавляются в очередь
finger_positions
для последующего использования.Выход: Возвращает отфильтрованные координаты рук.
2. are_fingers_raised_for_activation(landmarks)
are_fingers_raised_for_activation(landmarks)
def are_fingers_raised_for_activation(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
return (
index_finger_tip[1] < index_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1]
)
Описание:
Цель: Определяет, подняты ли указательный и средний пальцы для активации какого-либо действия (например, жеста).
Входные данные:
landmarks
— это список координат, представляющих положение ключевых точек на руке.Логика: Функция проверяет, находятся ли кончики указательного и среднего пальцев выше их оснований по оси Y. Если оба пальца подняты, функция возвращает
True
, иначе —False
.Выход: Возвращает логическое значение, указывающее на состояние пальцев.
3. is_index_finger_raised(landmarks)
is_index_finger_raised(landmarks)
def is_index_finger_raised(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
return index_finger_tip[1] < index_finger_base[1]
Описание:
Цель: Проверяет, поднят ли указательный палец.
Входные данные:
landmarks
— это список координат, представляющих положение ключевых точек на руке.Логика: Функция сравнивает положение кончика указательного пальца с его основанием. Если кончик выше основания по оси Y, возвращается
True
, иначе —False
.Выход: Возвращает логическое значение, указывающее на состояние указательного пальца.
4. is_thumb_raised(landmarks)
is_thumb_raised(landmarks)
def is_thumb_raised(landmarks):
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return thumb_tip[1] < thumb_base[1]
Описание:
Цель: Проверяет, поднят ли большой палец.
Входные данные:
landmarks
— это список координат, представляющих положение ключевых точек на руке.Логика: Функция сравнивает положение кончика большого пальца с его основанием. Если кончик выше основания по оси Y, возвращается
True
, иначе —False
.Выход: Возвращает логическое значение, указывающее на состояние большого пальца.
Функции проверок положения пальцев и открытой/закрытой ладони
1. are_fingers_raised_for_backward(landmarks)
are_fingers_raised_for_backward(landmarks)
def are_fingers_raised_for_backward(landmarks):
"""Проверяет, подняты ли мизинец, безымянный и средний пальцы, а указательный и большой опущены."""
pinky_tip = landmarks[mpHands.HandLandmark.PINKY_TIP.value]
pinky_base = landmarks[mpHands.HandLandmark.PINKY_MCP.value]
ring_finger_tip = landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value]
ring_finger_base = landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return (
pinky_tip[1] < pinky_base[1] and
ring_finger_tip[1] < ring_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1] and
index_finger_tip[1] > index_finger_base[1] and # Указательный палец опущен
thumb_tip[1] > thumb_base[1] # Большой палец опущен
)
Описание:
Цель: Проверяет, подняты ли мизинец, безымянный и средний пальцы, в то время как указательный и большой пальцы опущены. Это может использоваться для распознавания жеста, указывающего на движение назад или отмену.
Входные данные:
landmarks
— список координат ключевых точек на руке.Логика: Функция проверяет положение кончиков и оснований пальцев по оси Y. Если мизинец, безымянный и средний пальцы подняты (кончики выше оснований), а указательный и большой пальцы опущены (кончики ниже оснований), функция возвращает
True
, иначе —False
.Выход: Возвращает логическое значение, указывающее на состояние пальцев.
2. is_hand_open(landmarks)
is_hand_open(landmarks)
def is_hand_open(landmarks):
"""Проверяет, подняты ли все пальцы для жеста раскрытой ладони."""
fingers_raised = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] < landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] < landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_raised = np.array(fingers_raised, dtype=bool)
return np.all(fingers_raised)
Описание:
Цель: Проверяет, подняты ли все пальцы для жеста раскрытой ладони (открытая рука).
Входные данные:
landmarks
— список координат ключевых точек на руке.Логика: Функция создает список, который содержит логические значения для каждого пальца, указывая, поднят ли он (кончик выше основания). Затем преобразует этот список в массив NumPy и проверяет, все ли элементы истинны.
Выход: Возвращает
True
, если все пальцы подняты, иначе —False
.
3. is_hand_closed(landmarks)
is_hand_closed(landmarks)
def is_hand_closed(landmarks):
"""Проверяет, закрыта ли ладонь (все пальцы опущены)."""
fingers_lowered = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] > landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] > landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_lowered = np.array(fingers_lowered, dtype=bool)
return np.all(fingers_lowered)
Описание:
Цель: Проверяет, закрыта ли рука (все пальцы опущены).
Входные данные:
landmarks
— список координат ключевых точек на руке.Логика: Функция создает список, который содержит логические значения для каждого пальца, указывая, опущен ли он (кончик ниже основания). Затем преобразует этот список в массив NumPy и проверяет, все ли элементы истинны.
Выход: Возвращает
True
, если все пальцы опущены, иначе —False
.
Основная функция (process_video_stream)
Глобальные переменные:
cap
: Объект для захвата видео.last_gesture_time
: Время последнего распознанного жеста.
Инициализация переменных:
pTime
: Время для расчета FPS (кадров в секунду).lights_on
,activation_done
,activation_start_time
,thumb_raised_message_shown
,backward_movement_active
: Логические переменные для отслеживания состояния жестов и управления действиями.
Основной цикл обработки видео:
Чтение кадра: Использует
cap.read()
для получения текущего кадра. Если не удается получить кадр, выводится сообщение об ошибке.Обработка изображения: Кадр переворачивается и преобразуется в формат RGB для обработки с помощью MediaPipe.
Распознавание жестов: Если обнаружены ключевые точки рук, обрабатываются каждое из них:
Рисуются соединения между ключевыми точками.
Применяется медианный фильтр для сглаживания координат рук.
Проверяется, выполнен ли жест активации, и если да, то активируются другие жесты.
Проверяются различные жесты (раскрытая ладонь, поднятый указательный палец, движение назад, закрытая ладонь) и выполняются соответствующие действия (включение/выключение света, движение назад, остановка движения).
Расчет FPS: Вычисляется количество кадров в секунду и отображается на изображении.
Отображение видео: Используется
cv2.imshow
для показа обработанного видео. Выход из цикла осуществляется по нажатию клавиши ESC.Освобождение ресурсов: После завершения обработки видео поток закрывается, и окна OpenCV закрываются.
Функции запуска программы и мультипоточности
Функция sensor
sensor
async def sensor(loop):
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'])
# Основной цикл для обработки событий Socket.IO
while True:
await asyncio.sleep(1) # Держим основной поток активным
Описание:
Асинхронная функция:
sensor
объявлена как асинхронная функция с помощью ключевого словаasync
. Это позволяет использоватьawait
для асинхронных операций внутри функции.Подключение к серверу:
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'])
— здесь происходит подключение к серверу через Socket.IO.server
— это переменная (предположительно строка), содержащая адрес сервера, к которому осуществляется подключение.wait_timeout=20
— время ожидания подключения, после которого будет выброшено исключение, если подключение не удалось.namespaces=['/vehicles']
— указывает пространство имен, к которому будет подключаться клиент.
Основной цикл:
while True:
— бесконечный цикл, который поддерживает активность функции.await asyncio.sleep(1)
— приостанавливает выполнение функции на 1 секунду, чтобы не загружать процессор. Это позволяет основному потоку оставаться активным и обрабатывать другие асинхронные задачи.
Функция main
main
def main():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Запуск потока для обработки видеопотока
video_thread = threading.Thread(target=process_video_stream, args=(loop,))
video_thread.start()
# Запуск асинхронной функции для подключения к серверу
loop.run_until_complete(sensor(loop))
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Описание:
Создание нового цикла событий:
loop = asyncio.new_event_loop()
— создается новый цикл событий, который будет использоваться для выполнения асинхронных задач.asyncio.set_event_loop(loop)
— устанавливает созданный цикл событий как текущий для текущего потока.
Запуск потока для обработки видеопотока:
video_thread = threading.Thread(target=process_video_stream, args=(loop,))
— создается новый поток, который будет выполнять функциюprocess_video_stream
, передавая в нее цикл событийloop
.video_thread.start()
— запускает созданный поток.
Запуск асинхронной функции:
loop.run_until_complete(sensor(loop))
— запускает выполнение асинхронной функцииsensor
, пока она не завершится. Это блокирует выполнение основного потока до тех пор, покаsensor
не завершит свою работу (в данном случае, это будет бесконечный цикл, так что основной поток будет заблокирован на этом этапе).
Обработка исключений:
except Exception as e:
— блок для обработки любых исключений, которые могут возникнуть в процессе выполнения.print(f"Ошибка: {e}")
— выводит сообщение об ошибке в консоль, если что-то пойдет не так.
Запуск программы:
if __name__ == "__main__":
— проверяет, является ли данный файл основным модулем, и если да, вызывает функциюmain()
для начала выполнения программы.
Принцип работы
Список жестов:
Активация жестов:

Включение фар:


ПРИМЕЧАНИЕ: после выключения фар камера видит жест остановки, подробнее ниже.
Жест движения вперед:

При поднятии трех пальцев ровер поедет вперед на 30 сантиметров и остановится, и в случае, если проедет - начнет ехать по новой, увидев этот жест снова
Жест движения назад:

Жест остановки ровера:

Last updated