Основные библиотеки Python, используемые в ровере
На этой странице представлены основные использующиеся библиотеке в коде под ровер "Контакт". Больше библиотек прописано в описаниях кодов. Рекомендуется для проверки кодов на этой странице создать отдельную папку.

ПРИМЕЧАНИЕ: Библиотеки, которые вы используете в VS Code, необходимо так же докачивать на Orange Pi (подробнее об установке Python и pip смотрите в разделе "Работа с Orange Pi" в статьях "Установка Python 3.10" и "Установка pip"). Команды установки библиотек в VS Code и в Orange Pi не отличаются.
asyncio:
- Описание: - asyncioиспользуется в коде ровера для написания асинхронных программ, используя корутины, задачи и событийные циклы.
- Особенности: - Позволяет выполнять несколько задач ровера одновременно, не блокируя основной поток выполнения. 
- Используется для асинхронного ввода-вывода, что повышает производительность при работе с сетью. 
 
- Роль в проекте: - Асинхронная обработка соединений с клиентами через сокеты. 
- Асинхронное чтение файлов с помощью - aiofiles.
- Управление событиями и задачами в рамках основного цикла событий. 
 
- Пример кода с использование библиотеки asyncio: 
import asyncio
async def move():
    print("Ровер начинает движение.")
    await asyncio.sleep(2)
    print("Ровер продолжает движение.")
    await asyncio.sleep(2)
    print("Ровер останавливается.")
async def turn(direction):
    print(f"Ровер начинает поворот {direction}.")
    await asyncio.sleep(2)
    print(f"Ровер завершает поворот {direction}.")
async def blink_lights():
    for _ in range(5):
        print("Фары мигают.")
        await asyncio.sleep(1)
async def main():
    await asyncio.gather(
        move(),
        turn("налево"),
        blink_lights()
    )
# Запуск событийного цикла
asyncio.run(main())
Описание кода:
- Функция move Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения с 2-секундными интервалами. 
- Функция turn Имитирует поворот ровера в заданном направлении (налево или направо), выводя сообщения о начале и завершении поворота с 2-секундными интервалами. 
- Функция blink_lights Имитирует мигание фар ровера, выводя сообщение о мигании каждую секунду в течение 5 секунд. 
- Функция main: Функция - mainиспользует- asyncio.gatherдля одновременного запуска всех трех асинхронных задач (- move(),- turn("налево"),- blink_lights()).Функция- mainиспользует- asyncio.gatherдля одновременного запуска всех трех асинхронных задач (- move(),- turn("налево"),- blink_lights()).
- asyncio.run(main()) - запуск кода 
Итоговый вывод:
Ровер начинает движение.
Ровер начинает поворот налево.
Фары мигают.
Фары мигают.
Ровер продолжает движение.
Фары мигают.
Ровер завершает поворот налево.
Фары мигают.
Ровер останавливается.
Фары мигают.
Процесс запуска кода:
- Откройте VS Code: - Запустите Visual Studio Code на вашем компьютере. 
  - Первое окно при запуске Visual Studio Code 
- Откройте папку с проектом: - В VS Code выберите - File->- Open Folder...и выберите папку, которую вы создали.
  - Выбор действия во вкладе File  - Вот так выглядит окно иерархии сейчас 
- Создайте файл с кодом: - Cоздайте новый файл с расширением - .py, назовите его- asynciolib.py, и вставьте в него предоставленный код.
 


ПРИМЕЧАНИЕ: Когда в списке действий в иерархии вы нажимаете New File, изначально создается внутренняя папка, где в дальнейшем будут храниться наши файлы. Назовите ее как хотите и после, наведя курсор на папку, создайте файл таким же образом. Так же не забывайте в конце названия файла прописывать .py - без типа файла VS Code автоматически создает txt файл.
- Установите необходимые зависимости: 
- В данном случае библиотека предустановлена, так что в окно программирования вставляем наш код 

- Сохранение файла: перед запуском кода всегда сохраняйте свой файл комбинацией - ctrl + Sили через- file - save.
- Запустите ваш код: в правом верхнем углу есть значок запуска кода. Нажав на него, у вас автоматически вылезет терминал, в который и будет выводиться результат. ВАЖНО: когда вы запустите код впервые, у вас попросят выбрать версию 


contextlib:
- Описание: - contextlibиспользуется в ровере для работы с контекстными менеджерами.
- Особенности: - Облегчает создание и использование контекстных менеджеров. 
- Содержит полезные утилиты, такие как - suppress, для обработки исключений.
 
- Роль в проекте: - Управление контекстами и исключениями при выполнении задач, что позволяет обеспечить безопасное выполнение кода. 
 
- Пример кода с работой библиотеки contextlib: 
import contextlib
@contextlib.contextmanager
def open_file(filename, mode):
    try:
        f = open(filename, mode)
        print(f"Файл {filename} открыт.")
        yield f
    finally:
        f.close()
        print(f"Файл {filename} закрыт.")
# Использование контекстного менеджера
with open_file('example.txt', 'w') as file:
    file.write('Hello, rover Contact!')
    print("Данные записаны в файл.")
# Чтение данных из файла
with open_file('example.txt', 'r') as file:
    data = file.read()
    print(f"Прочитанные данные: {data}")
Описание кода:
- @contextlib.contextmanager - Создание контекстного менеджера 
- Функция open_file: - Функция open_file принимает два аргумента: filename (имя файла) и mode(режим открытия файла). 
- В блоке try, файл открывается и выводится сообщение о том, что файл открыт. 
- Ключевое слово yield используется для передачи управления обратно в блок with, где файл будет использоваться. 
- В блоке finally, файл закрывается и выводится сообщение о том, что файл закрыт. 
 
- Использование контекстного менеджера: В этом блоке with, файл example.txt открывается в режиме записи ('w'), и в него записывается строка 'Привет, ровер Контакт!'. 
- Чтение данных из файла: В этом блоке with, файл example.txt открывается в режиме чтения ('r'), и данные из файла считываются и выводятся на экран. 
Итоговый вывод:
Файл example.txt открыт.
Данные записаны в файл.
Файл example.txt закрыт.
Файл example.txt открыт.
Прочитанные данные: Привет, ровер Контакт!
Файл example.txt закрыт.
Процесс запуска кода:
- Откройте VS Code: 
- при открытии VS Code повторно открывается ваша папка. Если он у вас уже открыт, то переходите к следующему пункту. 

- Создайте новый файл .py: 
- создайте в окне иерархии новый .py файл и назовите его my_contextlib.py 
ПРИМЕЧАНИЕ: Если назвать файл contextlib.py, python будет ругаться на несоответствие файловой конфигурации!

- Вставьте ваш код в окно программирования: 

