Основные библиотеки Python, используемые в ровере
На этой странице представлены основные использующиеся библиотеке в коде под ровер "Контакт". Больше библиотек прописано в описаниях кодов. Рекомендуется для проверки кодов на этой странице создать отдельную папку.

ПРИМЕЧАНИЕ: Библиотеки, которые вы используете в VS Code, необходимо так же докачивать на Orange Pi (подробнее об установке Python и pip смотрите в разделе "Работа с Orange Pi" в статьях "Установка Python 3.10" и "Установка pip"). Команды установки библиотек в VS Code и в Orange Pi не отличаются.
asyncio:
Описание:
asyncio
используется в коде ровера для написания асинхронных программ, используя корутины, задачи и событийные циклы.Особенности:
Позволяет выполнять несколько задач ровера одновременно, не блокируя основной поток выполнения.
Используется для асинхронного ввода-вывода, что повышает производительность при работе с сетью.
Роль в проекте:
Асинхронная обработка соединений с клиентами через сокеты.
Асинхронное чтение файлов с помощью
aiofiles
.Управление событиями и задачами в рамках основного цикла событий.
Пример кода с использование библиотеки asyncio:
import asyncio
async def move():
print("Ровер начинает движение.")
await asyncio.sleep(2)
print("Ровер продолжает движение.")
await asyncio.sleep(2)
print("Ровер останавливается.")
async def turn(direction):
print(f"Ровер начинает поворот {direction}.")
await asyncio.sleep(2)
print(f"Ровер завершает поворот {direction}.")
async def blink_lights():
for _ in range(5):
print("Фары мигают.")
await asyncio.sleep(1)
async def main():
await asyncio.gather(
move(),
turn("налево"),
blink_lights()
)
# Запуск событийного цикла
asyncio.run(main())
Описание кода:
Функция move Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения с 2-секундными интервалами.
Функция turn Имитирует поворот ровера в заданном направлении (налево или направо), выводя сообщения о начале и завершении поворота с 2-секундными интервалами.
Функция blink_lights Имитирует мигание фар ровера, выводя сообщение о мигании каждую секунду в течение 5 секунд.
Функция main: Функция
main
используетasyncio.gather
для одновременного запуска всех трех асинхронных задач (move()
,turn("налево")
,blink_lights()
).Функцияmain
используетasyncio.gather
для одновременного запуска всех трех асинхронных задач (move()
,turn("налево")
,blink_lights()
).asyncio.run(main()) - запуск кода
Итоговый вывод:
Ровер начинает движение.
Ровер начинает поворот налево.
Фары мигают.
Фары мигают.
Ровер продолжает движение.
Фары мигают.
Ровер завершает поворот налево.
Фары мигают.
Ровер останавливается.
Фары мигают.
Процесс запуска кода:
Откройте VS Code:
Запустите Visual Studio Code на вашем компьютере.
Первое окно при запуске Visual Studio Code Откройте папку с проектом:
В VS Code выберите
File
->Open Folder...
и выберите папку, которую вы создали.
Выбор действия во вкладе File Вот так выглядит окно иерархии сейчас Создайте файл с кодом:
Cоздайте новый файл с расширением
.py
, назовите егоasynciolib.py
, и вставьте в него предоставленный код.


ПРИМЕЧАНИЕ: Когда в списке действий в иерархии вы нажимаете New File
, изначально создается внутренняя папка, где в дальнейшем будут храниться наши файлы. Назовите ее как хотите и после, наведя курсор на папку, создайте файл таким же образом. Так же не забывайте в конце названия файла прописывать .py
- без типа файла VS Code автоматически создает txt файл.
Установите необходимые зависимости:
В данном случае библиотека предустановлена, так что в окно программирования вставляем наш код

Сохранение файла: перед запуском кода всегда сохраняйте свой файл комбинацией
ctrl + S
или черезfile - save
.Запустите ваш код: в правом верхнем углу есть значок запуска кода. Нажав на него, у вас автоматически вылезет терминал, в который и будет выводиться результат. ВАЖНО: когда вы запустите код впервые, у вас попросят выбрать версию


contextlib:
Описание:
contextlib
используется в ровере для работы с контекстными менеджерами.Особенности:
Облегчает создание и использование контекстных менеджеров.
Содержит полезные утилиты, такие как
suppress
, для обработки исключений.
Роль в проекте:
Управление контекстами и исключениями при выполнении задач, что позволяет обеспечить безопасное выполнение кода.
Пример кода с работой библиотеки contextlib:
import contextlib
@contextlib.contextmanager
def open_file(filename, mode):
try:
f = open(filename, mode)
print(f"Файл {filename} открыт.")
yield f
finally:
f.close()
print(f"Файл {filename} закрыт.")
# Использование контекстного менеджера
with open_file('example.txt', 'w') as file:
file.write('Hello, rover Contact!')
print("Данные записаны в файл.")
# Чтение данных из файла
with open_file('example.txt', 'r') as file:
data = file.read()
print(f"Прочитанные данные: {data}")
Описание кода:
@contextlib.contextmanager - Создание контекстного менеджера
Функция open_file:
Функция open_file принимает два аргумента: filename (имя файла) и mode(режим открытия файла).
В блоке try, файл открывается и выводится сообщение о том, что файл открыт.
Ключевое слово yield используется для передачи управления обратно в блок with, где файл будет использоваться.
В блоке finally, файл закрывается и выводится сообщение о том, что файл закрыт.
Использование контекстного менеджера: В этом блоке with, файл example.txt открывается в режиме записи ('w'), и в него записывается строка 'Привет, ровер Контакт!'.
Чтение данных из файла: В этом блоке with, файл example.txt открывается в режиме чтения ('r'), и данные из файла считываются и выводятся на экран.
Итоговый вывод:
Файл example.txt открыт.
Данные записаны в файл.
Файл example.txt закрыт.
Файл example.txt открыт.
Прочитанные данные: Привет, ровер Контакт!
Файл example.txt закрыт.
Процесс запуска кода:
Откройте VS Code:
при открытии VS Code повторно открывается ваша папка. Если он у вас уже открыт, то переходите к следующему пункту.

Создайте новый файл .py:
создайте в окне иерархии новый .py файл и назовите его my_contextlib.py
ПРИМЕЧАНИЕ: Если назвать файл contextlib.py, python будет ругаться на несоответствие файловой конфигурации!

Вставьте ваш код в окно программирования:

