Pull to refresh
822.32
OTUS
Цифровые навыки от ведущих экспертов

Обзор фреймворка Luigi для построения последовательностей выполнения задач

Reading time 7 min
Views 21K
Доброго времени суток! У нас открылось совершенно новое направление обучения — BigData, а это значит, что немного расширяется горизонт материалов, которыми мы будем делиться с вами. Сегодня рассмотрим Luigi, как часть того, что раскрывается на нашем курсе.

Luigi — фреймворк на языке Python для построения сложных последовательностей по выполнению зависимых задач. Довольно большая часть фреймворка направлена на преобразования данных из различных источников (MySql, Mongo, redis, hdfs) и с помощью различных инструментов (от запуска процесса до выполнения задач разных типов на кластере Hadoop). Разработан в компании Spotify и открыт в виде open source инструмента в 2012 году.

Самое главное преимущество фреймворка — возможность выстраивать последовательности зависимых задач. Фреймворк разрешает зависимости, отслеживает граф выполнения, управляет запуском задач, обрабатывает ошибки с возможностью перезапуска нужных задач, распределяет ресурсы рабочих процессов с возможностью параллельной работы независимых частей графа задач.

Для выполнения всех этих задач существуют и другие инструменты. Это Oozie, Pinball, Airflow (находится в статусе инкубации в Apache — проходит различные проверки, недавно вышел обзор на хабре). В данной статье рассмотрим только Luigi.



Установка и документация

Для установки можно воспользоваться командой:

pip install luigi

Документация доступна тут

Задача (Task)

В файле luigi_demo_tasks.py определяем класс, наследуемый от luigi.Task. Добавляем вызов run для возможности запуска из консоли.

from luigi import Task, run

class MyTask(Task):
   pass


if __name__ == '__main__':
   run()

Запускаем. Дополнительно указываем опцию --local-scheduler, чтобы пока что не обращаться к центральному планировщику задач.

python -m luigi_demo_tasks MyTask --local-scheduler

Примечание. В документации указан другой способ запуска без вызова run и с добавлением директории в PYTHONPATH.

Видим следующий результат:

DEBUG: Checking if MyTask() is complete
/usr/local/lib/python3.4/dist-packages/luigi/worker.py:334: UserWarning: Task MyTask() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   MyTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) running   MyTask()
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host username=username, pid=5369) done      MyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MyTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 MyTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

В сообщениях видим, что задача MyTask ставится на выполнение и успешно выполняется. Повторный запуск дает точно такой же результат.

Сделаем теперь так, чтобы MyTask выполнял некоторую работу. Для этого переопределим метод run из базового класса:

from luigi import Task, run

class MyTask(Task):
   def run(self):
       print("Hello world!")


if __name__ == '__main__':
   run()

В информации о выполнении задачи увидим следующее:

INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) running   MyTask()
Hello world!
INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) done      MyTask()

Гарантия однократного выполнения задачи

Часто бывает так, что необходимо выполнить некую задачу единожды. Например из-за того, что её выполнение является ресурсоёмким. В Luigi задача считается сделанной, если сгенерирован некий объект (файл на машине, файл в hdfs, артефакт в MySql, таблица в Hive и другие), и можно проверить его существование. Для указания объекта необходимо переопределить в задаче метод output и в нем вернуть любого наследника или наследников класса Target. Для примера будем использовать LocalTarget — файл в локальной файловой системе.

from luigi import Task, run, LocalTarget

class MyTask(Task):

   filename = "hello_file.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("Hello world!")

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()

Первый запуск задачи генерирует файл hello_file.txt. Повторный запуск задачи сообщает нам, что все задачи выполнены.

Зависимые задачи

В luigi задачи могут зависеть от других задач. Для указания зависимости от другой задачи необходимо переопределить метод requires. В нем вернуть объект класса любой другой задачи. Определим две зависимые задачи, каждая из которых пишет файл.

from luigi import Task, run, LocalTarget

class MyTaskFirst(Task):

   filename = "first.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("first!")

   def output(self):
       return LocalTarget(self.filename)


class MyTaskSecond(Task):

   filename = "second.txt"

   def run(self):
       with open(self.filename, 'w') as f:
           f.write("second!")

   def requires(self):
       return MyTaskFirst()

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()

Запуск задачи немного изменился. Указываем для запуска самую последнюю задачу, фреймворк сам определит и выполнит все зависимости.

python -m luigi_demo_tasks MyTaskSecond --local-scheduler

Внешние зависимости

Иногда для выполнения задачи необходимы данные, генерируемые внешними системами. Так как в зависимостях в методе requires можно указывать только другие задачи, то нам понадобится задача-обертка для внешних данных. Для примера рассмотрим задачу подсчета частоты каждого символа в файле hello_file.txt:

from collections import defaultdict

from luigi import Task, run, LocalTarget, ExternalTask