- Обязательно сохраните файл комбинацией - ctrl + Sили через- file - save.
- Скачайте необходимые зависимости: в данном случае придется докачать библиотеку, так как она не идет стандартом в Python. Установка идет следующим образом:\ 
- Откройте терминал: если терминал у вас не открыт, перейдите в - Terminal - New terminal. Так же можно запустить через горячие клавиши- Ctrl + Shift + ` - Запуск терминала 
- Пропишите в консоль - pip install contextlib2и нажмите Enter, у вас запустится загрузка библиотеки

- Запустите код: 

В консоль выведется следующая информация:

Так же в иерархии добавится файл example.txt, содержащий в себе текст, который мы передали ему.

socket:
- Описание: - socketиспользуется в ровере для работы с сетевыми соединениями.
- Особенности: - Обеспечивает низкоуровневый интерфейс для работы с сетевыми сокетами. 
 
- Роль в проекте: - Создание серверных и клиентских сокетов для обмена данными с ровером через локальную сеть. 
 
- Пример кода с использованием библиотеки socket: 
Код сервера:
import socket
def start_server():
    # Создаем сокет
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Привязываем сокет к адресу и порту
    server_address = ('localhost', 10000)
    server_socket.bind(server_address)
    
    # Слушаем входящие соединения
    server_socket.listen(1)
    print("Сервер запущен и слушает на порту 10000...")
    
    while True:
        # Ждем соединения
        print("Ожидание соединения...")
        client_socket, client_address = server_socket.accept()
        print(f"Соединение установлено с {client_address}")
        
        try:
            # Получаем данные от клиента
            data = client_socket.recv(1024)
            print(f"Получено сообщение: {data.decode()}")
            
            # Отправляем ответ клиенту
            client_socket.sendall("Привет, ровер Контакт!")
        finally:
            # Закрываем соединение
            client_socket.close()
            print(f"Соединение с {client_address} закрыто")
if __name__ == "__main__":
    start_server()
Описание кода сервера:
- Создает сокет с использованием - socket.AF_INET(для IPv4) и- socket.SOCK_STREAM(для TCP).
- Привязывает сокет к адресу - localhostи порту- 10000.
- Слушает входящие соединения с помощью метода - listen.
- Принимает соединение с помощью метода - accept.
- Получает данные от клиента и отправляет обратно ответ. 
- Закрывает соединение после обработки. 
Код клиента:
import socket
def start_client():
    # Создаем сокет
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Подключаемся к серверу
    server_address = ('localhost', 10000)
    client_socket.connect(server_address)
    
    try:
        # Отправляем сообщение серверу
        message = "Привет, сервер!"
        client_socket.sendall(message.encode())
        
        # Получаем ответ от сервера
        data = client_socket.recv(1024)
        print(f"Получено сообщение от сервера: {data.decode()}")
    finally:
        # Закрываем соединение
        client_socket.close()
if __name__ == "__main__":
    start_client()
Описание кода клиента:
- Создает сокет с использованием - socket.AF_INET(для IPv4) и- socket.SOCK_STREAM(для TCP).
- Подключается к серверу по адресу - localhostи порту- 10000.
- Отправляет сообщение серверу. 
- Получает ответ от сервера и выводит его на экран. 
- Закрывает соединение после обработки. 
Как запустить:
- Добавьте код сервера и клиента в Visual Studio Code 
- Запустите код сервера 
- Запустите код клиента 
- Если все правильно запущено, в консоль сервера начнут выводиться следующие данные: 
Сервер запущен и слушает на порту 10000...
Ожидание соединения...
Соединение установлено с ('localhost', 52346)
Получено сообщение: Привет, сервер!
Соединение с ('127.0.0.1', 52346) закрытов консоль клиента:
Получено сообщение от сервера: Привет, ровер Контакт!Процесс запуска кода:
- Создайте два файла: server.py и client.py: 

- Вставьте соответвующие кода в нужные файлы: 
- Файл клиент: 

- Файл сервер: 

- Запустите код сервера: в данном случае нет необходимости качать дополнительные библиотеки, поэтому просто запускаем код сервера, в консоль выведется следующее: 

- Запустите код клиента: после запуска кода сервера нужно подключить клиент. Для этого перейдите в файл client.py и запустите код с помощью режима откладки: 

- Осмотрите консоли клиента и сервера: в клиент было отправлено сообщение "Получено сообщение от сервера": 

В сервер было отправлено следующее выражение:

ПРИМЕЧАНИЕ: Ошибки, которые вывелись в консоль - это более подробное описания процесса закрытия соединения с клиентом, так что не пугайтесь этих ошибок!
threading:
- Описание: - threadingиспользуется в ровере для работы с потоками.
- Особенности: - Позволяет выполнять задачи параллельно в разных потоках. 
- Поддерживает блокировки и события для синхронизации потоков. 
 
- Роль в проекте: - Управление потоками для выполнения фоновых задач, таких как обработка видеопотока. 
 
- пример кода с использованием библиотеки threading: 
import threading
import time
def move():
    print("Ровер начинает движение.")
    time.sleep(2)
    print("Ровер продолжает движение.")
    time.sleep(2)
    print("Ровер останавливается.")
def blink_lights():
    for _ in range(5):
        print("Фары мигают.")
        time.sleep(1)
# Создаем потоки
move_thread = threading.Thread(target=move)
blink_lights_thread = threading.Thread(target=blink_lights)
# Запускаем потоки
move_thread.start()
blink_lights_thread.start()
# Ждем завершения потоков
move_thread.join()
blink_lights_thread.join()
print("Все задачи завершены.")Описание кода:
- Функции - moveи- blink_lights:- move: Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения. Использует- time.sleepдля задержек.
- blink_lights: Имитирует мигание фар ровера, выводя сообщения о мигании фар и используя- time.sleepдля задержек.
 
- Создание потоков: - move_threadи- blink_lights_thread: Создаются объекты потоков, которые будут выполнять функции- moveи- blink_lightsсоответственно.
 
- Запуск потоков: - move_thread.start()и- blink_lights_thread.start(): Запускают потоки, чтобы они начали выполнение своих функций параллельно.
 
- Ожидание завершения потоков: - move_thread.join()и- blink_lights_thread.join(): Обеспечивают ожидание завершения выполнения потоков, чтобы основной поток не завершился раньше, чем потоки выполнят свои задачи.
 
- Сообщение о завершении: - print("Все задачи завершены."): Выводит сообщение после того, как все потоки завершили свою работу.
 
Процесс запуска кода:
- Создайте файл .py и назовите его threadinglib.py: 

- Вставьте код в окно программирования: 

- Запустите код: в данном случае устанавливать ничего не нужно. Просто запускаем код и смотрим на результат: 

aiofiles:
- Описание: - aiofilesиспользуется в ровере для асинхронной работы с файловой системой.
- Особенности: - Обеспечивает асинхронное чтение и запись файлов. 
- Поддерживает интерфейс, аналогичный стандартной библиотеке - open.
 
- Роль в проекте: - Асинхронное чтение файлов, содержащих данные карты ArUco. 
 
- Пример кода с использованием библиотеки aiofiles: 
```python
import asyncio
import aiofiles
async def log_data(data):
    async with aiofiles.open('rover_log.txt', mode='a') as file:
        await file.write(data + '\n')
        print(f"Записано: {data}")
