Основные библиотеки Python, используемые в ровере

На этой странице представлены основные использующиеся библиотеке в коде под ровер "Контакт". Больше библиотек прописано в описаниях кодов. Рекомендуется для проверки кодов на этой странице создать отдельную папку.

Папка для дальнейшего хранения файлов с примерами кодов

ПРИМЕЧАНИЕ: Библиотеки, которые вы используете в VS Code, необходимо так же докачивать на Orange Pi (подробнее об установке Python и pip смотрите в разделе "Работа с Orange Pi" в статьях "Установка Python 3.10" и "Установка pip"). Команды установки библиотек в VS Code и в Orange Pi не отличаются.

asyncio:

  • Описание: asyncio используется в коде ровера для написания асинхронных программ, используя корутины, задачи и событийные циклы.

  • Особенности:

    • Позволяет выполнять несколько задач ровера одновременно, не блокируя основной поток выполнения.

    • Используется для асинхронного ввода-вывода, что повышает производительность при работе с сетью.

  • Роль в проекте:

    • Асинхронная обработка соединений с клиентами через сокеты.

    • Асинхронное чтение файлов с помощью aiofiles.

    • Управление событиями и задачами в рамках основного цикла событий.

  • Пример кода с использование библиотеки asyncio:

import asyncio


async def move():
    print("Ровер начинает движение.")
    await asyncio.sleep(2)
    print("Ровер продолжает движение.")
    await asyncio.sleep(2)
    print("Ровер останавливается.")

async def turn(direction):
    print(f"Ровер начинает поворот {direction}.")
    await asyncio.sleep(2)
    print(f"Ровер завершает поворот {direction}.")

async def blink_lights():
    for _ in range(5):
        print("Фары мигают.")
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(
        move(),
        turn("налево"),
        blink_lights()
    )

# Запуск событийного цикла
asyncio.run(main())

Описание кода:

  1. Функция move Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения с 2-секундными интервалами.

  2. Функция turn Имитирует поворот ровера в заданном направлении (налево или направо), выводя сообщения о начале и завершении поворота с 2-секундными интервалами.

  3. Функция blink_lights Имитирует мигание фар ровера, выводя сообщение о мигании каждую секунду в течение 5 секунд.

  4. Функция main: Функция main использует asyncio.gather для одновременного запуска всех трех асинхронных задач (move(), turn("налево"), blink_lights()).Функция main использует asyncio.gather для одновременного запуска всех трех асинхронных задач (move(), turn("налево"), blink_lights()).

  5. asyncio.run(main()) - запуск кода

Итоговый вывод:

Ровер начинает движение.
Ровер начинает поворот налево.
Фары мигают.
Фары мигают.
Ровер продолжает движение.
Фары мигают.
Ровер завершает поворот налево.
Фары мигают.
Ровер останавливается.
Фары мигают.

Процесс запуска кода:

  1. Откройте VS Code:

    • Запустите Visual Studio Code на вашем компьютере.

    Первое окно при запуске Visual Studio Code
  2. Откройте папку с проектом:

    • В VS Code выберите File -> Open Folder... и выберите папку, которую вы создали.

    Выбор действия во вкладе File
    Вот так выглядит окно иерархии сейчас
  3. Создайте файл с кодом:

    • Cоздайте новый файл с расширением .py, назовите его asynciolib.py, и вставьте в него предоставленный код.

Создание нового файла
Конечный вид окна иерархии

ПРИМЕЧАНИЕ: Когда в списке действий в иерархии вы нажимаете New File, изначально создается внутренняя папка, где в дальнейшем будут храниться наши файлы. Назовите ее как хотите и после, наведя курсор на папку, создайте файл таким же образом. Так же не забывайте в конце названия файла прописывать .py - без типа файла VS Code автоматически создает txt файл.

  1. Установите необходимые зависимости:

  • В данном случае библиотека предустановлена, так что в окно программирования вставляем наш код

