Pull to refresh
794.35
OTUS
Цифровые навыки от ведущих экспертов

Celery для новичков

Level of difficultyEasy
Reading time4 min
Views13K

Привет, Хабр!

Celery – это асинхронная распределенная очередь задач, написанная на Python, она предназначена для обработки сообщений в реальном времени при помощи многозадачности. Используя Celery, можно организовать выполнение задач в фоновом режиме, не загружая основной поток приложения.

Используя Celery можно легко организовать выполнение фоновых задач.

Установка осуществляется через pip.

Основные возможности Celery

Определение задач

Создадим экземпляр Celery в файле celery_app.py:

from celery import Celery

app = Celery('example', broker='your_broker_url_here')

Можно определить фоновые задачи с помощью декоратора @app.task. К примеру функция, которая просто складывает два числа:

@app.task
def add(x, y):
    return x + y

add — это асинхронная задача. Можно вызвать её с помощью add.delay(x, y).

Celery предлагает параметры для настройки задач:

ignore_result

Если не нужен результат выполнения задачи, есть параметр ignore_result:

@app.task(ignore_result=True)
def add(x, y):
    return x + y

rate_limit ограничивает скорость выполнения задач. Например, если нужно, чтобы задача add выполнялась чаще, чем 10 раз в минуту, можно настроить rate_limit:

@app.task(rate_limit='10/m')
def add(x, y):
    return x + y

retry

Задача может иногда терпеть неудачу из-за проблем (которые вечно бывают):

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def add(self, x, y):
    try:
        # Попытка выполнения задачи
        return x + y
    except SomeTemporaryException as exc:
        # Запланировать повторное выполнение задачи
        raise self.retry(exc=exc)

Задача попытается выполниться до 3 раз с интервалом в 60 секунд, если возникнет временная ошибка.

Вызов задач

Метод delay() — это самый простой способ вызвать задачу асинхронно. Под капотом он использует apply_async():

from tasks import add

# Вызов задачи асинхронно
result = add.delay(4, 4)

apply_async() предлагает больше гибкости, позволяя указать различные параметры выполнения: время и приоритет выполнения, а также колбэки и errbacks:

result = add.apply_async((4, 4), countdown=10)

add будет запланирована к выполнению через 10 секунд после вызова.

signature() создает подпись задачи, которую можно использовать для создания сложных раб. процессов:

from celery import signature

sig = signature('tasks.add', args=(2, 2), immutable=True)
sig.delay()

chain() позволяет соединить несколько задач в одну последовательность, где результат одной задачи передается в качестве аргумента следующей:

from celery import chain

# (4 + 4) -> (8 * 10)
res = chain(add.s(4, 4), multiply.s(10))()

group() используется для параллельного выполнения набора задач. Он возвращает специальный объект GroupResult, который позволяет отслеживать выполнение группы задач:

from celery import group

# выполняет add(2, 2) и add(4, 4) параллельно
group_result = group(add.s(2, 2), add.s(4, 4))()

chord() — это комбинация group() и chain(), позволяющая выполнить группу задач параллельно и затем вызвать callback-задачу с результатами группы:

from celery import chord

# cначала выполняет add(2, 2) и add(4, 4) параллельно, затем результаты передаются в multiply()
result = chord([add.s(2, 2), add.s(4, 4)])(multiply.s(2))

Различные примеры применения

Масштабируемая система обработки изображений

from celery import Celery
import PIL
from PIL import Image

app = Celery('image_processor', broker='pyamqp://guest@localhost//')

@app.task
def resize_image(image_path, output_path, size):
    with Image.open(image_path) as img:
        img.thumbnail(size, PIL.Image.ANTIALIAS)
        img.save(output_path)

@app.task
def crop_image(image_path, output_path, crop_box):
    with Image.open(image_path) as img:
        cropped_img = img.crop(crop_box)
        cropped_img.save(output_path)

# юзаем
resize_image.delay('path/to/image.jpg', 'path/to/resized_image.jpg', (800, 600))
crop_image.delay('path/to/image.jpg', 'path/to/cropped_image.jpg', (100, 100, 400, 400))

Асинхронная рассылка уведомлений

Celery часто юзают для асинхронной рассылки электронных писем, SMS, или push-уведомлений пользователям. Пример отправки электронных писем с помощью SMTP:

from celery import Celery
import smtplib

app = Celery('notifications', broker='pyamqp://guest@localhost//')

@app.task
def send_email(recipient, subject, body):
    server = smtplib.SMTP('smtp.example.com', 587)
    server.starttls()
    server.login("your_email@example.com", "your_password")
    message = f"Subject: {subject}\n\n{body}"
    server.sendmail("your_email@example.com", recipient, message)
    server.quit()

# пример использования
send_email.delay("user@example.com", "Welcome!", "Thank you for registering with us.")

Асинхронное выполнение длительных задач на примере генерации отчетов

from celery import Celery
import time

app = Celery('reports', broker='pyamqp://guest@localhost//')

@app.task
def generate_report(report_id, parameters):
    # операции по извлечению данных, их обработке и анализе
    time.sleep(60) # имитация длительной операции
    # сохранение или отправка отчета
    return f"Report {report_id} generated with parameters {parameters}"

# использование
generate_report.delay("report_123", {"param1": "value1", "param2": "value2"})

Интеграция с Django

Cоздаем проект django и добавляем приложение:

django-admin startproject myproject
cd myproject
django-admin startapp myapp

Создаем файл celery.py в корневой директории проекта Django (на уровне settings.py):

# myproject/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

Добавляем следующие строкиsettings.py, чтобы указать Celery использовать Redis в качестве брокера сообщений:

# myproject/settings.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'

Переходим в приложение, созданное внутри проекта Django, и создайте файл tasks.py. Например, создадим задачу для сложения двух чисел:

# myapp/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

Теперь можно юзать эту задачу асинхронно из моделей или любой другой части Django. Например, добавим вызов задачи:

# myproject/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

Для выполнения асинхронных задач необходимо запускаем Celery worker:

celery -A myproject worker --loglevel=info

Это запустит worker, который будет слушать и выполнять асинхронные задачи.

Можно еще подрубить мониторинг задач и воркеров с flower:

celery -A myproject flower

Напомню, что в рамках онлайн-курсов OTUS вы можете изучить самые популярные ЯП, а также зарегистрироваться на ряд бесплатных мероприятий.

Tags:
Hubs:
Total votes 10: ↑6 and ↓4+4
Comments14

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS