Quickstart
Install
Section titled “Install”uv add lexigram-tasksFor Redis-backed queues:
uv add lexigram-tasks[redis]For RabbitMQ-backed queues:
uv add lexigram-tasks[rabbitmq]Minimal Working Example
Section titled “Minimal Working Example”Define a task with the @task decorator and wire it through TasksModule:
import asynciofrom lexigram import Applicationfrom lexigram.contracts.infra.tasks import TaskQueueProtocolfrom lexigram.tasks.module import TasksModulefrom lexigram.tasks.decorators import task
@task(name="greet", max_retries=3)async def greet(name: str) -> str: return f"Hello, {name}!"
async def main(): async with Application.boot( name="my-app", modules=[TasksModule.configure()], ) as app: queue = await app.container.resolve(TaskQueueProtocol) job = await greet.apply_async(queue, "World") print(f"Enqueued: {job.id}")
await asyncio.sleep(0.2) # let the worker pick it up
asyncio.run(main())What Just Happened
Section titled “What Just Happened”| Step | What |
|---|---|
TasksModule.configure() | Created a DynamicModule with a MemoryTaskQueue and one worker |
Application.boot() | Registered TaskQueueProtocol + TaskExecutorProtocol in the container, started the worker pool |
@task(name="greet") | Wrapped greet with .signature(), .s(), .apply_async() methods |
greet.apply_async(queue, ...) | Built a JobProtocol and enqueued it via TaskQueueProtocol.enqueue() |
| Worker | Polled the queue, dispatched the job to the registered handler via the HandlerRegistry |
Next Steps
Section titled “Next Steps”- Guide — concepts, workflows, best practices
- How-Tos — task chaining, scheduling, Redis backends
- Configuration — all config keys and env vars