Представьте себе: вы в кофейне, и там всего один бариста. В синхронном мире этот бариста примет ваш заказ, помолет зёрна, приготовит кофе, подаст его, почистит машину и только потом перейдёт к следующему клиенту. Тем временем двадцать человек стоят в очереди, постукивая ногой и проверяя часы. Звучит неэффективно? Добро пожаловать в мир до asyncio.

Теперь представьте, что тот же бариста может принимать несколько заказов, запускать несколько процессов приготовления и подавать готовые кофе, пока другие ещё готовятся. Вот что такое asyncio — искусство делать много вещей без дополнительных затрат на найм новых бариста (или, в нашем случае, запуск нескольких потоков).

Асинхронная революция: почему вашему коду нужно перестать быть диктатором

Традиционный код на Python похож на того микроменеджера, которого мы все знаем — он настаивает на том, чтобы закончить одну задачу полностью, прежде чем даже думать о начале другой. Но что, если ваша программа большую часть времени ждёт? Ждёт сетевых запросов, чтения файлов, запросов к базе данных или того коллеги, который обещал отправить вам документацию по API «завтра» (шесть месяцев назад).

Асинхронное программирование на Python позволяет писать код, который может обрабатывать несколько задач одновременно без сложности и затрат на множественные потоки. Вместо блокировки всего приложения во время ожидания операций ввода-вывода, asyncio позволяет другим задачам выполняться в эти периоды простоя.

Библиотека Python asyncio является стандартным способом написания асинхронного кода с Python 3.4, предоставляя разработчикам сопрограммы — специальные функции, которые можно приостанавливать и возобновлять для параллельного выполнения в одном потоке.

Понимание цикла событий: дирижёр вашего асинхронного оркестра

Прежде чем мы углубимся в код, давайте разберёмся с главным действующим лицом: циклом событий. Представьте его себе как дирижёра, подпитанного кофеином, который никогда не спит и постоянно проверяет, готов ли кто-либо из оркестрантов (сопрограмм) сыграть следующую ноту.

graph TD A[Начало цикла событий] --> B[Проверка готовых задач] B --> C{Есть готовые задачи?} C -->|Да| D[Выполнение задачи] D --> E{Задача завершена?} E -->|Нет| F[Задача передаёт управление] F --> B E -->|Да| G[Удаление задачи] G --> B C -->|Нет| H[Ожидание событий ввода-вывода] H --> B

Цикл событий — это то, что делает всё волшебство возможным. Он управляет всеми вашими сопрограммами, решает, когда их запускать, и координирует взаимодействие между различными асинхронными операциями.

Ваше первое свидание с async/await

Давайте начнём с чего-то простого. Вот классический пример «Hello, Async World», который не оставит вас в ожидании:

import asyncio
async def greet_async(name):
print(f"Hello, {name}!")
await asyncio.sleep(1)  # Имитация некоторой асинхронной работы
print(f"Goodbye, {name}!")
async def main():
await greet_async("AsyncIO")
# Запуск асинхронной функции
asyncio.run(main())

Ключевое слово async def создаёт функцию-сопрограмму, а await — это как сказать «эй, мне нужно подождать этого, но не просто стой здесь — сделай что-нибудь ещё, если можешь!»

Создание вашего первого реального асинхронного приложения

Теперь давайте создадим что-то более практичное. Представьте, что вам нужно получить данные из нескольких API. В синхронном коде вы бы ждали завершения каждого запроса, прежде чем начать следующий. С asyncio вы можете отправить все запросы одновременно:

