Асинхронное программирование в Python с использованием async/await позволяет эффективно выполнять задачи, связанные с операциями ввода-вывода (IO), такие как работа с сетью, файлами или базами данных. В этой статье мы рассмотрим основные принципы работы с async/await на практических примерах, а также изучим ключевые методы модуля asyncio.
Рассмотрим принципы работы с async/await на практических примерах. Для начала рассмотрим базовый пример асинхронной функции, которая имитирует загрузку данных с разных URL:
простой пример асинхронного кода
import asyncio
async def fetch_data(url):
await asyncio.sleep(1)
return f"Данные с {url}"
async def main():
# Параллельное создание задач
task1 = asyncio.create_task(fetch_data("site1.com"))
task2 = asyncio.create_task(fetch_data("site2.com"))
# Ждём результатов обоих задач одновременно
result1, result2 = await asyncio.gather(task1, task2)
print(result1)
print(result2)
# Запуск
# asyncio.run(main())
Теперь рассмотрим
параллельное выполнение асинхронных операций
Для лучшего понимания потока выполнения асинхронного кода, рассмотрим рисунок последовательности:
import asyncio
async def fetch_data(url):
await asyncio.sleep(1)
return f"Данные с {url}"
async def main():
# Параллельное создание задач
task1 = asyncio.create_task(fetch_data("site1.com"))
task2 = asyncio.create_task(fetch_data("site2.com"))
# Ждём результатов обоих задач одновременно
result1, result2 = await asyncio.gather(task1, task2)
print(result1)
print(result2)
# Запуск
# asyncio.run(main())

