Движение с использованием жестами
# Импорт библиотек
import asyncio
import cv2
import numpy as np
import mediapipe as mp
import socketio
import aiofiles
import time
import threading
from collections import deque
server = 'http://192.168.10.115:3006/' # адрес вашего сервера
sio = socketio.AsyncClient()
# Замените 'rtsp://username:password@ip_address:port/stream' на ваш RTSP поток
rtsp_url = 'rtsp://username:password@ip_address:port/stream'
cap = cv2.VideoCapture(rtsp_url)
# Установка разрешения видеопотока
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
mpHands = mp.solutions.hands
hands = mpHands.Hands(False)
npDraw = mp.solutions.drawing_utils
# Очередь для хранения координат пальцев
finger_positions = deque(maxlen=5) # Можно настроить длину очереди
# Переменные для управления жестами
last_gesture_time = 0
gesture_cooldown = 1.5 # Время ожидания между жестами в секундах
async def write_report(message):
filename = 'Отчет_Фамилия_Имя.txt'
async with aiofiles.open(filename, mode='a', encoding='utf-8') as file:
await file.write(message + "\n")
@sio.event
async def connect():
await write_report('Подключено к серверу Socket.IO')
print("Подключено к серверу Socket.IO")
async def control_lights(on: bool):
lights_command = {
'id': 1,
'type': 'lights',
'value': on
}
await sio.emit('command', lights_command, namespace='/vehicles')
action = "включены" if on else "выключены"
message = f"Команда на включение света {action} отправлена."
await write_report(message)
async def perform_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': 0.15,
'left': 0.15
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def backward_movement(distance_cm):
max_speed = 0.5 # Максимальная скорость м/с
distance_m = distance_cm / 100.0 # Переводим сантиметры в метры
time_to_travel = distance_m / max_speed
start_time = time.time()
while (time.time() - start_time) < time_to_travel:
await sio.emit('command', {
'id': 1,
'type': 'moveDirectly',
'value': {
'right': -0.25,
'left': -0.25
}
}, namespace='/vehicles')
await asyncio.sleep(0.2)
completion_message = f"Движение завершено: Пройденное расстояние {distance_m} м."
await write_report(completion_message)
print(completion_message)
async def stop_movement():
await sio.emit('command', {
'id': 1,
'type': "move",
'value': {
'x': 0,
'y': 0,
'sensitivity': 1,
},
}, namespace='/vehicles')
stop_message = "Команда остановки движения отправлена."
await write_report(stop_message)
print(stop_message)
def apply_median_filter(landmarks):
"""Применяет медианный фильтр к координатам рук."""
# Преобразуем landmarks в массив numpy для удобства
landmarks_array = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])
if len(finger_positions) == finger_positions.maxlen:
filtered_landmarks = np.median(finger_positions, axis=0)
else:
filtered_landmarks = landmarks_array
finger_positions.append(landmarks_array)
return filtered_landmarks
def are_fingers_raised_for_activation(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
return (
index_finger_tip[1] < index_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1]
)
def is_index_finger_raised(landmarks):
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
return index_finger_tip[1] < index_finger_base[1]
def is_thumb_raised(landmarks):
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return thumb_tip[1] < thumb_base[1]
def are_fingers_raised_for_backward(landmarks):
"""Проверяет, подняты ли мизинец, безымянный и средний пальцы, а указательный и большой опущены."""
pinky_tip = landmarks[mpHands.HandLandmark.PINKY_TIP.value]
pinky_base = landmarks[mpHands.HandLandmark.PINKY_MCP.value]
ring_finger_tip = landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value]
ring_finger_base = landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value]
middle_finger_tip = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value]
middle_finger_base = landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value]
index_finger_tip = landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value]
index_finger_base = landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value]
thumb_tip = landmarks[mpHands.HandLandmark.THUMB_TIP.value]
thumb_base = landmarks[mpHands.HandLandmark.THUMB_MCP.value]
return (
pinky_tip[1] < pinky_base[1] and
ring_finger_tip[1] < ring_finger_base[1] and
middle_finger_tip[1] < middle_finger_base[1] and
index_finger_tip[1] > index_finger_base[1] and # Указательный палец опущен
thumb_tip[1] > thumb_base[1] # Большой палец опущен
)
def is_hand_open(landmarks):
"""Проверяет, подняты ли все пальцы для жеста раскрытой ладони."""
fingers_raised = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] < landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] < landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] < landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_raised = np.array(fingers_raised, dtype=bool)
return np.all(fingers_raised)
def is_hand_closed(landmarks):
"""Проверяет, закрыта ли ладонь (все пальцы опущены)."""
fingers_lowered = [
landmarks[mpHands.HandLandmark.THUMB_TIP.value][1] > landmarks[mpHands.HandLandmark.THUMB_MCP.value][1],
landmarks[mpHands.HandLandmark.INDEX_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.INDEX_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.MIDDLE_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.MIDDLE_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.RING_FINGER_TIP.value][1] > landmarks[mpHands.HandLandmark.RING_FINGER_MCP.value][1],
landmarks[mpHands.HandLandmark.PINKY_TIP.value][1] > landmarks[mpHands.HandLandmark.PINKY_MCP.value][1],
]
# Преобразуем в массив numpy и проверяем, что все элементы являются булевыми
fingers_lowered = np.array(fingers_lowered, dtype=bool)
return np.all(fingers_lowered)
def process_video_stream(loop):
global cap
global last_gesture_time # добавляем глобальную переменную
pTime = time.time()
lights_on = False
activation_done = False
activation_start_time = None
thumb_raised_message_shown = False
backward_movement_active = False
while True:
success, img = cap.read()
if not success:
print("Не удалось получить кадр из потока.")
break
img = cv2.flip(img, 1)
imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
results = hands.process(imgRGB)
if results.multi_hand_landmarks:
for handLms in results.multi_hand_landmarks:
npDraw.draw_landmarks(img, handLms, mpHands.HAND_CONNECTIONS)
# Применение медианного фильтра к координатам рук
filtered_landmarks = apply_median_filter(handLms.landmark)
current_time = time.time()
if not activation_done and are_fingers_raised_for_activation(filtered_landmarks):
if activation_start_time is None:
activation_start_time = time.time()
elif time.time() - activation_start_time >= 5:
activation_done = True
print("Жест активации выполнен. Теперь доступны другие жесты.")
else:
activation_start_time = None
if activation_done:
# Проверка на жест раскрытой ладони
if is_hand_open(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(backward_movement(30), loop)
last_gesture_time = current_time # обновляем время последнего жеста
print("Жест раскрытой ладони обнаружен! Игнорирование других команд.")
continue # Игнорируем другие команды
if is_index_finger_raised(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
if not lights_on:
asyncio.run_coroutine_threadsafe(control_lights(True), loop)
lights_on = True
last_gesture_time = current_time # обновляем время последнего жеста
print("Фары включены.")
else:
if lights_on and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(control_lights(False), loop)
lights_on = False
last_gesture_time = current_time # обновляем время последнего жеста
print("Фары выключены.")
# Проверка для движения назад
if are_fingers_raised_for_backward(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
if not backward_movement_active:
asyncio.run_coroutine_threadsafe(perform_movement(30), loop)
backward_movement_active = True
last_gesture_time = current_time # обновляем время последнего жеста
print("Мизинец, безымянный и средний пальцы подняты! Движение назад начато.")
else:
if backward_movement_active:
asyncio.run_coroutine_threadsafe(stop_movement(), loop)
backward_movement_active = False
print("Движение назад остановлено.")
# Проверка на жест закрытой ладони
if is_hand_closed(filtered_landmarks) and (current_time - last_gesture_time >= gesture_cooldown):
asyncio.run_coroutine_threadsafe(stop_movement(), loop)
last_gesture_time = current_time # обновляем время последнего жеста
print("Жест закрытой ладони обнаружен! Остановка движения.")
cTime = time.time()
fps = 1 / (cTime - pTime) if pTime > 0 else 0
pTime = cTime
cv2.putText(img, str(int(fps)), (10, 30), cv2.FONT_HERSHEY_PLAIN, 2, (255, 0, 0), 2)
cv2.imshow('RTSP Stream', img)
if cv2.waitKey(20) == 27: # выход по ESC
break
cap.release()
cv2.destroyAllWindows()
async def sensor(loop):
await sio.connect(server, wait_timeout=20, namespaces=['/vehicles'])
# Основной цикл для обработки событий Socket.IO
while True:
await asyncio.sleep(1) # Держим основной поток активным
def main():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Запуск потока для обработки видеопотока
video_thread = threading.Thread(target=process_video_stream, args=(loop,))
video_thread.start()
# Запуск асинхронной функции для подключения к серверу
loop.run_until_complete(sensor(loop))
except Exception as e:
print(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Объяснение кода
1. Подключение к серверу
2. Создание асинхронного клиента Socket.IO
3. Настройка видеопотока
4. Установка разрешения видеопотока
5. Инициализация MediaPipe для распознавания рук
6. Очередь для хранения координат пальцев
7. Переменные для управления жестами
Команды движения и записи отчета
Функции фильтрации и инициализации медиан пальцев
1. apply_median_filter(landmarks)
apply_median_filter(landmarks)2. are_fingers_raised_for_activation(landmarks)
are_fingers_raised_for_activation(landmarks)3. is_index_finger_raised(landmarks)
is_index_finger_raised(landmarks)4. is_thumb_raised(landmarks)
is_thumb_raised(landmarks)Функции проверок положения пальцев и открытой/закрытой ладони
1. are_fingers_raised_for_backward(landmarks)
are_fingers_raised_for_backward(landmarks)2. is_hand_open(landmarks)
is_hand_open(landmarks)3. is_hand_closed(landmarks)
is_hand_closed(landmarks)Основная функция (process_video_stream)
Функции запуска программы и мультипоточности
Функция sensor
sensorФункция main
mainПринцип работы






Последнее обновление