Код в файле asynciolib.py
  1. Сохранение файла: перед запуском кода всегда сохраняйте свой файл комбинацией ctrl + S или через file - save.

  2. Запустите ваш код: в правом верхнем углу есть значок запуска кода. Нажав на него, у вас автоматически вылезет терминал, в который и будет выводиться результат. ВАЖНО: когда вы запустите код впервые, у вас попросят выбрать версию

Кнопка запуска программы

Вывод консоли и данных о проезде и состоянии фар ровера

contextlib:

  • Описание: contextlib используется в ровере для работы с контекстными менеджерами.

  • Особенности:

    • Облегчает создание и использование контекстных менеджеров.

    • Содержит полезные утилиты, такие как suppress, для обработки исключений.

  • Роль в проекте:

    • Управление контекстами и исключениями при выполнении задач, что позволяет обеспечить безопасное выполнение кода.

  • Пример кода с работой библиотеки contextlib:

import contextlib

@contextlib.contextmanager
def open_file(filename, mode):
    try:
        f = open(filename, mode)
        print(f"Файл {filename} открыт.")
        yield f
    finally:
        f.close()
        print(f"Файл {filename} закрыт.")

# Использование контекстного менеджера
with open_file('example.txt', 'w') as file:
    file.write('Hello, rover Contact!')
    print("Данные записаны в файл.")

# Чтение данных из файла
with open_file('example.txt', 'r') as file:
    data = file.read()
    print(f"Прочитанные данные: {data}")

Описание кода:

  1. @contextlib.contextmanager - Создание контекстного менеджера

  2. Функция open_file:

    • Функция open_file принимает два аргумента: filename (имя файла) и mode(режим открытия файла).

    • В блоке try, файл открывается и выводится сообщение о том, что файл открыт.

    • Ключевое слово yield используется для передачи управления обратно в блок with, где файл будет использоваться.

    • В блоке finally, файл закрывается и выводится сообщение о том, что файл закрыт.

  3. Использование контекстного менеджера: В этом блоке with, файл example.txt открывается в режиме записи ('w'), и в него записывается строка 'Привет, ровер Контакт!'.

  4. Чтение данных из файла: В этом блоке with, файл example.txt открывается в режиме чтения ('r'), и данные из файла считываются и выводятся на экран.

Итоговый вывод:

Файл example.txt открыт.
Данные записаны в файл.
Файл example.txt закрыт.
Файл example.txt открыт.
Прочитанные данные: Привет, ровер Контакт!
Файл example.txt закрыт.

Процесс запуска кода:

  1. Откройте VS Code:

  • при открытии VS Code повторно открывается ваша папка. Если он у вас уже открыт, то переходите к следующему пункту.

Начальное окно теперь выглядит так
  1. Создайте новый файл .py:

  • создайте в окне иерархии новый .py файл и назовите его my_contextlib.py

ПРИМЕЧАНИЕ: Если назвать файл contextlib.py, python будет ругаться на несоответствие файловой конфигурации!

Новый файл my_contextlib.py
  1. Вставьте ваш код в окно программирования:

Окно иерархии и программирования
  1. Обязательно сохраните файл комбинацией ctrl + S или через file - save.

  2. Скачайте необходимые зависимости: в данном случае придется докачать библиотеку, так как она не идет стандартом в Python. Установка идет следующим образом:\

  • Откройте терминал: если терминал у вас не открыт, перейдите в Terminal - New terminal. Так же можно запустить через горячие клавиши Ctrl + Shift + `

    Запуск терминала

  • Пропишите в консоль pip install contextlib2и нажмите Enter, у вас запустится загрузка библиотеки

Установка библиотеки contextlib
  1. Запустите код:

Запуск кода происходит через эту кнопку

В консоль выведется следующая информация:

Информация о записи файла в example.txt

Так же в иерархии добавится файл example.txt, содержащий в себе текст, который мы передали ему.

Содержимое текстового файла example.txt

socket:

  • Описание: socket используется в ровере для работы с сетевыми соединениями.

  • Особенности:

    • Обеспечивает низкоуровневый интерфейс для работы с сетевыми сокетами.

  • Роль в проекте:

    • Создание серверных и клиентских сокетов для обмена данными с ровером через локальную сеть.

  • Пример кода с использованием библиотеки socket:

Код сервера:

import socket

def start_server():
    # Создаем сокет
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Привязываем сокет к адресу и порту
    server_address = ('localhost', 10000)
    server_socket.bind(server_address)
    
    # Слушаем входящие соединения
    server_socket.listen(1)
    print("Сервер запущен и слушает на порту 10000...")
    
    while True:
        # Ждем соединения
        print("Ожидание соединения...")
        client_socket, client_address = server_socket.accept()
        print(f"Соединение установлено с {client_address}")
        
        try:
            # Получаем данные от клиента
            data = client_socket.recv(1024)
            print(f"Получено сообщение: {data.decode()}")
            
            # Отправляем ответ клиенту
            client_socket.sendall("Привет, ровер Контакт!")
        finally:
            # Закрываем соединение
            client_socket.close()
            print(f"Соединение с {client_address} закрыто")

if __name__ == "__main__":
    start_server()

Описание кода сервера:

  • Создает сокет с использованием socket.AF_INET (для IPv4) и socket.SOCK_STREAM (для TCP).

  • Привязывает сокет к адресу localhost и порту 10000.

  • Слушает входящие соединения с помощью метода listen.

  • Принимает соединение с помощью метода accept.

  • Получает данные от клиента и отправляет обратно ответ.

  • Закрывает соединение после обработки.

Код клиента:

import socket

def start_client():
    # Создаем сокет
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Подключаемся к серверу
    server_address = ('localhost', 10000)
    client_socket.connect(server_address)
    
    try:
        # Отправляем сообщение серверу
        message = "Привет, сервер!"
        client_socket.sendall(message.encode())
        
        # Получаем ответ от сервера
        data = client_socket.recv(1024)
        print(f"Получено сообщение от сервера: {data.decode()}")
    finally:
        # Закрываем соединение
        client_socket.close()

if __name__ == "__main__":
    start_client()

Описание кода клиента:

  • Создает сокет с использованием socket.AF_INET (для IPv4) и socket.SOCK_STREAM (для TCP).

  • Подключается к серверу по адресу localhost и порту 10000.

  • Отправляет сообщение серверу.

  • Получает ответ от сервера и выводит его на экран.

  • Закрывает соединение после обработки.

Как запустить:

  1. Добавьте код сервера и клиента в Visual Studio Code

  2. Запустите код сервера

  3. Запустите код клиента

  4. Если все правильно запущено, в консоль сервера начнут выводиться следующие данные:

Сервер запущен и слушает на порту 10000...
Ожидание соединения...
Соединение установлено с ('localhost', 52346)
Получено сообщение: Привет, сервер!
Соединение с ('127.0.0.1', 52346) закрыто

в консоль клиента:

Получено сообщение от сервера: Привет, ровер Контакт!

Процесс запуска кода:

  1. Создайте два файла: server.py и client.py:

Окно иерархии сейчас
  1. Вставьте соответвующие кода в нужные файлы:

  • Файл клиент:

Файл клиент с кодом под клиента
  • Файл сервер:

файл сервер с кодом сервера
  1. Запустите код сервера: в данном случае нет необходимости качать дополнительные библиотеки, поэтому просто запускаем код сервера, в консоль выведется следующее:

В консоли выводится информация об ожидании подключения клиента
  1. Запустите код клиента: после запуска кода сервера нужно подключить клиент. Для этого перейдите в файл client.py и запустите код с помощью режима откладки:

Запуск кода с помощью режима откладки
  1. Осмотрите консоли клиента и сервера: в клиент было отправлено сообщение "Получено сообщение от сервера":

Консоль файла клиента

В сервер было отправлено следующее выражение:

Консоль файла сервера

ПРИМЕЧАНИЕ: Ошибки, которые вывелись в консоль - это более подробное описания процесса закрытия соединения с клиентом, так что не пугайтесь этих ошибок!

threading:

  • Описание: threading используется в ровере для работы с потоками.

  • Особенности:

    • Позволяет выполнять задачи параллельно в разных потоках.

    • Поддерживает блокировки и события для синхронизации потоков.

  • Роль в проекте:

    • Управление потоками для выполнения фоновых задач, таких как обработка видеопотока.

  • пример кода с использованием библиотеки threading:

import threading
import time

def move():
    print("Ровер начинает движение.")
    time.sleep(2)
    print("Ровер продолжает движение.")
    time.sleep(2)
    print("Ровер останавливается.")

def blink_lights():
    for _ in range(5):
        print("Фары мигают.")
        time.sleep(1)

# Создаем потоки
move_thread = threading.Thread(target=move)
blink_lights_thread = threading.Thread(target=blink_lights)

# Запускаем потоки
move_thread.start()
blink_lights_thread.start()

# Ждем завершения потоков
move_thread.join()
blink_lights_thread.join()

print("Все задачи завершены.")

Описание кода:

  1. Функции move и blink_lights:

    • move: Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения. Использует time.sleep для задержек.

    • blink_lights: Имитирует мигание фар ровера, выводя сообщения о мигании фар и используя time.sleep для задержек.

  2. Создание потоков:

    • move_thread и blink_lights_thread: Создаются объекты потоков, которые будут выполнять функции move и blink_lights соответственно.

  3. Запуск потоков:

    • move_thread.start() и blink_lights_thread.start(): Запускают потоки, чтобы они начали выполнение своих функций параллельно.

  4. Ожидание завершения потоков:

    • move_thread.join() и blink_lights_thread.join(): Обеспечивают ожидание завершения выполнения потоков, чтобы основной поток не завершился раньше, чем потоки выполнят свои задачи.

  5. Сообщение о завершении:

    • print("Все задачи завершены."): Выводит сообщение после того, как все потоки завершили свою работу.

Процесс запуска кода:

  1. Создайте файл .py и назовите его threadinglib.py:

Окно иерархии с новым файлом threadinglib.py
  1. Вставьте код в окно программирования:

Окно программирования после добавления кода
  1. Запустите код: в данном случае устанавливать ничего не нужно. Просто запускаем код и смотрим на результат:

aiofiles:

  • Описание: aiofiles используется в ровере для асинхронной работы с файловой системой.

  • Особенности:

    • Обеспечивает асинхронное чтение и запись файлов.

    • Поддерживает интерфейс, аналогичный стандартной библиотеке open.

  • Роль в проекте:

    • Асинхронное чтение файлов, содержащих данные карты ArUco.

  • Пример кода с использованием библиотеки aiofiles:

```python
import asyncio
import aiofiles