Рассмотрим подробнее, что происходит:
1. Создание задачи
Главная программа создаёт две задачи одновременно, не дожидаясь завершения первой
2. Параллельное выполнение
Обе задачи выполняются независимо друг от друга, каждая ожидает по 1 секунде
3. Обработка результата
После завершения обеих задач их результаты собираются через asyncio.gather()
Для лучшего понимания преимуществ async/await, сравним синхронный и асинхронный подходы:
Характеристика |
Синхронный код |
Асинхронный код (async/await) |
Выполнение операций |
Последовательное, блокирующее |
Параллельное, неблокирующее |
Использование CPU |
Ожидание операции занимает ресурсы |
|
Время ожидания используется для других задач |
Сложность кода |
Простой, линейный |
Немного сложнее, но читаемый |
Масштабируемость |
Ограниченная |
Высокая при работе с IO операциями |
Основные преимущества async/await:
- Эффективность:
- Одновременное выполнение нескольких операций
- Оптимальное использование системных ресурсов
- Быстрое выполнение IO-операций
- Читаемость кода:
- Синтаксис близок к обычному синхронному коду
- Логика выполнения более очевидна
- Проще поддерживать и модифицировать
- Масштабируемость:
- Легко добавлять новые асинхронные операции
- Удобно управлять параллельным выполнением
- Простая обработка ошибок через try-except
Для практического применения важно помнить:
- Используйте async/await для IO-операций (сеть, файлы, базы данных)
- Комбинируйте задачи с помощью
asyncio.gather()
для параллельного выполнения
- Обрабатывайте ошибки в каждой асинхронной функции отдельно
- Не используйте async/await для вычислительных задач без IO операций
Такой подход позволяет создавать эффективные и масштабируемые приложения с минимальными изменениями в структуре кода.
Давайте рассмотрим основные полезные процедуры модуля asyncio и их практическое применение.
Основные методы управления задачами
1.
create_task() - создание независимых задач
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"Задача {name} завершена"
async def main():
# Создание задач
task1 = asyncio.create_task(task("Первая", 1))
task2 = asyncio.create_task(task("Вторая", 2))
# Получение результатов
results = await asyncio.gather(task1, task2)
print(results)
asyncio.run(main())
2.
gather() - параллельное выполнение задач
import asyncio
async def fetch_data(url):
await asyncio.sleep(1)
return f"Данные с {url}"
async def main():
# Создание списка задач
tasks = [
fetch_data("site1.com"),
fetch_data("site2.com"),
fetch_data("site3.com")
]
# Параллельное выполнение
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
3. wait_for() - выполнение с таймаутом
3. wait_for() - выполнение с таймаутом
import asyncio
async def long_operation():
await asyncio.sleep(3)
return "Готово"
async def main():
try:
result = await asyncio.wait_for(long_operation(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("Таймаут!")
asyncio.run(main())
4.
wait() - ожидание нескольких задач
import asyncio
async def task1():
await asyncio.sleep(1)
return "Задача 1"
async def task2():
await asyncio.sleep(2)
return "Задача 2"
async def main():
tasks = [task1(), task2()]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"Завершено: {[t.result() for t in done]}")
print(f"В процессе: {len(pending)}")
asyncio.run(main())
5.
Queue - асинхронная очередь
import asyncio
async def producer(queue):
for i in range(3):
await queue.put(i)
print(f"Произведено: {i}")
await asyncio.sleep(0.1)
async def consumer(queue):
while True:
item = await queue.get()
print(f"Потреблено: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
# Создание производителя и потребителя
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
# Ожидание завершения производителя
await producer_task
await queue.join() # Ожидание обработки всех элементов
consumer_task.cancel()
asyncio.run(main())
6.
Semaphore - ограничение параллельных операций
import asyncio
async def fetch_data(sem, url):
async with sem:
await asyncio.sleep(1)
return f"Данные с {url}"
async def main():
sem = asyncio.Semaphore(2) # Ограничение до 2 параллельных операций
tasks = [
fetch_data(sem, f"site{i}.com")
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
7. Структура асинхронного приложения
import asyncio
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
try:
# Инициализация ресурсов
await setup_resources()
# Запуск основных задач
await asyncio.gather(
task1(),
task2(),
return_exceptions=True
)
except Exception as e:
logger.error(f"Ошибка в основном цикле: {e}")
finally:
# Очистка ресурсов
await cleanup_resources()
if __name__ == "__main__":
asyncio.run(main())
async def create_and_manage_tasks():
# Создание задач с именами для лучшей отладки
task1 = asyncio.create_task(
fetch_data("site1.com"),
name="fetch_site1"
)
task2 = asyncio.create_task(
fetch_data("site2.com"),
name="fetch_site2"
)
# Параллельное выполнение с таймаутом
try:
results = await asyncio.gather(
task1,
task2,
return_exceptions=True,
timeout=5.0
)
except asyncio.TimeoutError:
logger.warning("Таймаут при выполнении задач")
# Отмена задач, если они всё ещё выполняются
task1.cancel()
task2.cancel()
raise
class ResourceManager:
def __init__(self):
self._lock = asyncio.Lock()
self._resources = {}
async def acquire_resource(self, resource_id):
async with self._lock:
if resource_id not in self._resources:
self._resources[resource_id] = await self._create_resource(resource_id)
return self._resources[resource_id]
async def _create_resource(self, resource_id):
try:
# Здесь создание ресурса
return Resource(resource_id)
except Exception as e:
logger.error(f"Ошибка при создании ресурса {resource_id}: {e}")
raise
async def safe_execute(func, *args, **kwargs):
try:
return await func(*args, **kwargs)
except asyncio.CancelledError:
logger.warning(f"Задача была отменена: {func.__name__}")
raise
except Exception as e:
logger.error(f"Ошибка в {func.__name__}: {e}")
raise
finally:
await cleanup_resources(func.__name__)
# Использование
async def main():
try:
result = await safe_execute(fetch_data, "site.com")
process_result(result)
except Exception as e:
logger.error(f"Глобальная ошибка: {e}")
await handle_error(e)
Рекомендации по использованию
- Логирование:
- Всегда используйте именованные логгеры
- Логируйте ошибки с контекстом
- Используйте разные уровни логирования (INFO, WARNING, ERROR)
- Управление ресурсами:
- Используйте контекстные менеджеры (async with)
- Всегда очищайте ресурсы в блоке finally
- Используйте семафоры для ограничения параллельных операций
- Обработка ошибок:
- Обрабатывайте специфические исключения
- Используйте return_exceptions в gather
- Логируйте ошибки с контекстом
- Таймауты:
- Устанавливайте разумные таймауты для операций
- Обрабатывайте TimeoutError
- Отменяйте задачи при таймауте
- Именование задач:
- Давайте осмысленные имена задачам
- Используйте name параметр в create_task
- Это помогает в отладке и мониторинге
Анти-паттерны, которых следует избегать
1.
Небезопасное создание зада
# ПЛОХО
task = asyncio.create_task(fetch_data())
# ХОРОШО
task = asyncio.create_task(fetch_data(), name="fetch_data_task")
2.
Неправильная обработка ошибок
# ПЛОХО
try:
await asyncio.gather(task1, task2)
except Exception:
pass
# ХОРОШО
try:
await asyncio.gather(task1, task2, return_exceptions=True)
except Exception as e:
logger.error(f"Ошибка: {e}")
3.
Нарушение иерархии ресурсов
# ПЛОХО
async with session.get(url) as response:
data = await response.text()
# Возврат без очистки
return data
# ХОРОШО
async with session.get(url) as response:
try:
data = await response.text()
return data
finally:
await response.release()
Заключение
Асинхронное программирование с использованием async/await позволяет создавать высокопроизводительные и масштабируемые приложения. Правильное использование методов модуля asyncio и соблюдение рекомендаций помогут избежать распространённых ошибок и улучшить качество кода.
Эта статья поможет вам освоить асинхронное программирование в Python и эффективно применять его в своих проектах.