Pull to refresh

Асинхронный ETL-процесс на Python

Level of difficultyMedium
Reading time5 min
Views6.7K

Продолжаю цикл статей по разработке ETL-процессов на Python. На этот раз мы преобразуем синхронный etl-процесс из статьи Пишем ETL-процесс на Python в асинхронный.

Напомню, что нас интересует ETL-процесс (extract, transform, load) реализованный через паттерн “Цепочка обязанностей”. Мы разработаем в качестве примера три обработчика, которые будут передавать данные последовательно от одного обработчика к другому. Каждый последующий обработчик решает, может ли он обработать запрос сам и стоит ли передавать запрос дальше по цепи.

Шаг 1 - дополняем синхронный код async/await

Обратите внимание - всё описанное ниде представляет собой переработку кода из статьи и Пишем ETL-процесс на Python.

Для понимания кода необходимо знание составляющих асинхронного программирования в Python.

На этом этапе берем в качестве исходного кода репозиторий из предыдущей статьи и делаем из функций корутины с помощью async. Меняем метод генератора .send() на его асинхронный аналог .asend() и перед вызовом корутин подставляем await.

Строго говоря, первая функция extract у нас станет асинхронной, а все последующие вложенными асинхронными генераторами.

Параллельно меняем синхронные зависимости на асинхронные. В нашем случае psycopg2 меняем на asyncpg.

Постановка задачи

Напомню постановку вымышленной задачки, главное в которой “погонять” данные из функции в функцию.

В базе данных есть таблица, содержащая целые числа. ETL-процесс должен пройтись по всем записям таблицы, возвести каждое число в квадрат и отобразить в консоли. Для каждого четного числа вывести информационное сообщение "the square of an even number". Если число из базы данных равно 3, то прерываем обработку и переходим к следующему числу.

Первая функция

Задача первого обработчика из “цепочки обязанностей” в нашем вымышленном примере сделать sql-запрос к таблице, содержащей числовые строки и по одной передать их в генератор:

async def extract(batch: AsyncGenerator) -> None:

  conn = await asyncpg.connect(
        database="demo", user="sergei", password="sergei", host="localhost"
    )

    stmt = await conn.prepare(SQL)
    async with conn.transaction():
        async for record in stmt.cursor():
            await batch.asend(
                record
            )  # следим за тем, чтобы аргументом был итерируемый объект

    await conn.close()

Вторая (и все промежуточные функции)

У всех промежуточных функций “цепочки обязанностей” три задачи - получить объект из очереди, обработать его и передать в следующую очередь. В продолжении нашего вымышленного примера:

async def transform(batch: AsyncGenerator) -> AsyncGenerator:

    while record := (yield):

        new_number = record["number"] ** 2
        if record["number"] % 2 == 0:
            foo = "an even number"
        elif record["number"] == 3:
            print("skip load stage")
            return
        else:
            foo = 0
        await batch.asend((new_number, foo))

Ветками if/elif/else показано, что можно управлять наборами данных, которые будут направлены на следующий этап. А также через continue можно вообще прервать выполнение цепочки обязанностей.

Заключительная функция

Технически это функция из предыдущего раздела только без инструкции по отправке объектов в следующую очередь. Завершаем нашу вымышленную цепочку:

async def load() -> AsyncGenerator:
    await anyio.sleep(0)

    while subject := (yield):
        match subject:
            case (int(number), str(bar)):
                print("the square of", bar, number)
            case (int(number), int(bar)):
                print(number)
            case _:
                raise SyntaxError(f"Unknown structure of {subject=}")

Шаг 2 - запускаем корутины ETL-процесса

Вызов корутин будет выглядеть интереснее по сравнению с вызовом функций синхронного ETL-процесса. Обратите внимание в четырех строках мы формируем цепочку запуска вложенных асинхронных генераторов. А бизнес-логику запускаем только в последней:

        unloads = load()
        anext(unloads)
        multiplication = transform(unloads, tg)
        anext(multiplication)
        await extract(multiplication, tg)

Шаг 3 - убираем копипасту при вызове корутин

Давайте напишем декоратор, чтобы избавиться от повторяющегося вызова anext():