async def log_data(data):
    async with aiofiles.open('rover_log.txt', mode='a') as file:
        await file.write(data + '\n')
        print(f"Записано: {data}")

async def main():
    tasks = [
        log_data("The rover started moving."),
        log_data("The rover continues to move."),
        log_data("The rover stopped."),
        log_data("The rover started flashing its headlights."),
        log_data("The rover has finished flashing its headlights.")
    ]
    await asyncio.gather(*tasks)

# Запуск событийного цикла
asyncio.run(main())
```

Описание кода:

  1. Функция log_data:

    • Принимает строку data и асинхронно записывает её в файл rover_log.txt.

    • Использует aiofiles.open для асинхронного открытия файла в режиме добавления (mode='a').

    • await file.write(data + '\n') асинхронно записывает данные в файл и добавляет перенос строки.

    • Выводит сообщение о записанных данных.

  2. Функция main:

    • Создает список задач tasks, каждая из которых представляет собой вызов функции log_data с различными сообщениями.

    • Использует asyncio.gather для одновременного выполнения всех задач.

  3. Запуск событийного цикла:

    • asyncio.run(main()) запускает асинхронную функцию main и управляет событийным циклом.

    Обратите внимание: в данном коде так же используется библиотека asyncio! Она описана выше!

Итоговый вывод:

Записано: Ровер начал движение.
Записано: Ровер продолжает движение.
Записано: Ровер остановился.
Записано: Ровер начал мигать фарами.
Записано: Ровер завершил мигание фарами.

Процесс запуска кода:

  1. Создайте файл aiofiles.py:

Окно иерархии

Пока что файл подсвечен желтым, он подсвечивается как только вы добавляете код в файл. Это потому, что библиотека не установлена. Если вас смущает то, что после установки библиотеки файл до сих пор помечен желтым, можете перезагрузить Visual Studio Code и проблема решится.

  1. Добавьте код в файл:

Окно программирования после добавленного кода
  1. Установите необходимые зависимости: в данном случае библиотеку aiofiles надо докачать. Открываем терминал и прописываем туда следующую команду: pip install aiofiles

Команда установки библиотеки и процесс скачивания
  1. Запустите код: после скачивания библиотеки можно приступать к тестированию кода. Запустите кода и посмотрите, что происходит:

вывод консоли

В консоль выводится информация о записанных данных. Так же в иерархии создался новый текстовый файл rover_log.txt

Текстовый файл rover_log.txt

logging:

  • Описание: loggingиспользуется в ровере для ведения логов.

  • Особенности:

    • Обеспечивает гибкую систему для создания логов различных уровней (информация, предупреждения, ошибки и т.д.).

    • Поддерживает вывод логов в консоль, файлы и другие обработчики.

  • Роль в проекте:

    • Ведение логов для отслеживания работы программы и отладки.

  • Пример кода с использованием библиотеки logging:

```python
import logging