class ExternalData(ExternalTask):
   def output(self):
       return LocalTarget("hello_file.txt")


class TaskWithExternalData(Task):

   filename = "char_counts.txt"

   def run(self):
       frequencies = defaultdict(int)

       with open(self.requires().output().path) as f_in:
           for line in f_in:
               for c in line:
                   frequencies[c] += 1

       with open(self.filename, 'w') as f_out:
           for c, count in frequencies.items():
               f_out.write('{}\t{}\n'.format(c, count))

   def requires(self):
       return ExternalData()

   def output(self):
       return LocalTarget(self.filename)


if __name__ == '__main__':
   run()

Планировщик

Для централизованного выполнения задач можно использовать центральный планировщик. Его основные задачи: обеспечить отсутствие одновременного выполнения одинаковых задач, предоставить визуализацию всех запущенных задач с их зависимостями.

Из документации запуск планировщика:

$ luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE>

Адрес планировщика по умолчанию:
localhost:8082/

Запустим предыдущую задачу с использованием планировщика, предварительно удалив файл char_counts.txt:

python -m luigi_demo_tasks TaskWithExternalData

В планировщике увидим обе задачи:



А так же граф зависимостей:



Параллельный запуск задач

Реализуем несколько зависимых задач, зависимость оформим в виде песочных часов: корневая задача типа 1 зависит от десяти одинаковых задач типа 2. Эти 10 задач типа 2 зависят от одной типа 3. она, в свою очередь, зависит от 10 задач типа 4, и все эти 10 зависят от одной задачи типа 5.

Каждая задача пишет в результате работы файл со своим именем и номером, а так же спит 10 секунд. Это нужно для того, чтобы в планировщике можно было проследить порядок выполнения задач. Обратите внимание на то, что в задачу можно передать параметр. Это позволяет запускать разные по сути задачи с одинаковым кодом.

Для реализации будем использовать наследование, так как это позволит сократить код. Запустим одновременно 5 процессов указав опцию --workers=5

python -m luigi_demo_tasks Task1 --Task1-task-index=0 --workers=5

В планировщике обновляя страницу увидим следующую последовательность:










Одновременно выполняется не более пяти задач, а иногда только одна, так как от нее зависят все остальные. При этом выделенные воркеры простаивают.

Запуск задач

Для запуска задач необходимо использовать какой-либо внешний планировщик, например cron. Соответственно необходимо самостоятельно настраивать получение актуального кода для запуска, логирование и конфигурирование всех задач.

Дополнительные возможности

В случае возникновения ошибок в работе luigi может отправить email.

Для каждого набора задач можно указать внешний файл с настройками, настроив перед запуском переменную LUIGI_CONFIG_PATH.

Каждая задача может быть запущена с некоторым приоритетом, для указания приоритета необходимо в классе указать поле priority.

Реализовано довольно много классов задач, связанных с типичными примерами обработки данных на кластере — hadoop streaming задача на Python, hadoop jar задача, spark задача и другие. При этом часто они требуют существенной доработки.

Возможно выполнение любой задачи в виде запуска консольной команды с отслеживанием процесса выполнения.

Код самой библиотеки чаще всего довольно прост. Для понимания того, как выстроить зависимые задачи, достаточно посмотреть исходный код базовых классов в luigi. У них довольно простой интерфейс и неплохая документация.

Разработан крупной компанией. На данный момент стабильно есть несколько коммитов в мастер каждую неделю. Скорее всего будет и дальше поддерживаться.

Недостатки

Проверка того, выполнена ли задача, происходит только во время построения графа зависимостей. Из-за этого, если запускать выполнение набора задач чаще, чем он весь успевает выполниться, может возникнуть ситуация, когда планировщик запускает две одинаковые задачи.

Нет встроенного планировщика.

Нет способа получить метаинформацию о выполняемых задачах. Нельзя без вспомогательных средств получить документацию по запускаемым задачам, процессам обработки данных. Нет способа, например, получить общий список задач, которые зависят от данной задачи.

Веб-интерфейс планировщика полезен только для того, чтобы увидеть, почему тот или иной набор задач не выполняется. Обычно можно посмотреть на граф зависимостей и увидеть, каких именно данных не хватает.

Не совсем очевидна настройка логирования выполнения задач.

Довольно часто встречается неожиданное поведение.

Поддержка и развитие менее активное, чем, например у Airflow. Для сравнения в luigi и в airflow

Вывод

Luigi хорошо подходит для построения процессов обработки данных. Однако прежде, чем начинать использовать эту библиотеку, есть смысл попробовать реализовать несколько типичных задач на разных фреймворках и выбрать такой, который лучше подойдет именно для ваших задач.

THE END

Как всегда рады мнениям, вопросам и тапкам.
Tags:
Hubs:
+8
Comments 4
Comments Comments 4

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS