Share a common utility function between Celery tasks

python,django,celery,django-celery,celery-task
I've got a bunch of tasks in Celery all connected using a canvas chain. @shared_task(bind=True) def my_task_A(self): try: logger.debug('running task A') do something except Exception: run common cleanup function @shared_task(bind=True) def my_task_B(self): try: logger.debug('running task B') do something else except Exception: run common cleanup function ... So far so...
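
One common way to share the cleanup is a custom base task class whose on_failure hook runs it, so each task body stays clean. A minimal sketch, assuming a shared helper named common_cleanup (illustrative, not from the question):

    from celery import Task, shared_task
    from celery.utils.log import get_task_logger

    logger = get_task_logger(__name__)

    def common_cleanup(task_id):
        # hypothetical shared helper: release locks, delete temp files, etc.
        pass

    class CleanupTask(Task):
        abstract = True  # base class only; not registered as a task itself

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            # invoked by the worker whenever the task raises
            common_cleanup(task_id)

    @shared_task(bind=True, base=CleanupTask)
    def my_task_A(self):
        logger.debug('running task A')
        # do something; any exception triggers common_cleanup via on_failure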

Send a success signal when the group of tasks in celery is finished

django,signals,task,group,celery
So I have a basic configuration: Django 1.6 + Celery 3.1. Say I have an example task: @app.task def add(x, y): time.sleep(6) return {'result':x + y} And a function that groups tasks and returns the job id: def nested_add(x,y): grouped_task = group(add.s(x,y) for i in range(0,2)) job = result_array.apply_async() job.save() return job.id...
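
For a "group finished" signal, the canonical construct is a chord: a group plus a callback that fires once every member task has completed. A hedged sketch (the on_complete callback is illustrative):

    from celery import chord

    @app.task
    def on_complete(results):
        # receives the list of results from all group members
        return {'results': results}

    def nested_add(x, y):
        # the callback runs only after both add() calls have finished
        job = chord(add.s(x, y) for i in range(2))(on_complete.s())
        return job.id

Note that chords need a working result backend to detect group completion.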

Python: Mock doesn't work inside celery task [duplicate]

python,django,unit-testing,mocking,celery
This question already has an answer here: Why python mock patch doesn't work? 1 answer I want to use the Python mock library to test that my Django application sends email. Test code: # tests.py from django.test import TestCase class MyTestCase(TestCase): @mock.patch('django.core.mail.mail_managers') def test_canceled_wo_claiming(self, mocked_mail_managers): client = Client() client.get('/') print(mocked_mail_managers.called)...
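
The linked duplicate's fix applies here: patch the name where the code under test looks it up, not where it is defined. A sketch, assuming the view imports mail_managers into a module myapp.views (the path is an assumption):

    import mock
    from django.test import Client, TestCase

    class MyTestCase(TestCase):
        # patch the reference the view actually uses at call time
        @mock.patch('myapp.views.mail_managers')
        def test_canceled_wo_claiming(self, mocked_mail_managers):
            Client().get('/')
            self.assertTrue(mocked_mail_managers.called)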

Where does a task run when multiple nodes have Python Celery installed?

celery
If I have multiple workers running on different nodes, how can I know which worker a task is assigned to? E.g., here are two workers, 10.0.3.101 and 10.0.3.102; a Redis backend runs on 10.0.3.100; when a task is sent to the task queue on the Redis backend, a worker gets...

Celery, find the task caller from the task?

python,celery
Is it possible to look up what code called (delay(), apply_async(), apply(), etc.) a task from within the task's code? Strings would be fine. Ideally, I would like to get the caller's stack trace.

Celery object editing best-practices

python,celery,background-process
I have a Celery task that performs an operation on an object's attributes and saves it. Let's say we're computing a person's BMI: we take a person's height and weight, calculate the BMI, then save it on person.bmi. In Rails' DelayedJob you would just make an instance method: Class...

Celery/RabbitMQ unacked messages blocking queue?

python,rabbitmq,celery,urllib2
I have invoked a task that fetches some information remotely with urllib2 a few thousand times. The tasks are scheduled with a random eta (within a week) so they don't all hit the server at the same time. Sometimes I get a 404, sometimes not. I am handling the error...

Celery, AttributeError: 'module' object has no attribute 'celeryconfig'

python,celery,cron-task,celerybeat,periodic-task
I'm trying to set up periodic tasks via Python Celery. The following is my project structure: proj/ proj/__init__.py proj/celeryconfig.py proj/celery.py proj/tasks.py celery.py from __future__ import absolute_import from celery import Celery # instantiate Celery object app = Celery( include=['proj.tasks']) # Optional configuration, see the application user guide. # import celery config file app.config_from_object('proj.celeryconfig') app.conf.update(...

Celery worker's log contains question marks (???) instead of correct unicode characters

python,logging,unicode,celery,questionmark
I'm using Celery 3.1.18 with Python 2.7.8 on CentOS 6.5. In a Celery task module, I have the following code: # someapp/tasks.py from celery import shared_task from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @shared_task() def foo(): logger.info('Test output: %s', u"测试中") I use the initd script here to run a Celery...

Celery - Only one instance per task/process?

python,celery,celery-task,celerybeat
In the Celery docs, section Instantiation (http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-task-classes), the following is stated: A task is not instantiated for every request, but is registered in the task registry as a global instance. This means that the __init__ constructor will only be called once per process, and that the task class is semantically...
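
A practical consequence of once-per-process instantiation is that the task instance can cache expensive resources across requests, as in this sketch of the pattern from the same docs section (connect() is a hypothetical factory):

    from celery import Task

    class DatabaseTask(Task):
        abstract = True
        _db = None

        @property
        def db(self):
            # __init__ ran once per worker process, so this cached
            # connection is reused by every request in that process
            if self._db is None:
                self._db = connect()  # hypothetical connection factory
            return self._db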

How to run a function periodically with Flask and Celery?

python,flask,celery
I have a Flask app that roughly looks like this: app = Flask(__name__) @app.route('/',methods=['POST']) def foo(): data = json.loads(request.data) # do some stuff return "OK" In addition, I would like to run a function every ten seconds from that script. I don't want to use sleep for that. I...
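
One sleep-free option is Celery beat with a schedule entry; a minimal sketch using Celery 3.x setting names (the task path is illustrative):

    from datetime import timedelta

    CELERYBEAT_SCHEDULE = {
        'run-every-ten-seconds': {
            'task': 'tasks.my_periodic_job',  # hypothetical task name
            'schedule': timedelta(seconds=10),
        },
    }

A beat-enabled worker (celery -A tasks worker -B -l info) then runs the function every ten seconds, independently of Flask's request cycle.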

How to invoke Celery task by name

python-2.7,celery
We have a Python application that uses Celery, with RabbitMQ as the broker. Think of this application as a management application that only puts messages/tasks into the broker and won't be acting upon them. There will be another application (which may or may not be Python based) which will be...
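
Celery supports exactly this via send_task, which dispatches by registered task name without importing the task function. A sketch, assuming a consumer-side task registered as 'tasks.add' (illustrative):

    from celery import Celery

    app = Celery(broker='amqp://')  # point at the shared RabbitMQ broker

    # only the string name is needed; no access to the task code required
    result = app.send_task('tasks.add', args=[2, 2])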

Django with celery can't get result from backend

python,django,celery,django-celery
My Django project is cloned from the Celery repo: https://github.com/celery/celery/tree/3.1/examples/django . In settings, after the BROKER_URL constant, I added CELERY_RESULT_BACKEND = 'amqp://' I'm trying the task called 'add' from the project. Inside the worker terminal the result is OK, but I can't get it back. I call commands like this: from demoapp.tasks import add add.delay(2,2)...

Why does adding arguments to a Celery task cause an error in tests in Python?

python,celery,python-mock
I'm trying to test a Celery application. Here's my code: @celery.task(bind=True, default_retry_delay=30) def convert_video(gif_url, webhook): // doing something awesome return except Exception as exc: raise convert_video.retry(exc=exc) And in my test I have this: server.convert_video.apply(args=('some_gif', 'http://www.company.com?attachment_id=123')).get() After I added bind=True, default_retry_delay=30 I get this error: TypeError: convert_video() takes exactly 2 arguments (3...

monitor celery queue pending tasks with or without flower

celery,flower
I am trying to monitor the Celery queues so that if the number of tasks in a queue increases, I can choose to spawn more workers. How can I do this with or without Flower (the Celery monitoring tool)? E.g., I can get a list of all the workers like this: curl -X...
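
With a Redis broker, one approach is to read the queue length directly, since each Celery queue is a Redis list keyed by the queue name. A sketch assuming a Redis broker and the default 'celery' queue (the threshold is hypothetical):

    import redis

    THRESHOLD = 100  # hypothetical scale-out trigger

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    pending = r.llen('celery')  # tasks waiting in the queue, not yet reserved
    if pending > THRESHOLD:
        print('time to spawn another worker')  # hypothetical scaling hook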

django celery send multiple emails

python,django,celery
I have a view like" def MyView(request): SendMyMail(args) and my SendMymail is: def SendMymail(args): send_mail(title, content, sender, receiver) Here I want to queue the message sending process if there is so many messages. I have gone through document of celery and understood the base. I have seen in many examples...

Django Celery Directory Structure and Layout

python,django,rabbitmq,celery,django-celery
I have a django project using the following directory structure. project/ account/ models.py views.py blog/ models.py views.py mediakit/ models.py views.py reports/ celery.py <-- new models.py tasks.py <-- new views.py settings/ __init__.py <-- project settings file system/ cron/ mongodb/ redis/ manage.py Here's the contents of celery.py derived from the celery tutorial...

Starting celery worker from multiprocessing

python,flask,multiprocessing,celery,elastic-beanstalk
I'm new to celery. All of the examples I've seen start a celery worker from the command line. e.g: $ celery -A proj worker -l info I'm starting a project on elastic beanstalk and thought it would be nice to have the worker be a subprocess of my web app....

supervisor - how to run multiple commands

python,celery,supervisor
I'm managing a Celery worker that processes a queue via Supervisor. Here's my /etc/supervisor/celery.conf: [program:celery] command = /var/worker/venv/bin/celery worker -A a_report_tasks -Q a_report_process --loglevel=INFO directory=/var/worker user=nobody numprocs=1 autostart=true autorestart=true startsecs=10 stopwaitsecs = 60 stdout_logfile=/var/log/celery/worker.log stderr_logfile=/var/log/celery/worker.log killasgroup=true priority=998 How do I add this second command to run? /var/worker/venv/bin/celery worker -A b_report_tasks -Q...
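
supervisord runs one command per [program:x] section, so the second worker needs its own section mirroring the first. A sketch (the queue name after -Q is illustrative, since the question truncates it):

    [program:celery_b_report]
    command = /var/worker/venv/bin/celery worker -A b_report_tasks -Q b_report_queue --loglevel=INFO
    directory = /var/worker
    user = nobody
    autostart = true
    autorestart = true
    stdout_logfile = /var/log/celery/b_worker.log
    stderr_logfile = /var/log/celery/b_worker.log
    killasgroup = true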

Celery with Amazon SQS and S3 events

amazon-s3,celery,amazon-sqs
I would like to use Celery to consume S3 events as delivered by Amazon on SQS. However, the S3 message format does not match what Celery expects. How can I consume these messages with minimal hackiness? Should I write a custom serializer? Should I give up and make a custom...

Celery 'module' object has no attribute 'app' when using Python 3

python,python-3.x,celery
I am going through the Celery tutorial. They are using Python 2 and I am trying to implement the same using Python 3. I have 2 files: celery_proj.py : from celery import Celery app = Celery( 'proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) app.conf.update(Celery_TAST_RESULT_EXPIRES=3600,) if __name__ == '__main__': app.start() and tasks.py : from celery_proj import app...

celery: get function name by task id?

python,celery
I am using Celery's on_failure handler to log all failed tasks for debugging and analysis, and I want to know the task name (function name) of the failed task. How can I get that? from celery import Task class DebugTask(Task): abstract = True def after_return(self, *args, **kwargs): print('Task returned: {0!r}'.format(self.request)) def...
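
Inside handlers like on_failure, the task instance already knows its registered name via self.name. A minimal sketch:

    from celery import Task

    class DebugTask(Task):
        abstract = True

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            # self.name is the registered task name, e.g. 'myapp.tasks.foo'
            print('Task {0} ({1}) failed: {2!r}'.format(self.name, task_id, exc))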

What is the best “architecture” for running celery (and celery beat) in production environment (debian server)?

django,debian,celery,virtualenv,celerybeat
I"m trying to figure out the best "architecture" so our django projects hosted on debian server could use celery and celery beat. He are my requirements: Celery workers and celery beats should be able to run after server restart automatically. Using standard Debian packages is preferred. What doesn't have to...

How to use pyramid_celery in pyramid?

python,celery,pyramid
I want to use Celery with Pyramid, so I tried the pyramid_celery package. All attempts have failed. My development.ini has: BROKER_URL = amqp://dev:[email protected]:5672//test CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json', 'application/json'] CELERY_RESULT_BACKEND = amqp://dev:[email protected]:5672//test ;CELERY_ACCEPT_CONTENT = json12 CELERY_IMPORTS = celerypythontest.celery_service And when I try to run this command: celery...

Why is the Celery worker using /root as home directory?

python,rabbitmq,celery,supervisord
We are using Celery and RabbitMQ to process jobs; both are run by Supervisor so that they stay alive. Some jobs expect a file in the home directory of user A, so we run the Celery worker as user A in Supervisor. But when the job...

Celery with Django - AttributeError: 'AsyncResult' object has no attribute 'replace'

python,django,celery
I wrote a celery task that sets some values in my database when I click on a button on my web page. Everything is fine. Now I want to write a more complex task (disambiguation_task) that returns a string to my Django view (1.6.5). The code is: task_id = disambiguation_task.apply_async([json.dumps(json_request)])...

Set delay between tasks in group in Celery

python,celery
I have a Python app where a user can initiate a certain task. The whole purpose of a task is to execute a given number of POST/GET requests with a particular interval to a given URL. So the user gives N, the number of requests, and V, the number of requests per second....
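
One way to enforce the rate without sleeping inside tasks is to compute a per-task countdown when enqueuing, so the broker releases the requests at the desired pace. A sketch (do_request is a hypothetical task; assumes an existing app instance):

    import requests

    @app.task
    def do_request(url):
        requests.get(url)  # stand-in for the real POST/GET call

    def schedule_requests(url, n, v):
        # n requests at v requests/second: stagger via countdowns
        for i in range(n):
            do_request.apply_async(args=[url], countdown=float(i) / v)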

Django/Python process that checks database object status

python,django,python-3.x,celery
I am building a django app that has some functionality to send email/SMS notifications. These notifications should be sent asynchronously/in the background. I have already come across celery and played with it a bit and I think it's totally awesome. Now, the notifications are stored in a model, where one...

ContentDisallowed error about pickle when trying to enforce JSON serializer in Celery

json,serialization,flask,celery,pickle
I have the following in my tasks.py to specify that json should be used as the default serializer for Celery. celery = Celery('app', broker = 'redis://localhost:6379/4') from kombu import serialization serialization.registry._decoders.pop("application/x-python-serialize") celery.conf.update( CELERY_TASK_SERIALIZER = 'json', CELERY_RESULT_BACKEND = 'redis://localhost:6379/4', CELERY_ACCEPT_CONTENT = ['json'], ) Also, while calling a task I specify the...

RabbitMQ Queued messages keep increasing

python,rabbitmq,celery
We have a Windows-based Celery/RabbitMQ server that executes long-running Python tasks out-of-process for our web application. What this does, for example, is take a CSV file and process each line. For every line it books one or more records in our database. This seems to work fine; I can...

How to start a celery worker that only pushes tasks to the broker but does not consume them?

celery,django-celery
I have the main producer of tasks in a webserver. I do not want the webserver to consume any tasks, so it should only send the tasks to the broker which get consumed by other nodes. Right now I route tasks using the -Q option in the nodes by specifying...

How to make two tasks mutually exclusive in Celery?

python-2.7,celery,celery-task,celerybeat
Is there a way to prevent two different tasks from running simultaneously in Celery? I was thinking about defining a new queue with concurrency level 1 and sending those tasks to that queue, but I couldn't find an example. Is that possible? Thanks!...
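
One workable pattern is exactly that: route both tasks to a dedicated queue and consume it with a single worker process, so they serialize. A Celery 3.x sketch (task paths are illustrative):

    # settings: send both tasks to one 'serial' queue
    CELERY_ROUTES = {
        'myapp.tasks.task_a': {'queue': 'serial'},
        'myapp.tasks.task_b': {'queue': 'serial'},
    }

    # consume that queue with exactly one process:
    #   celery -A myapp worker -Q serial --concurrency=1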

Restarting a Docker container that runs supervisord programs keeps PID files and causes errors at restart

docker,celery,supervisord,fig
I have a Docker container that runs a Django Celery worker via supervisord; the program setup is pretty simple: [program:celery_priority] command=python manage.py celery worker -E -Q priority --concurrency=2 --loglevel=ERROR directory=/var/lib/app stdout_events_enabled = true stderr_events_enabled = true stopwaitsecs = 600 [program:celery_medium] command=python manage.py celery worker -E -Q medium --concurrency=2 --loglevel=ERROR directory=/var/lib/app stdout_events_enabled =...

Django Celery cannot connect to remote RabbitMQ on EC2

django,amazon-ec2,rabbitmq,celery
I created a RabbitMQ cluster on two instances on EC2. My Django app uses Celery for async tasks, which in turn uses RabbitMQ as the message queue. Whenever I start Celery with the command: python manage.py celery worker --loglevel=INFO OR python manage.py celeryd --loglevel=INFO I keep getting the following error message related...

How to composite large images stored on S3 in ImageMagick from EC2 instances?

amazon-ec2,parallel-processing,imagemagick,celery
I have an ongoing list of image processing tasks, using ImageMagick to composite large individual graphic files (20MB each). These images are currently stored on S3 (approximately 2.5GB in total). I was thinking of using multiple EC2 instances to process the tasks, composite the images and upload the...

Celery limit number of specific task in queue

python,queue,task,celery,worker
I'm using Celery 3.1.x with 2 tasks. The first task (TaskOne) is enqueued when Celery starts up through the celeryd_after_setup signal: @celeryd_after_setup.connect def celeryd_after_setup(*args, **kwargs): TaskOne().apply_async(countdown=5) When TaskOne is run, it does some calculations and then enqueues TaskTwo. Imagine the following workflow: I start celery, thus the signal is fired...

Celery : CELERYD_CONCURRENCY and number of workers

celery,django-celery,worker
Following another Stack Overflow answer, I've tried to limit Celery's number of workers. After I terminated all the Celery workers, I restarted Celery with the new configuration: CELERYD_CONCURRENCY = 1 (in Django's settings.py) Then I typed the following command to check how many Celery workers are running: ps auxww | grep 'celery...

PeriodicTask with Django + Celery

python,django,task,celery
I am trying to set up a periodic task in my project but I can't make it work. In dev I execute Celery like this: python manage.py celery worker -l info But when I try: celery -B -l info or python manage.py celery -B -l info I get a Connection...

celerybeat - multiple instances & monitoring

python,python-2.7,celery,celerybeat
I have an application built using Celery, and recently we got a requirement to run certain tasks on a schedule. I think celerybeat is perfect for this, but I have a few questions: Is it possible to run multiple celerybeat instances, so that tasks are not duplicated? How to make sure that celerybeat...

Why are Celery task test results inconsistent?

django,celery,celery-task
I've written two simple integration tests for two Celery tasks, but when I run them I get inconsistent results: I can run them one minute and one or both will pass, then run them a couple of seconds later and one or both of them will fail. Why are these...

Managing Celery Task Results

python,redis,celery,task-queue
I am pretty new to Celery, and I thought I had read somewhere that task results only stay around for a limited time. However, my backend (Redis) is getting pretty bloated after running a lot of tasks through it. Is there a way to set a TTL on task...
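
Results do expire by default (after one day), and with the Redis backend the TTL is configurable; a one-line sketch using the Celery 3.x setting name:

    # celeryconfig.py / settings.py
    CELERY_TASK_RESULT_EXPIRES = 3600  # seconds; Redis result keys get this TTL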

How does Celery handle task failures within a chain?

python,django,celery,django-celery,celery-task
What happens when a Celery task within a chain fails? When the failed task is retried with success, does it simply resume from where it was within the chain? For example: my_chain = (task1.s() | task2.s() | task3.s()) my_chain.apply_async((**params)) If task2 fails and is retried with success, would task3 then be...
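
On the retry half of the question, a hedged sketch of the usual pattern: a bound task that retries itself re-queues the same message, chain callbacks included, so task3 still runs once a retry eventually succeeds:

    from celery import shared_task

    @shared_task(bind=True, max_retries=3, default_retry_delay=10)
    def task2(self, data):
        try:
            return data * 2  # stand-in for the real work
        except Exception as exc:
            # the chain's remaining steps travel with the retried message
            raise self.retry(exc=exc)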

Celery can't get result from given task

python,celery
I'm trying to configure Celery with RabbitMQ. The server works fine and my worker receives tasks and returns successful results, but the communication(?) fails. I'm following the first steps from the Celery docs. I started the task worker and created the file tasks.py. My connection: app = Celery('tasks', backend='amqp', broker='amqp://') Logs inside the worker (correct): [2015-03-13 21:00:46,146: INFO/MainProcess]...

supervisord always returns exit status 127 at WebFaction

python,celery,supervisord,webfaction
I keep getting the following errors from supervisord at WebFaction when tailing the log: INFO exited: my_app (exit status 127; not expected) INFO gave up: my_app entered FATAL state, too many start retries too quickly Here's my supervisord.conf: [unix_http_server] file=/home/btaylordesign/tmp/supervisord.sock [rpcinterface:supervisor] supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///home/btaylordesign/tmp/supervisord.sock [supervisord]...

Deadlock when updating `djcelery_periodictask` table

celery,django-celery,database-deadlocks,celerybeat
I have two simple periodic Celery tasks which run at night, one at 2 AM, the other at 3 AM, and from time to time they throw the following exception about a deadlock on the djcelery_periodictask table, which I do not quite understand. It seems that one task tries to update djcelery_periodictask while the other is still...

mysql command out of sync when executing insert from celery

python,mysql,database,celery
I am running into the dreaded "MySQL commands out of sync" error when using a custom DB library and Celery. The library is as follows: import pymysql import pymysql.cursors from furl import furl from flask import current_app class LegacyDB: """Db Legacy Database connectivity library """ def __init__(self,app): with app.app_context(): self.rc...

Python Celery Chain of Tasks on a Single Node

python,celery
I have two Celery nodes on 2 machines (n1, n2), and my tasks are enqueued from another machine (main). The main machine may not know the available node names. My question is whether there is any guarantee that a chain of tasks will run on a single node. res =...

absolute_import not working correctly (in celery project)

python,import,celery
I am trying to set up a simple Celery project. As per the official documentation, the layout is as follows: [email protected] ~/dev/debian/debsources/debsources % find new_updater -name "*.py" new_updater/tasks.py new_updater/updater.py new_updater/__init__.py new_updater/celery.py In celery.py, I import celery.Celery this way: from __future__ import absolute_import from celery import Celery In IPython, I can import...

Queues with random GUID being generated in RabbitMQ server

rabbitmq,celery
Queues with a random GUID are being generated, coming from the exchange 'celeryresults'. This happened when I fired a task from the shell using the delay method, but forgot to enter the parameters of my original function in the argument list of delay. The error displayed in the terminal where I run the celery...
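
Those GUID-named queues are how the amqp result backend stores one result per task. If the results are never consumed, suppressing them prevents the buildup; a sketch:

    # globally, in configuration:
    CELERY_IGNORE_RESULT = True

    # or per task:
    @app.task(ignore_result=True)
    def my_task():
        pass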

Celery task group not being executed in background and results in exception

python,django,celery
My Celery task isn't executing in the background in my Django 1.7/Python3 project. # settings.py BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULTBACKEND = BROKER_URL CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERY_ALWAYS_EAGER = False I have celery.py in my root app module as such: from __future__ import absolute_import import os import django from celery import Celery from...

Celery worker will not pick up a new task after the current one is finished

python,django,rabbitmq,celery
I have three tasks: @app.task(name='timey') def timey(): print "timey" while True: pass return 1 @app.task(name='endtimey') def endtimey(): for i in range(10): print "ENDTIMEY", time() sleep(3) return 1 @app.task(name='nexttask') def nexttask(n): print "NEXT TASK" return 1 If the only thing I do is chain endtimey and nexttask together - chain(endtimey.s() |...

Celery ImportError: No module named proj

python,django,celery
I'm trying to setup Celery with Django. I have followed the guide: project/project/celery.py: from __future__ import absolute_import import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings') app = Celery('proj') # Using a string here means the...

Django: How to trigger events based on datetimes in the database

python,django,database,celery,django-celery
I have a simple Django app with a database which stores a series of messages and the datetimes at which I want them printed to screen. Is there a way to have Django call a method which would check to see if any new messages need printing and, if so,...

How can I install gdbm module on heroku?

python,heroku,celery,gdbm
I'm running a celery worker on heroku and when starting up the worker I always get an error: ImportError: No module named gdbm I've confirmed this by running python on heroku and trying to import gdbm: $ heroku run python Running `python` attached to terminal... up, run.1960 Python 2.7.8 (default,...

Celery worker with Redis broker can't execute Django task

redis,celery,django-celery,celery-task,celerybeat
I'm learning Python (2.7) / Django (1.5) these days by developing my own Reddit clone (on Ubuntu 14.04 LTS). I'm trying to incorporate Celery (3.1) with Redis into the mix, using it to periodically run a ranking algorithm as a task (on my local setup). But unfortunately, I can't get this simple task to...

Celery + Eventlet + non blocking requests

python,celery,python-requests,eventlet,grequests
I am using Python requests in Celery workers to make a large number (~10/sec) of API calls (including GET, POST, PUT, DELETE). Each request takes around 5-10s to complete. I tried running Celery workers with an eventlet pool, with concurrency of 1000. Since requests are blocking, each concurrent connection is waiting on one request....

Why isn't Celery starting up?

django,celery
I have a Django app, which I've configured to use Celery. It works fine using the dev settings from the tutorial. Now I'm trying to get it working with a daemonized server. I have Celery installed in a virtualenv, and am starting it (from the project's root directory) as follows:...

celery worker not working though rabbitmq has queue buildup

python,python-2.7,rabbitmq,celery,digital-ocean
I am getting started with Celery and I wrote a task by following the tutorial, but somehow the worker isn't coming up. After entering the command celery worker -A tasks -l debug I get this log: Running a worker with superuser privileges when the worker accepts messages...

celery: daemonic processes are not allowed to have children

python,python-2.7,celery,daemon,python-multiprocessing
In Python (2.7) I try to create processes (with multiprocessing) in a celery task (celery 3.1.17) but it gives the error: daemonic processes are not allowed to have children Googling it, I found that most recent versions of billiard fix the "bug" but I have the most recent version (3.3.0.20)...

Celery worker errors using eventlet on Solaris

celery,solaris,eventlet
I'm running a standard celery worker using the eventlet class and concurrency set to 8. These are pretty busy workers when this is happening (but may happen when not busy, it's hard to tell). I know I don't have any leaks in my task, and have run it with max...

CELERY_ALWAYS_EAGER seems to have no effect

django,celery
I have a Django + Celery setup working correctly in production with RabbitMQ. AFAIK, it should be sufficient to set CELERY_ALWAYS_EAGER to True in django settings locally, to avoid setting up RabbitMQ for development. I have done this but when calling .delay() on a task I get: Traceback (most recent...

Retrieve task result by id in Celery

celery
I am trying to retrieve the result of a task which has completed. This works: from proj.tasks import add res = add.delay(3,4) res.get() 7 res.status 'SUCCESS' res.id '0d4b36e3-a503-45e4-9125-cfec0a7dca30' But I want to run this from another application. So I re-run the Python shell and try: from proj.tasks import add res =...
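
From a separate process the result can be rebuilt from its id, provided the app is configured with the same result backend; a sketch:

    from celery import Celery
    from celery.result import AsyncResult

    app = Celery(backend='amqp://', broker='amqp://')  # must match the producer
    res = AsyncResult('0d4b36e3-a503-45e4-9125-cfec0a7dca30', app=app)
    print(res.status)
    print(res.get(timeout=5))

Note that with the amqp backend each result is a message that can be retrieved only once; a persistent backend such as Redis avoids that caveat.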

Task progress is not updated to the latest status on Celery+RabbitMQ

python,rabbitmq,celery
I implemented progress feedback for a long task with custom states on Celery + the RabbitMQ result backend, but the caller can't retrieve the latest progress status as I expected. In the following code, result.info['step'] always returns 0, and then the task finishes with "result=42". # tasks.py -- celery worker from...
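
For reference, the usual shape of the worker side of this progress pattern, sketched (the sleep stands in for real work):

    import time

    @app.task(bind=True)
    def long_task(self, total):
        for step in range(total):
            time.sleep(1)  # stand-in for a unit of work
            # publish a custom PROGRESS state, replacing the previous meta
            self.update_state(state='PROGRESS', meta={'step': step, 'total': total})
        return 42

A likely culprit with the RabbitMQ (amqp) result backend is that state updates are delivered as messages and consumed on read; a persistent backend such as Redis is generally easier to poll for progress.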

Mocking out a call within a celery task

unit-testing,flask,celery,python-mock
I have a flask app that runs a celery task. I'm trying to mock out a single API call that happens deep within that task. views.py from mypackage.task_module import my_task @app.route('/run_task') def run_task(): task = my_task.delay() return some_response task_module.py from mypackage.some_module import SomeClass @celery.task def my_task(): return SomeClass().some_function() some_module.py from...
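
Since the task reaches SomeClass through mypackage.task_module, that is the reference to patch; with eager mode the task runs in-process, so the patch stays in effect inside it. A sketch (assumes CELERY_ALWAYS_EAGER = True in the test config):

    import mock
    from mypackage.task_module import my_task

    @mock.patch('mypackage.task_module.SomeClass')  # patch where my_task looks it up
    def test_my_task(mocked_cls):
        mocked_cls.return_value.some_function.return_value = 'stubbed'
        result = my_task.delay()  # runs synchronously under ALWAYS_EAGER
        assert result.get() == 'stubbed'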

Celery + Django best practice

django,celery
I have been reading about Celery and Django in these posts (here and here), and all the logic/tasks work in celery.py, but in the official documentation they are separated into two files: celery.py and tasks.py. So which is the best practice? Does this affect performance?

Scaling periodic tasks in celery

celery
We have a 10-queue setup in our Celery deployment, a large setup where each queue has a group of 5 to 10 tasks, and each queue runs on a dedicated machine (some on multiple machines for scaling). On the other hand, we have a bunch of periodic tasks, running on a...

Celery KeyError when wrapping app.task function with imported decorator; errors only w/ import

python,celery,celery-task,celeryd
Given the layout: background \ tasks \ __init__.py generic.py helpers.py __init__.py _server.py config.py router.py server.py And launching _server.py with celery -A background._server worker I'm given a KeyError: u'generic.adder' in the Worker when trying to call the generic.adder function with a .delay(..) The adder function: File generic.py from background.server import app...