# Настройка логгера
logging.basicConfig(
    filename='rover_log.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Создание логгера
logger = logging.getLogger('rover_logger')

# Примеры логирования сообщений
def log_rover_activity():
    logger.debug("The rover started moving.")
    logger.info("The rover continues to move.")
    logger.warning("The rover is approaching an obstacle.")
    logger.error("The rover encountered an obstacle.")
    logger.critical("The rover has lost contact with the user.")

# Вызов функции для логирования активности ровера
log_rover_activity()
```

Описание кода:

  1. Настройка логгера:

    • logging.basicConfig настраивает базовую конфигурацию логгера.

    • filename='rover_log.log' указывает, что логи будут записываться в файл rover_log.log.

    • level=logging.DEBUG устанавливает минимальный уровень логирования на DEBUG, что означает, что все сообщения от DEBUG и выше будут записаны.

    • format='%(asctime)s - %(levelname)s - %(message)s' задает формат записи логов, включая время, уровень сообщения и само сообщение.

    • datefmt='%Y-%m-%d %H:%M:%S' задает формат времени.

  2. Создание логгера:

    • logger = logging.getLogger('rover_logger') создает экземпляр логгера с именем rover_logger.

  3. Примеры логирования сообщений:

    • logger.debug, logger.info, logger.warning, logger.error, logger.critical используются для логирования сообщений различных уровней важности.

    • Вызов функции для логирования активности ровера:

  4. log_rover_activity() вызывает функцию, которая логирует различные сообщения от ровера.

Итоговый вывод:

В консоль данных не выводится. Вместо этого данные записываются в лог

2023-10-05 14:30:00 - DEBUG - Ровер начал движение.
2023-10-05 14:30:00 - INFO - Ровер продолжает движение.
2023-10-05 14:30:00 - WARNING - Ровер приближается к препятствию.
2023-10-05 14:30:00 - ERROR - Ровер столкнулся с препятствием.
2023-10-05 14:30:00 - CRITICAL - Ровер потерял связь с пользователем.

Процесс запуска кода:

  1. Создайте файл с названием logginglib.py:

Окно иерархии с новым файлом logginglib.py
  1. Добавьте код в окно программирования:

Окно программирования файла logginglib.py
  1. Запустите код: здесь не нужно докачивать библиотеки, поэтому просто запускаем код:

Результат выполнения кода в консоли

В консоль ничего не вывелось, однако в иерархии добавился файл rover_log.log:

Файл rover_log,py

queue:

  • Описание: queue используется в ровере для работы с очередями.

  • Особенности:

    • Поддерживает различные типы очередей (FIFO, LIFO, приоритетные).

    • Безопасна для использования в многопоточных программах.

  • Роль в проекте:

    • Хранение кадров видеопотока для дальнейшей обработки.

  • Пример кода с использованием библиотеки logging:

import queue
import threading
import time

# Создание очереди
message_queue = queue.Queue()

# Функция для добавления сообщений в очередь
def producer():
    for i in range(10):
        message = f"Сообщение {i}"
        message_queue.put(message)
        print(f"Ровер отправил: {message}")
        time.sleep(1)

# Функция для обработки сообщений из очереди
def consumer():
    while True:
        message = message_queue.get()
        if message is None:
            break
        print(f"Пользователь обработал: {message}")
        message_queue.task_done()

# Создание и запуск потоков
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# Ожидание завершения работы производителя
producer_thread.join()

# Добавление сигнала о завершении для потребителя
message_queue.put(None)
consumer_thread.join()

print("Работа завершена.")

Описание кода:

  1. Создание очереди:

    • message_queue = queue.Queue() создает очередь для хранения сообщений.

  2. Функция для добавления сообщений в очередь:

    • producer функция добавляет 10 сообщений в очередь с интервалом в 1 секунду.

  3. Функция для обработки сообщений из очереди:

    • consumer функция бесконечно обрабатывает сообщения из очереди, пока не получит сигнал о завершении (None).

  4. Создание и запуск потоков:

    • producer_thread и consumer_thread создаются и запускаются для параллельной работы.

  5. Ожидание завершения работы производителя:

    • producer_thread.join() ожидает завершения работы потока производителя.

  6. Добавление сигнала о завершении для потребителя:

    • message_queue.put(None) добавляет сигнал о завершении для потребителя.

  7. Ожидание завершения работы потребителя:

    • consumer_thread.join() ожидает завершения работы потока потребителя.

Итоговый вывод:

Ровер отправил: Сообщение 0
Пользователь обработал: Сообщение 0
Ровер отправил: Сообщение 1
Пользователь обработал: Сообщение 1
Ровер отправил: Сообщение 2
Пользователь обработал: Сообщение 2
Ровер отправил: Сообщение 3
Пользователь обработал: Сообщение 3
Ровер отправил: Сообщение 4
Пользователь обработал: Сообщение 4
Ровер отправил: Сообщение 5
Пользователь обработал: Сообщение 5
Ровер отправил: Сообщение 6
Пользователь обработал: Сообщение 6
Ровер отправил: Сообщение 7
Пользователь обработал: Сообщение 7
Ровер отправил: Сообщение 8
Пользователь обработал: Сообщение 8
Ровер отправил: Сообщение 9
Пользователь обработал: Сообщение 9
Работа завершена.

Процесс запуска кода:

  1. Создайте файл queuelib.py:

Окно иерархии с новым файлом queuelib.py
  1. Добавьте в окно программирования код:

Окно программирования после добавления кода
  1. Запустите код: в данном случае ничего устанавливать не нужно, поэтому просто запускаем код:

Вывод в консоли после запуска.

В итоге наше окно иерархии выглядит так, в нем есть различные файлы и это далеко не всё, что можно сделать с этими библиотеками!

Конечный вид окна иерархии

Last updated