Changeset 416 in main


Ignore:
Timestamp:
10/26/11 12:32:43 (8 years ago)
Author:
pcosquer
Message:

ensure only one update_index task is running at once

Location:
trunk/openPLM
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/openPLM/etc/default/celeryd

    r411 r416  
    11# Name of nodes to start 
    2 #CELERYD_NODES="w1 w2 w3" 
     2# Only one worker should run the "index" queue 
     3CELERYD_NODES="w1" 
    34 
    45# Where to chdir at start. 
     
    910 
    1011# Extra arguments to celeryd 
    11 CELERYD_OPTS="--time-limit=300 --concurrency=8" 
     12CELERYD_OPTS="-Q index,mails,celery --time-limit=300 --concurrency=8" 
    1213 
    1314# Name of the celery config module. 
  • trunk/openPLM/plmapp/tasks.py

    r410 r416  
    33# by sdcooke 
    44 
     5from functools import wraps 
     6 
    57from django.db.models.loading import get_model 
     8from django.core.cache import cache 
    69 
    710from haystack import site 
     
    912from celery.task import task 
    1013 
     14def synchronized(cls): 
     15    """Class decorator to synchronize execution of a task's run method. 
     16 
     17    This prevents parallel execution of two instances of the same task within 
     18    the same worker. If an instance of the same task is running in the same 
     19    worker, the second invocation calls :meth:`~celery.task.base.Task.retry` 
     20    is called instead of running the task. 
     21 
     22    Note that this applies to the task class, so `@synchronized` should 
     23    appear before `@task` or `@periodic_task` when tasks are defined with 
     24    decorators. 
     25 
     26    .. code-block:: python 
     27 
     28        @synchronized 
     29        @task 
     30        def cleanup_database(**kwargs): 
     31            logger = cleanup_database.get_logger(**kwargs) 
     32            logger.warn("Task running...") 
     33    """ 
     34    from multiprocessing import Lock 
     35    cls.lock = Lock() 
     36    cls.unsynchronized_run = cls.run 
     37    @wraps(cls.unsynchronized_run) 
     38    def wrapper(*args, **kwargs): 
     39        if cls.lock.acquire(False): 
     40            try: 
     41                cls.unsynchronized_run(*args, **kwargs) 
     42            finally: 
     43                cls.lock.release() 
     44        else: 
     45            cls.retry(args=args, kwargs=kwargs) 
     46    cls.run = wrapper 
     47    return cls 
     48 
     49@synchronized 
    1150@task(default_retry_delay = 5 * 60, max_retries = 1) 
    1251def update_index(app_name, model_name, pk, **kwargs): 
    1352    logger = update_index.get_logger(**kwargs) 
    14     try: 
    15         model_class = get_model(app_name, model_name) 
    16         instance = model_class.objects.get(pk=pk) 
    17         search_index = site.get_index(model_class) 
    18         search_index.update_object(instance) 
    19     except Exception, exc: 
    20         logger.error(exc) 
    21         update_index.retry([app_name, model_name, pk], kwargs, exc=exc) 
     53    model_class = get_model(app_name, model_name) 
     54    instance = model_class.objects.get(pk=pk) 
     55    search_index = site.get_index(model_class) 
     56    search_index.update_object(instance) 
    2257 
    2358@task 
  • trunk/openPLM/runserver.sh

    r181 r416  
    11#! /usr/bin/env sh 
     2./manage.py celeryd -Q mails,index,celery -c 3 -l info --settings=my_settings & 
    23python -m smtpd -n -c DebuggingServer localhost:1025 & 
    34./manage.py runserver 
     5exit 0 
  • trunk/openPLM/settings.py

    r410 r416  
    106106AUTH_PROFILE_MODULE = 'plmapp.UserProfile' 
    107107 
     108CELERY_CREATE_MISSING_QUEUES = True 
     109CELERY_ROUTES = { 
     110    "openPLM.plmapp.tasks.update_index": {"queue": "index"}, 
     111    "openPLM.plmapp.mail.do_send_histories_mail" : {"queue" : "mails"}, 
     112    "openPLM.plmapp.mail.do_send_mail" : {"queue" : "mails"}, 
     113} 
     114 
    108115EMAIL_HOST = 'localhost' 
    109116EMAIL_PORT = 1025 
Note: See TracChangeset for help on using the changeset viewer.