Celery¶
The `logfire.instrument_celery()` method creates a span for every task executed by your Celery workers.
The integration also supports Celery beat.
Installation¶
Install `logfire` with the `celery` extra:

```bash
pip install 'logfire[celery]'
```

or, if you use uv:

```bash
uv add 'logfire[celery]'
```
Distributed Tracing¶
See the distributed tracing guide for more details on how context propagation works.
For distributed tracing to work correctly, you need to call `logfire.instrument_celery()` in both:
- The worker processes that execute tasks
- The application that enqueues tasks (e.g., your Django or FastAPI web server)
This ensures that trace context is properly propagated from the application that schedules tasks to the workers that execute them, allowing you to see the complete request flow in Logfire.
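For example, here is a minimal sketch of the producer side, assuming the same Redis broker as the worker example below; the `service_name` value and the idea of a separate producer module are illustrative choices, not part of the integration:

```python
# Hypothetical producer: the application that enqueues tasks.
from celery import Celery

import logfire

logfire.configure(service_name='app')  # any name that identifies the enqueuing service
logfire.instrument_celery()  # instrument the producer so trace context is attached to tasks

app = Celery('tasks', broker='redis://localhost:6379/0')  # same broker as the workers

# Enqueue the task by name; this returns immediately.
app.send_task('tasks.add', args=[42, 50])
```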
Celery Worker¶
Info
The broker you use doesn't matter for the Celery instrumentation.
Any broker supported by Celery will work.
For our example, we'll use Redis. You can run it with Docker:

```bash
docker run --rm -d -p 6379:6379 redis
```
Below is a minimal example using Celery. Save it as `tasks.py` and run it with `celery -A tasks worker --loglevel=info`:
```python
from celery import Celery
from celery.signals import worker_init

import logfire

@worker_init.connect()  # (1)!
def init_worker(*args, **kwargs):
    logfire.configure(service_name='worker')  # (2)!
    logfire.instrument_celery()

app = Celery('tasks', broker='redis://localhost:6379/0')  # (3)!

@app.task
def add(x: int, y: int):
    return x + y

add.delay(42, 50)  # (4)!
```
1. Celery implements different signals that you can use to run code at specific points in the application lifecycle; see the Celery signals documentation for details.
2. Use a `service_name` to identify the service that is sending the spans.
3. Install the `redis` Python client with `pip install redis`.
4. Trigger the task. In your application you'll probably want to use `app.send_task("tasks.add", args=[42, 50])` instead, which sends the task to the broker by name and returns immediately.
Celery Beat¶
As mentioned above, you may also have periodic tasks scheduled with Celery beat.
Let's add beat to the previous example:
```python
from celery import Celery
from celery.signals import beat_init, worker_init

import logfire

@worker_init.connect()
def init_worker(*args, **kwargs):
    logfire.configure(service_name='worker')
    logfire.instrument_celery()

@beat_init.connect()  # (1)!
def init_beat(*args, **kwargs):
    logfire.configure(service_name='beat')  # (2)!
    logfire.instrument_celery()

app = Celery('tasks', broker='redis://localhost:6379/0')

app.conf.beat_schedule = {  # (3)!
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
}

@app.task
def add(x: int, y: int):
    return x + y
```
1. The `beat_init` signal is emitted when the beat process starts.
2. Use a different `service_name` to identify the beat process.
3. Add a task to the beat schedule. See the Celery documentation for more about the beat schedule.
The code above schedules the `add` task to run every 30 seconds with the arguments `16` and `16`.
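If you need cron-style timing rather than a fixed interval, Celery's `crontab` can be used as the `schedule` value; a minimal sketch (the entry name `add-every-morning` is made up):

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30),  # every day at 07:30
        'args': (16, 16),
    },
}
```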
To run beat, you can use the following command:

```bash
celery -A tasks beat --loglevel=info
```
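For local development you can also embed the scheduler in the worker process, so a single command runs both (Celery advises against this in production, where only one scheduler should be running):

```bash
celery -A tasks worker --beat --loglevel=info
```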
The keyword arguments of `logfire.instrument_celery()` are passed to the `CeleryInstrumentor().instrument()` method.