def coroutine(func):
    @wraps(func)
    async def inner(*args: tuple[Any, ...], **kwargs: dict[str, Any]) -> AsyncGenerator:
        fn: AsyncGenerator = func(*args, **kwargs)
        await anext(fn)
        return fn

    return inner

Как видите - в синтаксис вызова anext добавилась управляющая конструкция await. Соответственно пришлось изменить и схему вызова корутин:

unloads = await etl.load()
multiplication = await etl.transform(unloads)
await etl.extract(multiplication)

Применяем декоратор:

@coroutine
async def transform(batch: AsyncGenerator, task_group: TaskGroup) -> AsyncGenerator:
    ...


@coroutine
async def load(task_group: TaskGroup) -> AsyncGenerator:
    ...

Теперь все асинхронные функции выполняются (если дебажить пошагово) точно также, как и в их синхронном аналоге из предыдущей статьи.

Вопрос - и чего мы добились?

Формально мы получили асинхронный код, который работает точно также, как и синхронный. Любой запуск с "секундомером" скорее покажет даже увеличившееся время выполнения логики.

Вот тут и наступает время применить асинхронную магию.

Шаг 4 - учимся запускать задачи конкурентно

Для технической реализации мы воспользуемся пакетом AnyIO. Мне нравится эта библиотка умением создавать объект группы задач TaskGroup, который я могу через параметры асинхронных функций протащить практически в любую корутину проекта.

Группа задач - это асинхронный контекстный менеджер, который следит за тем, чтобы все его дочерние задачи были завершены тем или иным способом после выхода из контекстного блока. Если дочерняя задача или код во вложенном контекстном блоке вызывает исключение, все дочерние задачи отменяются. В противном случае диспетчер контекста просто ожидает завершения всех дочерних задач, прежде чем продолжить.

Обработка задач в AnyIO в общих чертах соответствует модели trio.

Из контекстного менеджера получаем объект TaskGroup и передаем его в асинхронные генераторы, в которых будем запускать код конкурентно:

    async with create_task_group() as tg:
        unloads = await etl.load(tg)
        multiplication = await etl.transform(unloads, tg)
        await etl.extract(multiplication)

Вызов batch.asend() не является функцией, которую можно запустить в объекте TaskGroup, поэтому выделим логику, которую будем запускать конкурентно в отдельные функции. Для однообразия я назвал функции внутри асинхронных генераторов одинаково - process. Чтобы убедится в том, что всё это работает я добавил дополнительный принт и задержку в 2 секунды в генератор transform.

@coroutine
async def transform(batch: AsyncGenerator, task_group: TaskGroup) -> AsyncGenerator:
    async def process(record):
        await anyio.sleep(2)   # "проверочная" задержка
        new_number = record["number"] ** 2
        if record["number"] % 2 == 0:
            foo = "an even number"
        elif record["number"] == 3:
            print("skip load stage")
            return
        else:
            foo = 0
        await batch.asend((new_number, foo))

    while record := (yield):
        print(record)  # дополнительный принт
        task_group.start_soon(process, record)


@coroutine
async def load(task_group: TaskGroup) -> AsyncGenerator:
    await anyio.sleep(0)

    async def process(subject: Tuple) -> None:
        match subject:
            case (int(number), str(bar)):
                print("the square of", bar, number)
            case (int(number), int(bar)):
                print(number)
            case _:
                raise SyntaxError(f"Unknown structure of {subject=}")

    while subject := (yield):
        task_group.start_soon(process, subject)

Как итог при запуске скрипта мы видим сначала результат выполнения корутины extract. А после задержки в две секунды начинают отрабатываться остальные шаги ETL-процесса:

Делаем выводы

Асинхронный python это "сложный" python. Не всякая попытка преобразовать синхронный код в асинхронный может увенчаться успехом. Хотя бы по причине того, что не у всякой синхронной библиотеки есть её асинхронный аналог. И структурно асинхронные проекты другие.

Желаю удачи в освоении асинхронного питона и с удовольствием отвечу на ваши вопросы.

Репозиторий с учебным проектом доступен по ссылке https://github.com/s-klimov/etl-template/tree/04-async

Tags:
Hubs:
Total votes 10: ↑9 and ↓1+9
Comments6

Articles