2 Мая


2019

Celery, блокировка периодческих задач.

Обычно в асинхронном программировании для предотвращения доступа у параллельных потоков к общим данным используются мьютексы и семафоры.

Для celery есть специальный модуль - celery-once позволяющий блокировать задачи так, чтобы таск выполнял только 1 воркер за промежуток времени. Однако, лично мне, не удалось настроить его на периодические задачи. Для блокировки периодческих задач был использован следующий декоратор (честно взятый со stack overflow)

import functools

from django.core.cache import cache


def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func()
                finally:
                    release_lock()
        return wrapper
    return task_exc

Используется он следующим образом:

@periodic_task(bind=True, run_every=timedelta(seconds=5), )
@single_instance_task(5*60)
def send_verify_email():
    print(current_task, 'started')
    sleep(randint(1, 15))
    print(current_task, 'ended')

Тут стоит сразу отметить, что для cache нельзя использовать дефолтную память Django, т.к. у каждого воркера она своя. Я использовал redis. 

Для фласка и других фрейморвков можно использовать свой модуль кеширования.

Flask
Django