import asyncio
import aiohttp
import time
async def fetch_data(session, url, delay):
"""Имитация получения данных из API с некоторой задержкой"""
print(f"Starting request to {url}")
await asyncio.sleep(delay)  # Имитация сетевой задержки
print(f"Completed request to {url} after {delay}s")
return f"Data from {url}"
async def fetch_multiple_sources():
"""Получение данных из нескольких источников параллельно"""
urls_and_delays = [
("https://api.service1.com", 2),
("https://api.service2.com", 1),
("https://api.service3.com", 3),
("https://api.service4.com", 1.5)
]
# Создание задач для параллельного выполнения
tasks = []
async with aiohttp.ClientSession() as session:  # Мы сделаем вид, что используем aiohttp
for url, delay in urls_and_delays:
task = asyncio.create_task(fetch_data(session, url, delay))
tasks.append(task)
# Ожидание завершения всех задач
results = await asyncio.gather(*tasks)
return results
async def main():
start_time = time.time()
results = await fetch_multiple_sources()
end_time = time.time()
print(f"Все запросы выполнены за {end_time - start_time:.2f} секунд")
for result in results:
print(result)
asyncio.run(main())

Преимущество здесь в том, что пока самый длинный запрос занимает 3 секунды, общее время составляет примерно 3 секунды, а не 7,5 секунд (2+1+3+1,5), как было бы синхронно.

Управление асинхронными задачами: искусство жонглирования

Иногда вам нужно больше контроля над вашими асинхронными операциями. Вот где управление задачами пригодится:

import asyncio
import random
async def worker(name, work_time):
"""Рабочий, который выполняет некоторую асинхронную работу"""
print(f"Работник {name} начал")
await asyncio.sleep(work_time)
print(f"Работник {name} закончил через {work_time}с")
return f"Результат от {name}"
async def supervisor():
"""Наблюдение за несколькими работниками с разными стратегиями"""
# Стратегия 1: ожидание завершения всех работников
print("=== Стратегия 1: ожидание завершения всех работников ===")
workers = [
worker("Алиса", 2),
worker("Боб", 1),
worker("Чарли", 3)
]
results = await asyncio.gather(*workers)
print(f"Все работники завершили: {results}")
# Стратегия 2: обработка результатов по мере их завершения
print("\n=== Стратегия 2: обработка по мере завершения ===")
workers = [
asyncio.create_task(worker(f"Работник-{i}", random.uniform(0.5, 2.5)))
for i in range(5)
]
for completed_task in asyncio.as_completed(workers):
result = await completed_task
print(f"Получен результат: {result}")
asyncio.run(supervisor())

Продвинутые паттерны: производитель-потребитель с асинхронными очередями

Реальным приложениям часто нужно обрабатывать потоки данных. Вот как реализовать паттерн производитель-потребитель с очередями asyncio:

import asyncio
import random
async def producer(queue, producer_id, item_count):
"""Производит предметы и кладёт их в очередь"""
for i in range(item_count):
# Имитация некоторой работы для производства предмета
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Предмет-{producer_id}-{i}"
await queue.put(item)
print(f"Производитель {producer_id} произвёл: {item}")
print(f"Производитель {producer_id} закончил")
async def consumer(queue, consumer_id):
"""Потребляет предметы из очереди"""
while True:
try:
# Ожидание предмета, но тайм-аут через 3 секунды
item = await asyncio.wait_for(queue.get(), timeout=3.0)
# Имитация обработки предмета
await asyncio.sleep(random.uniform(0.2, 0.8))
print(f"Потребитель {consumer_id} обработал: {item}")
# Отметить задачу как выполненную
queue.task_done()
except asyncio.TimeoutError:
print(f"Потребитель {consumer_id} превысил время ожидания предметов")
break
async def producer_consumer_demo():
"""Демонстрация паттерна производитель-потребитель с asyncio"""
# Создание очереди с максимальным размером
queue = asyncio.Queue(maxsize=10)
# Создание нескольких производителей и потребителей
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(3)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(2)
]
# Ожидание завершения всех производителей
await asyncio.gather(*producers)
# Ожидание обработки всех предметов в очереди
await queue.join()
# Отмена потребителей (они работают бесконечно)
for consumer_task in consumers:
consumer_task.cancel()
print("Все предметы обработаны!")
asyncio.run(producer_consumer_demo())