Beet

Сельдерей и свекла

Как оно запускается?

"Одного не могу понять: как оно запускается?" — недоумевал коллега, собравшийся переводить свой проект на платформу Celery. Вопрос не выглядел странным. Я уже успел к тому моменту повозиться с Celery и знал, что официальная документация, подробная и объемная, не позволяет разглядеть за деревьями леса. Вот мы все установили и сконфигурировали. Написали модуль tasks.py с функциями, которые будут параллельно выполнять поступающие задания. Подняли фреймворк из консоли командой:

 $ celery -E -A имя_моего_пакета worker

Допустим, я запустил эту команду в фоне как сессию менеджера screen или как сессию Supervisor-а и теперь воркеры ждут входящих заданий. Но что делать дальше? Как раздавать задания? Не из питоновской же консоли, как в примерах! Как узнавать, сколько из них выполнено, сколько осталось? Словом, как построить на базе Celery многозадачное приложение? К сожалению, ни в документации ни в примерах я не нашел подсказок. Понадобилось порядочно времени, чтобы выработать некую стандартную схему.

Структура приложения

Допустим, нужно написать краулер. Пускай он назывется beetcrawler 1. Минимальное дерево проекта будет выглядеть так:

.
├── beetcrawler
│   ├── __init__.py
│   ├── celery.py
│   └── tasks.py
├── celeryconfig.py
└── scripts
    └── producer.py

Для начала создадим корневой каталог проекта, внутри него — пакет beetcrawler.

Конфигурация

В качестве бекенда Celery-приложений чаще всего используют RabbitMQ. Для небольших проектов я предпочитаю другой популярный вариант: Redis, т.к. он представляется более легким.

В корневой директории проекта создаем файл celeryconfig.py, куда прописываем основные настройки фреймворка.

1
2
3
4
5
6
7
8
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost'
task_soft_time_limit = 180
task_time_limit = 600
imports = ('beetcrawler.tasks',)
task_routes = {
    'beetcrawler.tasks.scrap': {'queue': 'scrap'}
}
  • Опции broker_url и result_backend указывают на сервис Redis.

  • task_soft_time_limit и task_time_limit представляют собой установки таймаутов в секундах по умолчанию для параллельно выполняющихся задач. Разница между ними в том, что в первом случае создается исключение SoftTimeLimitExceeded, которое можно обработать, второй же таймаут просто "гасит" ленивого воркера и заменяет его новым. Таймауты, прописанные здесь, могут быть переопределены для каждой функции индивидуально 2.

  • imports указывает на модуль, отвечающий за выполнение задач. Каждую такую функцию можно узнать по декоратору @task.

  • task_routes — директива не обязательная, но весьма полезная. По умолчанию Celery создает одну общую очередь заданий. Удобнее каждому типу задач назначить отдельную очередь. Это позволит управлять ресурсами, т.е. на разные задачи выделять нужное число "работников". Можно каждой задаче предоставить свою очередь, либо — одну очередь нескольким задачами — зависит от проекта.

Приложение

Модуль beetcrawler/celery.py — это приложение 3, которое запускает фреймворк. Поскольку все опции вынесены в отдельный модуль (celeryconfig.py), скрипт cовсем короткий:

1
2
3
4
from celery import Celery

app = Celery('beetcrawler')
app.config_from_object('celeryconfig')

В строке 4 указано, что конфигурация прописана в модуле celeryconfig.

Да, кстати, я наконец-то пересел на третий Python, поэтому здесь и далее отсутствуют инструкции типа from __future__ import. Для версии 2.7 они кое-где понадобятся (см. официальную документацию).

Рабочая лошадка

У типичного краулера есть как минимум одна задача, которую можно и нужно выполнять параллельно. Я говорю об HTTP-запросах. Если мы запустили 16 воркеров и отправили в качестве заданий сто адресов, то первые 16 будут сразу же взяты в обработку. Как только очередной воркер освободился, он берет новое задание. И так пока все задания не будут выполнены или провалены.

Назовем основную функцию scrap. Для регистрации ее как асинхронной задачи Celery служит декоратор @task:

1
2
3
4
5
6
from .celery import app

@app.task(soft_time_limit=30, autoretry_for=(IOError,), retry_kwargs={'max_retries': 10})
def scrap(self, url):
    result = ... # запрос, извлечение информации
    return result

Обратите внимание на параметры декоратора @task.

  • soft_time_limit : пример того, как можно переопределить таймаут по умолчанию.
  • autoretry_for, retry_kwargs : при возникновении IOError фреймворк попытается повторно выполнить задачу столько раз сколько прописано в параметре max_retries. Большинство других исключений приведут к аварийному выходу из функции, если, конечно, мы не напишем для них обработчик.

Продюсер

Вернемся к вопросу: как раздавать задания? Точнее: как автоматизировать этот процесс? Я знаю один хороший способ: написать программу. Создадим управляющий скрипт: scripts/producer.py. Где его размещать — дело вкуса4.

Кода в этом модуле может быть много или мало, в зависимости от того, какие опции предоставляются пользователю, откуда берутся URL-ы: из базы, текстового файла, какого-нибудь архива и т.д. Допустим, имеется список URLS с адресами, которые надо пройти. Тогда сердце управляющей программы может выглядеть так:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import random
import celery
from beetcrawler.tasks import scrap

URLS = ['www.example.com', 'weather.org', ...]
MIN_DELAY = 1
MAX_DELAY = 5
...

random_delay = lambda: random.uniform(MIN_DELAY, MAX_DELAY)
jobs = celery.group([scrap.signature((url,), countdown=random_delay())
                     for url in URLS])
jobs.apply_async().join()
  • random_delay : анонимная функция, возвращающая при каждом вызове случайное число в диапазоне MIN_DELAY..MAX_DELAY. Она нужна, чтобы после каждого задания выдерживать паузу.

  • jobs : из списка адресов формируются задания. Метод celery.group указывает фреймворку, что их следует выполнять параллельно, а не одну за другой или как-то еще. Есть и другие возможности, они описаны на странице Canvas: Designing Work-flows.

  • scrap.signature((url,), countdown=random_delay()). "Сигнатура" — это специальный метод Celery, указывающий на то, что функция должна выполняться отдельным, независимым процессом. Первым параметром передается список аргументов.

  • Инструкция jobs.apply_async.join() отправляет задания в очередь и дожидается завершения всей работы.

Пуск!

Запуск рабочих процессов

Вначале необходимо запустить в отдельной консоли или как фоновый процесс Celery с нашей "начинкой". Из корневой директории проекта:

$ celery -E -A beetcrawler worker --loglevel=info -Q scrap

По умолчанию фреймворк запустит количество воркеров, равное числу ядер на машине. Чтобы задать другое значение, используется ключ -c, например: -c16 — 16 "работников".

Параметр -Q scrap указывает, что воркеры выполняют задачи, привязаные к очереди scrap. Это задается в celeryconfig.py (см. выше).

Запуск producer-а

$ python scripts/producer.py

Producer будет ждать завершения всех задач. После чего воркеры снова "уснут". Остановить их принудительно поможет команда:

pkill -9 -f 'beetcrawler worker'

Есть способ запустить воркеры не как отдельный процесс, а из управляющего скрипта. Иногда это удобнее, хотя мешает отладке. Возможно, я расскажу об этом в одной из следующих заметок. Впрочем, есть вопросы и поважнее. Например: как организовать взаимодействие Celery-приложения с базой данных? Нужно же что-то делать с полученными результатами.


  1. Celery — "сельдерей" по-английски, а beet — свекла. 

  2. Для Celery-приложений, в которых параллельно выполяняются сетевые операции — а в случае многозадачного краулера именно так и происходит — таймауты крайне важны. Фреймворк не выставит их за вас! 

  3. "Приложение" в том же смысле, что и в большинстве GUI- и web- фреймворков, вроде PyQt, Flask, Tornado. 

  4. Я привык размещать стартовые скрипты в каталоге scripts. С таким же успехом producer.py можно сделать модулем внутри пакета beetcrawler

social

Яндекс.Метрика