Зачем пауку восемь ног?

We Are The Robots

Как мы видели в предыдущей заметке, в типичном сценарии веб-краулер:

  1. Обходит каталог.
  2. Получает подробные данные о каждом его элементе (товаре, книге, услуге и т.п.).

Еще одна типичная задача, которой мы не касались — обработка и сохранение полученных данных.

Выполнять эти действия лучше параллельно. Ведь, каждое из них предполагает задержку: ожидание ответа сервера, паузу между запросами, время на запись в базу данных (это дорогая операция)... При последовательном подходе каждый "такт" будет задерживать остальные и когда мы дождемся завершения работы данные, вполне возможно, будут уже неактуальны.

Существует много способов добиться многозадачности. На одном конце шкалы — Threads ("нити", "потоки") и Coroutines ("со-процедуры"), на другом — распределенные приложения, где разные компоненты работают на отдельных машинах. При этом, на какую бы из перечисленных технологий мы ни опирались, существуют разные способы (шаблоны, схемы, модели) связать между собой компоненты приложения. Рассмотрим простую и удобную модель, известную как Producer/Consumer.

Producer / Consumer

Producer-Consumer Model

  • Producer помещает задачи в очередь заданий (Tasks Queue).
  • Один или несколько исполнителей (Worker) достают из очереди задания одно за другим, выполняют их и складывают результаты в очередь отчетов (Reports Queue). Результат может быть либо успешным либо содержать сообщение об ошибке1.
  • Consumer, подключенный к очереди отчетов, вынимает из нее результаты и что-то с ними делает (например, сохраняет).

Рабочая лошадка

Базовым компонентом для построения программы по схеме Producer/Consumer является класс Worker.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import logging

class Worker(object):
    '''
    Base class for various types of workers.

    A typical worker listens to a queue called 'inbox'. When a new task is available,
    the worker takes it out, handles it in 'do_task' method, then puts result to 'outbox' queue.

    Empty task (None) is interpreted as 'poison pill'. It signals to the worker
    to exit its tasks-handling loop.
    '''

    def __init__(self, inbox=None, outbox=None, name=None):
        '''
        Initializer.

        inbox : queue providing tasks
        outbox: queue for task results
        '''
        self._inbox = inbox
        self._outbox = outbox


    def do_task(self, task):
        '''
        Task handling routine. Must be implemented by a successor.
        '''
        raise NotImplementedError('This method must be implemented by a successor')

    def on_tasks_done(self):
        '''
        Called immediately after breaking queue listening loop.
        Override this method for clean-up routines. Default implementation just
        logs 'All tasks done' message.
        '''
        logging.info('All tasks done')

    def work(self):
        '''
        Start listening for the 'inbox' queue.
        To stop the infinite loop, put None to the inbox.
        '''
        logging.info('Working...')
        while True:
            task = self._inbox.get()
            if task is None:
                logging.debug('Got poison pill')
                self._inbox.task_done()
                break
            try:
                self.do_task(task)   
            finally:
                self._inbox.task_done()

        self.on_tasks_done()

Представленный класс ничего не знает о технологии, в рамках которой его используют (threading, multiprocessing или иное), но верит, что очереди, к которым он подключен, предоставляют методы:

метод действие
get снять задание из очереди; метод является блокирующим (см. ниже).
put добавить задание в очередь
task_done подтвердить, что обработка задания завершена2

Worker работает в бесконечном цикле. Единственный способ заставить его выйти из дома — подсунуть в очередь входящих заданий "отравленную таблетку", в качестве которой здесь используется None. Не следует думать, что при отсутствии заданий программа "бегает" по бесконечному циклу, словно белка в колесе, отнимая у соседей процессорное время. Поскольку метод Queue.get является блокирующим, при отсутствии результата наша белка просто застревает на нем.

Workerl

Чтобы проверить, как это работает, напишем простейший скрипт. Здесь будут использоваться старые добрые питоновские Interpreter Threads3. Программа выполняет очень важную работу: она переписывает слова задом наперед.

  1. Функция produce_words читает из текстового файла ../data/countries.txt названия стран. Считав очередное название, она помещает его в очередь заданий.
  2. Исполнители, подключенные к очереди заданий, берут названия, переворачивают их (так, "Russia" превращается в "aissuR") и кладут в очередь отчетов кортеж вида:
    (исходное слово, перевернутое слово).
  3. Reporter забирает результаты из очереди отчетов и выводит их в консоль.

Количество исполнителей задается в командной строке при вызове скрипта.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
'''
scripts/threading_worker.py

Demonstration of craswlersinfo.Worker class with use of threading multitasking model.

Usage: threading_worker.py NUMWORKERS
'''
import sys, os
from Queue import Queue
import time
import threading

import logging
logging.basicConfig(
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)-8s - %(threadName)-11s - %(message)s', 
)

from pkg_resources import resource_filename
ROOTDIR = resource_filename(__name__, '../')
sys.path.append(ROOTDIR)
DATADIR = os.path.join(ROOTDIR, 'data')

# import application packages
from crawlersinfo.multitasking import Worker

class ReverseWorker(Worker):
    def do_task(self, task):
        #logging.debug('Processing task: %s', task)
        self._outbox.put((task, task[::-1]))
        time.sleep(0)


class Reporter(Worker):
    def do_task(self, task):
        print '%s --> %s' % task


def produce_words(filename, queue, numworkers=1):
    logging.info('Opening %s...', filename)
    with open(filename, 'r') as f:
        for line in f:
            queue.put(line.strip())
    for i in range(numworkers):
        logging.debug('Putting poison pill to the queue...')
        queue.put(None)


def start_workers(target, num_workers=1, name='Worker', args=()):
    logging.info("Starting %d %s", num_workers, ('%ss' % name) if num_workers > 1 else name)
    for i in range(num_workers):
        t = threading.Thread(target=target, args=args)
        t.daemon = True
        t.name = '%s-%03d' % (name, i+1) if num_workers > 1 else name
        t.start()

if __name__ == '__main__':

    if len(sys.argv) < 2:
        print "Usage: threading_worker.py NUMWORKERS"
        sys.exit(1)

    numworkers = int(sys.argv[1])
    tasks_queue = Queue()
    reports_queue = Queue()

    reporter = Reporter(reports_queue)
    start_workers(reporter.work, name='Reporter')

    worker = ReverseWorker(tasks_queue, reports_queue)
    start_workers(worker.work, numworkers)
    filename = os.path.abspath(os.path.join(DATADIR, 'countries.txt'))
    start_workers(produce_words, name='Producer', args=(filename, tasks_queue, numworkers))

    while True:
        time.sleep(1)
        if tasks_queue.empty() and reports_queue.empty():
            break

    reports_queue.put(None) # poison pill for the reporter
    logging.info('Done.')

Пояснения к коду

  • Строки 14 - 18. Настройки логгирования. Обратите внимание на сегмент %(threadName)-11s в шаблоне. У каждого Thread-а есть имя, которое и будет сюда выводиться. Это удобно.

  • Строки 20 - 23. Скрипт threading_worker.py расположен в каталоге scripts. Нужно найти каталог data, находящийся на том же уровне, чтобы добраться до файла с названиями стран. Кроме того, необходимо добавить в sys.path пакет crawlersinfo 4.

  • Строки 28 - 47. Здесь объявляются действующие лица нашего сценария:

    • ReverseWorker и Reporter — наследники класса Worker
    • produce_words является простой функцией 5.
  • Строки 50 - 56. Вспомогательная функция start_workers позволяет запустить заданное число Thread-ов с нужными параметрами.

  • Строки 68 - 69. Объявляются две очереди. Вторая из них, reports_queue, является "исходящей" для ReverseWorker-а и "входящей" для Reporter-а.

  • Строки 68 - 74. Компоненты приложения выстраиваются в цепочку: Producer --> Worker --> Reporter. Их запуск происходит строго в обратном порядке. Producer инициирует поток данных; другие участники сценария уже их дожидаются.

  • Строки 76 - 79. Главный поток (MainThread) ждет когда остальные участники завершат свою работу. Признаком этого служат пустые очереди. Предполагается, что одной секунды хватит, чтобы Producer успел что-то положить в tasks_queue. В других приложениях, возможно, понадобится больший интервал ожидания, чтобы выход из программы не произошел сразу после запуска.

  • Строка 81. Reporter все еще продолжает работать. Чтобы остановить его, необходимо положить в очередь, которую он слушает, ядовитую таблетку. В противном случае ничего ужасного не произойдет, поскольку при инициализации Thread-ов устанавливается флаг daemon=True. Но все равно, так правильнее. В "боевых" сценариях аналогичный компонент обычно производит завершающие действия, вроде закрытия файлов и соединений.

Вывод выглядит примерно так:

$ ./scripts/threading_worker.py 3
2015-10-11 14:17:40 - INFO     - MainThread  - Starting 1 Reporter
2015-10-11 14:17:40 - INFO     - Reporter    - Working...
2015-10-11 14:17:40 - INFO     - MainThread  - Starting 3 Workers
2015-10-11 14:17:40 - INFO     - Worker-001  - Working...
2015-10-11 14:17:40 - INFO     - Worker-002  - Working...
2015-10-11 14:17:40 - INFO     - Worker-003  - Working...
2015-10-11 14:17:40 - INFO     - MainThread  - Starting 1 Producer
2015-10-11 14:17:40 - INFO     - Producer    - Opening .../countries.txt...
Afghanistan --> natsinahgfA
Aland Islands --> sdnalsI dnalA
Albania --> ainablA
...
Czech Republic --> cilbupeR hcezC
Democratic Republic of the Congo --> ognoC eht fo cilbupeR citarcomeD
2015-10-11 14:17:40 - DEBUG    - Producer    - Putting poison pill to the queue...
Denmark --> kramneD
Djibouti --> ituobijD
2015-10-11 14:17:40 - DEBUG    - Producer    - Putting poison pill to the queue...
2015-10-11 14:17:40 - DEBUG    - Producer    - Putting poison pill to the queue...
Dominica --> acinimoD
Dominican Republic --> cilbupeR nacinimoD
...
Palestinian Territory --> yrotirreT nainitselaP
2015-10-11 14:17:40 - DEBUG    - Worker-002  - Got poison pill
Panama --> amanaP
2015-10-11 14:17:40 - INFO     - Worker-002  - All tasks done
2015-10-11 14:17:40 - DEBUG    - Worker-001  - Got poison pill
Papua New Guinea --> aeniuG weN aupaP
2015-10-11 14:17:40 - DEBUG    - Worker-003  - Got poison pill
2015-10-11 14:17:40 - INFO     - Worker-003  - All tasks done
2015-10-11 14:17:40 - INFO     - Worker-001  - All tasks done
Paraguay --> yaugaraP
...
Yemen --> nemeY
Zambia --> aibmaZ
Zimbabwe --> ewbabmiZ
Vietnam --> manteiV
Venezuela --> aleuzeneV
2015-10-11 14:17:41 - INFO     - MainThread  - Done.
2015-10-11 14:17:41 - DEBUG    - Reporter    - Got poison pill

Заключение

В представленной программе отсутствуют какие-либо операции, характерные для веб-краулера. Моей целью было представить модель Producer/Consumer схематически, без связи с какой-либо бизнес-логикой. Однако, уже на этом этапе несложно спроецировать основные компоненты на сценарий сбора данных.

  • Producer (функция produce_words) обходит каталог и складывает в tasks_queue адреса страниц.
  • Несколько Worker-ов берут адрес за адресом из tasks_queue, запрашивают страницы, извлекают из них данные и помещают их в очередь reports_queue. В случае сбоя вместо результата в очередь отчетов отправляется сообщение об ошибке.
  • Reporter принимает отчеты, отделяет хорошие результаты от ошибок. Результаты сохраняет, а ошибки обрабатывает тем или иным способом.






  1. Рекомендую включать в отчет также основные параметры исходного задания. Это свяжет задание с результатом, что удобно для мониторинга. Кроме того, позволяет повторно поместить в очередь проваленное задание. 

  2. Не все типы библиотечных очередей поддерживают метод task_done. Чтобы обойти это ограничение, можно обернуть self._inbox.task_done() в блок, обрабатывающий ошибку AttributeError

  3. Питоновские Thread-ы принято ругать. Предвосхищая возражения по поводу их применения, рекомендую прочитать главу "Dealing with the GIL (and How to Stop Worrying About It)" из книги "Python Cookbook", David Beazley and Brian K. Jones, 3 edition, O'Reilly, 2013

  4. См. Структура проекта 

  5. Работа Worker-а невозможна без очереди входящих заданий. У Producer-а в нашем сценарии их нет, он берет названия стран из текстового файла. Зато Worker способен обходиться без очереди результатов, что и происходит в случае с Reporter-ом. Разумеется, можно создать более гибкую структуру классов. 

social

Яндекс.Метрика