Обязательно сохраните файл комбинацией
ctrl + S
или черезfile - save
.Скачайте необходимые зависимости: в данном случае придется докачать библиотеку, так как она не идет стандартом в Python. Установка идет следующим образом:\
Откройте терминал: если терминал у вас не открыт, перейдите в
Terminal - New terminal
. Так же можно запустить через горячие клавишиCtrl + Shift + `
Запуск терминала Пропишите в консоль
pip install contextlib2
и нажмите Enter, у вас запустится загрузка библиотеки

Запустите код:

В консоль выведется следующая информация:

Так же в иерархии добавится файл example.txt, содержащий в себе текст, который мы передали ему.

socket:
Описание:
socket
используется в ровере для работы с сетевыми соединениями.Особенности:
Обеспечивает низкоуровневый интерфейс для работы с сетевыми сокетами.
Роль в проекте:
Создание серверных и клиентских сокетов для обмена данными с ровером через локальную сеть.
Пример кода с использованием библиотеки socket:
Код сервера:
import socket
def start_server():
# Создаем сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Привязываем сокет к адресу и порту
server_address = ('localhost', 10000)
server_socket.bind(server_address)
# Слушаем входящие соединения
server_socket.listen(1)
print("Сервер запущен и слушает на порту 10000...")
while True:
# Ждем соединения
print("Ожидание соединения...")
client_socket, client_address = server_socket.accept()
print(f"Соединение установлено с {client_address}")
try:
# Получаем данные от клиента
data = client_socket.recv(1024)
print(f"Получено сообщение: {data.decode()}")
# Отправляем ответ клиенту
client_socket.sendall("Привет, ровер Контакт!")
finally:
# Закрываем соединение
client_socket.close()
print(f"Соединение с {client_address} закрыто")
if __name__ == "__main__":
start_server()
Описание кода сервера:
Создает сокет с использованием
socket.AF_INET
(для IPv4) иsocket.SOCK_STREAM
(для TCP).Привязывает сокет к адресу
localhost
и порту10000
.Слушает входящие соединения с помощью метода
listen
.Принимает соединение с помощью метода
accept
.Получает данные от клиента и отправляет обратно ответ.
Закрывает соединение после обработки.
Код клиента:
import socket
def start_client():
# Создаем сокет
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Подключаемся к серверу
server_address = ('localhost', 10000)
client_socket.connect(server_address)
try:
# Отправляем сообщение серверу
message = "Привет, сервер!"
client_socket.sendall(message.encode())
# Получаем ответ от сервера
data = client_socket.recv(1024)
print(f"Получено сообщение от сервера: {data.decode()}")
finally:
# Закрываем соединение
client_socket.close()
if __name__ == "__main__":
start_client()
Описание кода клиента:
Создает сокет с использованием
socket.AF_INET
(для IPv4) иsocket.SOCK_STREAM
(для TCP).Подключается к серверу по адресу
localhost
и порту10000
.Отправляет сообщение серверу.
Получает ответ от сервера и выводит его на экран.
Закрывает соединение после обработки.
Как запустить:
Добавьте код сервера и клиента в Visual Studio Code
Запустите код сервера
Запустите код клиента
Если все правильно запущено, в консоль сервера начнут выводиться следующие данные:
Сервер запущен и слушает на порту 10000...
Ожидание соединения...
Соединение установлено с ('localhost', 52346)
Получено сообщение: Привет, сервер!
Соединение с ('127.0.0.1', 52346) закрыто
в консоль клиента:
Получено сообщение от сервера: Привет, ровер Контакт!
Процесс запуска кода:
Создайте два файла: server.py и client.py:

Вставьте соответвующие кода в нужные файлы:
Файл клиент:

Файл сервер:

Запустите код сервера: в данном случае нет необходимости качать дополнительные библиотеки, поэтому просто запускаем код сервера, в консоль выведется следующее:

Запустите код клиента: после запуска кода сервера нужно подключить клиент. Для этого перейдите в файл client.py и запустите код с помощью режима откладки:

Осмотрите консоли клиента и сервера: в клиент было отправлено сообщение "Получено сообщение от сервера":

В сервер было отправлено следующее выражение:

ПРИМЕЧАНИЕ: Ошибки, которые вывелись в консоль - это более подробное описания процесса закрытия соединения с клиентом, так что не пугайтесь этих ошибок!
threading:
Описание:
threading
используется в ровере для работы с потоками.Особенности:
Позволяет выполнять задачи параллельно в разных потоках.
Поддерживает блокировки и события для синхронизации потоков.
Роль в проекте:
Управление потоками для выполнения фоновых задач, таких как обработка видеопотока.
пример кода с использованием библиотеки threading:
import threading
import time
def move():
print("Ровер начинает движение.")
time.sleep(2)
print("Ровер продолжает движение.")
time.sleep(2)
print("Ровер останавливается.")
def blink_lights():
for _ in range(5):
print("Фары мигают.")
time.sleep(1)
# Создаем потоки
move_thread = threading.Thread(target=move)
blink_lights_thread = threading.Thread(target=blink_lights)
# Запускаем потоки
move_thread.start()
blink_lights_thread.start()
# Ждем завершения потоков
move_thread.join()
blink_lights_thread.join()
print("Все задачи завершены.")
Описание кода:
Функции
move
иblink_lights
:move
: Имитирует движение ровера, выводя сообщения о начале, продолжении и остановке движения. Используетtime.sleep
для задержек.blink_lights
: Имитирует мигание фар ровера, выводя сообщения о мигании фар и используяtime.sleep
для задержек.
Создание потоков:
move_thread
иblink_lights_thread
: Создаются объекты потоков, которые будут выполнять функцииmove
иblink_lights
соответственно.
Запуск потоков:
move_thread.start()
иblink_lights_thread.start()
: Запускают потоки, чтобы они начали выполнение своих функций параллельно.
Ожидание завершения потоков:
move_thread.join()
иblink_lights_thread.join()
: Обеспечивают ожидание завершения выполнения потоков, чтобы основной поток не завершился раньше, чем потоки выполнят свои задачи.
Сообщение о завершении:
print("Все задачи завершены.")
: Выводит сообщение после того, как все потоки завершили свою работу.
Процесс запуска кода:
Создайте файл .py и назовите его threadinglib.py:

Вставьте код в окно программирования:

Запустите код: в данном случае устанавливать ничего не нужно. Просто запускаем код и смотрим на результат:

aiofiles:
Описание:
aiofiles
используется в ровере для асинхронной работы с файловой системой.Особенности:
Обеспечивает асинхронное чтение и запись файлов.
Поддерживает интерфейс, аналогичный стандартной библиотеке
open
.
Роль в проекте:
Асинхронное чтение файлов, содержащих данные карты ArUco.
Пример кода с использованием библиотеки aiofiles:
```python
import asyncio
import aiofiles
async def log_data(data):
async with aiofiles.open('rover_log.txt', mode='a') as file:
await file.write(data + '\n')
print(f"Записано: {data}")
async def main():
tasks = [
log_data("The rover started moving."),
log_data("The rover continues to move."),
log_data("The rover stopped."),
log_data("The rover started flashing its headlights."),
log_data("The rover has finished flashing its headlights.")
]
await asyncio.gather(*tasks)
# Запуск событийного цикла
asyncio.run(main())
```
Описание кода:
Функция
log_data
:Принимает строку
data
и асинхронно записывает её в файлrover_log.txt
.Использует
aiofiles.open
для асинхронного открытия файла в режиме добавления (mode='a'
).await file.write(data + '\n')
асинхронно записывает данные в файл и добавляет перенос строки.Выводит сообщение о записанных данных.
Функция
main
:Создает список задач
tasks
, каждая из которых представляет собой вызов функцииlog_data
с различными сообщениями.Использует
asyncio.gather
для одновременного выполнения всех задач.
Запуск событийного цикла:
asyncio.run(main())
запускает асинхронную функциюmain
и управляет событийным циклом.
Обратите внимание: в данном коде так же используется библиотека asyncio! Она описана выше!
Итоговый вывод:
Записано: Ровер начал движение.
Записано: Ровер продолжает движение.
Записано: Ровер остановился.
Записано: Ровер начал мигать фарами.
Записано: Ровер завершил мигание фарами.
Процесс запуска кода:
Создайте файл aiofiles.py:

Пока что файл подсвечен желтым, он подсвечивается как только вы добавляете код в файл. Это потому, что библиотека не установлена. Если вас смущает то, что после установки библиотеки файл до сих пор помечен желтым, можете перезагрузить Visual Studio Code и проблема решится.
Добавьте код в файл:

Установите необходимые зависимости: в данном случае библиотеку aiofiles надо докачать. Открываем терминал и прописываем туда следующую команду:
pip install aiofiles

Запустите код: после скачивания библиотеки можно приступать к тестированию кода. Запустите кода и посмотрите, что происходит:

В консоль выводится информация о записанных данных. Так же в иерархии создался новый текстовый файл rover_log.txt

logging:
Описание:
logging
используется в ровере для ведения логов.Особенности:
Обеспечивает гибкую систему для создания логов различных уровней (информация, предупреждения, ошибки и т.д.).
Поддерживает вывод логов в консоль, файлы и другие обработчики.
Роль в проекте:
Ведение логов для отслеживания работы программы и отладки.
Пример кода с использованием библиотеки logging:
```python
import logging
# Настройка логгера
logging.basicConfig(
filename='rover_log.log',
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Создание логгера
logger = logging.getLogger('rover_logger')
# Примеры логирования сообщений
def log_rover_activity():
logger.debug("The rover started moving.")
logger.info("The rover continues to move.")
logger.warning("The rover is approaching an obstacle.")
logger.error("The rover encountered an obstacle.")
logger.critical("The rover has lost contact with the user.")
# Вызов функции для логирования активности ровера
log_rover_activity()
```
Описание кода:
Настройка логгера:
logging.basicConfig
настраивает базовую конфигурацию логгера.filename='rover_log.log'
указывает, что логи будут записываться в файлrover_log.log
.level=logging.DEBUG
устанавливает минимальный уровень логирования наDEBUG
, что означает, что все сообщения отDEBUG
и выше будут записаны.format='%(asctime)s - %(levelname)s - %(message)s'
задает формат записи логов, включая время, уровень сообщения и само сообщение.datefmt='%Y-%m-%d %H:%M:%S'
задает формат времени.
Создание логгера:
logger = logging.getLogger('rover_logger')
создает экземпляр логгера с именемrover_logger
.
Примеры логирования сообщений:
logger.debug
,logger.info
,logger.warning
,logger.error
,logger.critical
используются для логирования сообщений различных уровней важности.Вызов функции для логирования активности ровера:
log_rover_activity()
вызывает функцию, которая логирует различные сообщения от ровера.
Итоговый вывод:
В консоль данных не выводится. Вместо этого данные записываются в лог
2023-10-05 14:30:00 - DEBUG - Ровер начал движение.
2023-10-05 14:30:00 - INFO - Ровер продолжает движение.
2023-10-05 14:30:00 - WARNING - Ровер приближается к препятствию.
2023-10-05 14:30:00 - ERROR - Ровер столкнулся с препятствием.
2023-10-05 14:30:00 - CRITICAL - Ровер потерял связь с пользователем.
Процесс запуска кода:
Создайте файл с названием logginglib.py:

Добавьте код в окно программирования:

Запустите код: здесь не нужно докачивать библиотеки, поэтому просто запускаем код:

В консоль ничего не вывелось, однако в иерархии добавился файл rover_log.log:

queue:
Описание:
queue
используется в ровере для работы с очередями.Особенности:
Поддерживает различные типы очередей (FIFO, LIFO, приоритетные).
Безопасна для использования в многопоточных программах.
Роль в проекте:
Хранение кадров видеопотока для дальнейшей обработки.
Пример кода с использованием библиотеки logging:
import queue
import threading
import time
# Создание очереди
message_queue = queue.Queue()
# Функция для добавления сообщений в очередь
def producer():
for i in range(10):
message = f"Сообщение {i}"
message_queue.put(message)
print(f"Ровер отправил: {message}")
time.sleep(1)
# Функция для обработки сообщений из очереди
def consumer():
while True:
message = message_queue.get()
if message is None:
break
print(f"Пользователь обработал: {message}")
message_queue.task_done()
# Создание и запуск потоков
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
# Ожидание завершения работы производителя
producer_thread.join()
# Добавление сигнала о завершении для потребителя
message_queue.put(None)
consumer_thread.join()
print("Работа завершена.")
Описание кода:
Создание очереди:
message_queue = queue.Queue()
создает очередь для хранения сообщений.
Функция для добавления сообщений в очередь:
producer
функция добавляет 10 сообщений в очередь с интервалом в 1 секунду.
Функция для обработки сообщений из очереди:
consumer
функция бесконечно обрабатывает сообщения из очереди, пока не получит сигнал о завершении (None
).
Создание и запуск потоков:
producer_thread
иconsumer_thread
создаются и запускаются для параллельной работы.
Ожидание завершения работы производителя:
producer_thread.join()
ожидает завершения работы потока производителя.
Добавление сигнала о завершении для потребителя:
message_queue.put(None)
добавляет сигнал о завершении для потребителя.
Ожидание завершения работы потребителя:
consumer_thread.join()
ожидает завершения работы потока потребителя.
Итоговый вывод:
Ровер отправил: Сообщение 0
Пользователь обработал: Сообщение 0
Ровер отправил: Сообщение 1
Пользователь обработал: Сообщение 1
Ровер отправил: Сообщение 2
Пользователь обработал: Сообщение 2
Ровер отправил: Сообщение 3
Пользователь обработал: Сообщение 3
Ровер отправил: Сообщение 4
Пользователь обработал: Сообщение 4
Ровер отправил: Сообщение 5
Пользователь обработал: Сообщение 5
Ровер отправил: Сообщение 6
Пользователь обработал: Сообщение 6
Ровер отправил: Сообщение 7
Пользователь обработал: Сообщение 7
Ровер отправил: Сообщение 8
Пользователь обработал: Сообщение 8
Ровер отправил: Сообщение 9
Пользователь обработал: Сообщение 9
Работа завершена.
Процесс запуска кода:
Создайте файл queuelib.py:

Добавьте в окно программирования код:

Запустите код: в данном случае ничего устанавливать не нужно, поэтому просто запускаем код:

В итоге наше окно иерархии выглядит так, в нем есть различные файлы и это далеко не всё, что можно сделать с этими библиотеками!

Last updated