async def main():
    tasks = [
        log_data("The rover started moving."),
        log_data("The rover continues to move."),
        log_data("The rover stopped."),
        log_data("The rover started flashing its headlights."),
        log_data("The rover has finished flashing its headlights.")
    ]
    await asyncio.gather(*tasks)
# Запуск событийного цикла
asyncio.run(main())
```
Описание кода:
- Функция - log_data:- Принимает строку - dataи асинхронно записывает её в файл- rover_log.txt.
- Использует - aiofiles.openдля асинхронного открытия файла в режиме добавления (- mode='a').
- await file.write(data + '\n')асинхронно записывает данные в файл и добавляет перенос строки.
- Выводит сообщение о записанных данных. 
 
- Функция - main:- Создает список задач - tasks, каждая из которых представляет собой вызов функции- log_dataс различными сообщениями.
- Использует - asyncio.gatherдля одновременного выполнения всех задач.
 
- Запуск событийного цикла: - asyncio.run(main())запускает асинхронную функцию- mainи управляет событийным циклом.
 - Обратите внимание: в данном коде так же используется библиотека asyncio! Она описана выше! 
Итоговый вывод:
Записано: Ровер начал движение.
Записано: Ровер продолжает движение.
Записано: Ровер остановился.
Записано: Ровер начал мигать фарами.
Записано: Ровер завершил мигание фарами.Процесс запуска кода:
- Создайте файл aiofiles.py: 

Пока что файл подсвечен желтым, он подсвечивается как только вы добавляете код в файл. Это потому, что библиотека не установлена. Если вас смущает то, что после установки библиотеки файл до сих пор помечен желтым, можете перезагрузить Visual Studio Code и проблема решится.
- Добавьте код в файл: 

- Установите необходимые зависимости: в данном случае библиотеку aiofiles надо докачать. Открываем терминал и прописываем туда следующую команду: - pip install aiofiles

- Запустите код: после скачивания библиотеки можно приступать к тестированию кода. Запустите кода и посмотрите, что происходит: 

В консоль выводится информация о записанных данных. Так же в иерархии создался новый текстовый файл rover_log.txt

logging:
- Описание: - loggingиспользуется в ровере для ведения логов.
- Особенности: - Обеспечивает гибкую систему для создания логов различных уровней (информация, предупреждения, ошибки и т.д.). 
- Поддерживает вывод логов в консоль, файлы и другие обработчики. 
 
- Роль в проекте: - Ведение логов для отслеживания работы программы и отладки. 
 
- Пример кода с использованием библиотеки logging: 
```python
import logging
# Настройка логгера
logging.basicConfig(
    filename='rover_log.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Создание логгера
logger = logging.getLogger('rover_logger')
# Примеры логирования сообщений
def log_rover_activity():
    logger.debug("The rover started moving.")
    logger.info("The rover continues to move.")
    logger.warning("The rover is approaching an obstacle.")
    logger.error("The rover encountered an obstacle.")
    logger.critical("The rover has lost contact with the user.")
# Вызов функции для логирования активности ровера
log_rover_activity()
```
Описание кода:
- Настройка логгера: - logging.basicConfigнастраивает базовую конфигурацию логгера.
- filename='rover_log.log'указывает, что логи будут записываться в файл- rover_log.log.
- level=logging.DEBUGустанавливает минимальный уровень логирования на- DEBUG, что означает, что все сообщения от- DEBUGи выше будут записаны.
- format='%(asctime)s - %(levelname)s - %(message)s'задает формат записи логов, включая время, уровень сообщения и само сообщение.
- datefmt='%Y-%m-%d %H:%M:%S'задает формат времени.
 
- Создание логгера: - logger = logging.getLogger('rover_logger')создает экземпляр логгера с именем- rover_logger.
 
- Примеры логирования сообщений: - logger.debug,- logger.info,- logger.warning,- logger.error,- logger.criticalиспользуются для логирования сообщений различных уровней важности.
- Вызов функции для логирования активности ровера: 
 
- log_rover_activity()вызывает функцию, которая логирует различные сообщения от ровера.
Итоговый вывод:
В консоль данных не выводится. Вместо этого данные записываются в лог
2023-10-05 14:30:00 - DEBUG - Ровер начал движение.
2023-10-05 14:30:00 - INFO - Ровер продолжает движение.
2023-10-05 14:30:00 - WARNING - Ровер приближается к препятствию.
2023-10-05 14:30:00 - ERROR - Ровер столкнулся с препятствием.
2023-10-05 14:30:00 - CRITICAL - Ровер потерял связь с пользователем.Процесс запуска кода:
- Создайте файл с названием logginglib.py: 

- Добавьте код в окно программирования: 

- Запустите код: здесь не нужно докачивать библиотеки, поэтому просто запускаем код: 

В консоль ничего не вывелось, однако в иерархии добавился файл rover_log.log:

queue:
- Описание: - queueиспользуется в ровере для работы с очередями.
- Особенности: - Поддерживает различные типы очередей (FIFO, LIFO, приоритетные). 
- Безопасна для использования в многопоточных программах. 
 
- Роль в проекте: - Хранение кадров видеопотока для дальнейшей обработки. 
 
- Пример кода с использованием библиотеки logging: 
import queue
import threading
import time
# Создание очереди
message_queue = queue.Queue()
# Функция для добавления сообщений в очередь
def producer():
    for i in range(10):
        message = f"Сообщение {i}"
        message_queue.put(message)
        print(f"Ровер отправил: {message}")
        time.sleep(1)
# Функция для обработки сообщений из очереди
def consumer():
    while True:
        message = message_queue.get()
        if message is None:
            break
        print(f"Пользователь обработал: {message}")
        message_queue.task_done()
# Создание и запуск потоков
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
# Ожидание завершения работы производителя
producer_thread.join()
# Добавление сигнала о завершении для потребителя
message_queue.put(None)
consumer_thread.join()
print("Работа завершена.")
Описание кода:
- Создание очереди: - message_queue = queue.Queue()создает очередь для хранения сообщений.
 
- Функция для добавления сообщений в очередь: - producerфункция добавляет 10 сообщений в очередь с интервалом в 1 секунду.
 
- Функция для обработки сообщений из очереди: - consumerфункция бесконечно обрабатывает сообщения из очереди, пока не получит сигнал о завершении (- None).
 
- Создание и запуск потоков: - producer_threadи- consumer_threadсоздаются и запускаются для параллельной работы.
 
- Ожидание завершения работы производителя: - producer_thread.join()ожидает завершения работы потока производителя.
 
- Добавление сигнала о завершении для потребителя: - message_queue.put(None)добавляет сигнал о завершении для потребителя.
 
- Ожидание завершения работы потребителя: - consumer_thread.join()ожидает завершения работы потока потребителя.
 
Итоговый вывод:
Ровер отправил: Сообщение 0
Пользователь обработал: Сообщение 0
Ровер отправил: Сообщение 1
Пользователь обработал: Сообщение 1
Ровер отправил: Сообщение 2
Пользователь обработал: Сообщение 2
Ровер отправил: Сообщение 3
Пользователь обработал: Сообщение 3
Ровер отправил: Сообщение 4
Пользователь обработал: Сообщение 4
Ровер отправил: Сообщение 5
Пользователь обработал: Сообщение 5
Ровер отправил: Сообщение 6
Пользователь обработал: Сообщение 6
Ровер отправил: Сообщение 7
Пользователь обработал: Сообщение 7
Ровер отправил: Сообщение 8
Пользователь обработал: Сообщение 8
Ровер отправил: Сообщение 9
Пользователь обработал: Сообщение 9
Работа завершена.
Процесс запуска кода:
- Создайте файл queuelib.py: 

- Добавьте в окно программирования код: 

- Запустите код: в данном случае ничего устанавливать не нужно, поэтому просто запускаем код: 

В итоге наше окно иерархии выглядит так, в нем есть различные файлы и это далеко не всё, что можно сделать с этими библиотеками!

Последнее обновление
