Troubleshooting
TaskRegistrationError: ... lacks _task_name
Error:

    TaskRegistrationError: register_scheduled_task: object <function ...> lacks _task_name

Cause: A function decorated with @scheduled is missing the required name parameter.
Fix: Always provide a name to @scheduled():

    @scheduled(cron="*/5 * * * *", name="my-task")  # ✅
    async def my_task() -> None: ...

TaskRegistrationError: ... lacks _cron
Error:

    TaskRegistrationError: register_scheduled_task: task 'my-task' lacks _cron

Cause: A function decorated with @task was passed to register_scheduled_task(), which expects @scheduled-decorated functions.
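Fix: Decorate cron-triggered tasks with @scheduled and register them with register_scheduled_task(); register on-demand @task functions with register_handler():

    @task(name="on-demand")  # ✅ use register_handler()
    async def on_demand() -> None: ...

    @scheduled(cron="0 * * * *", name="hourly")  # ✅ use register_scheduled_task()
    async def hourly() -> None: ...

NotImplementedError: Use TaskProvider.enqueue_job() or apply_async(queue, ...)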
Error:

    NotImplementedError: Use TaskProvider.enqueue_job() or apply_async(queue, ...)

Cause: Calling .delay() on a task function. The delay() method is intentionally not implemented.
Fix: Use .apply_async(queue, ...) or TaskProvider.enqueue_job():

    # ✅ Correct
    await my_task.apply_async(queue, arg1, arg2)

    # ✅ Also correct
    provider = await container.resolve(TaskProvider)
    await provider.enqueue_job(my_task.signature(arg1, arg2))

QueueFullError: Task queue is full
Error:

    QueueFullError: Task queue is full

Cause: The queue has reached its capacity limit and can't accept new tasks.
Fix: Apply backpressure by retrying after a delay, or increase the queue capacity:

    import asyncio

    for attempt in range(3):
        result = await queue.enqueue(task)
        if result.is_ok():
            break
        await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s

TaskTimeoutError: Task execution exceeded timeout
Error:

    TaskTimeoutError: Task execution exceeded timeout

Cause: A task ran longer than its configured timeout (default 300s).
Fix: Either increase the timeout for long-running tasks or optimize the handler:

    @task(name="heavy-report", timeout=600.0)  # 10 minutes
    async def generate_report() -> None: ...

MemoryTaskQueue warning in production
Warning:

    MemoryTaskQueue is volatile — all enqueued tasks will be lost on process restart.

Cause: Using MemoryTaskQueue in a non-development environment.
Fix: Switch to RedisTaskQueue or RabbitMQTaskQueue for deployments:

    from lexigram.tasks.backends.redis import RedisTaskQueue

    TasksModule.configure(queue=RedisTaskQueue("redis://production:6379"))

TaskDependencyCycleError: Task dependency cycle detected
Error:

    TaskDependencyCycleError: Task dependency cycle detected: A → B → C → A

Cause: Circular dependencies in scheduled job chains.
Fix: Break the cycle by removing one of the depends_on edges between jobs.
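When the job graph is large, it can help to reproduce the cycle outside the scheduler to decide which edge to drop. The following is a minimal sketch and not part of the library: it assumes the depends_on relationships can be written out as a plain dict mapping each job name to the names it depends on. The `find_cycle` helper and the `jobs` dict are hypothetical. It runs a depth-first search and returns one cycle if any exists.

```python
from __future__ import annotations


def find_cycle(depends_on: dict[str, list[str]]) -> list[str] | None:
    """Return one dependency cycle as a list of job names, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {name: WHITE for name in depends_on}
    path: list[str] = []

    def visit(job: str) -> list[str] | None:
        color[job] = GRAY
        path.append(job)
        for dep in depends_on.get(job, []):
            state = color.get(dep, WHITE)
            if state == GRAY:
                # dep is already on the current path, so the cycle starts there
                return path[path.index(dep):] + [dep]
            if state == WHITE:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        color[job] = BLACK
        return None

    for name in depends_on:
        if color[name] == WHITE:
            found = visit(name)
            if found:
                return found
    return None


# Hypothetical job graph: A depends on B, B on C, C on A.
jobs = {"A": ["B"], "B": ["C"], "C": ["A"]}
cycle = find_cycle(jobs)
print(" → ".join(cycle) if cycle else "no cycle")  # prints: A → B → C → A
```

Removing any one edge on the reported path (for example, dropping A from C's dependencies) makes `find_cycle` return None for this graph.
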
Handler Not Found
Error: A worker dequeues a job but no handler is registered for the task name.
Cause: The module containing the @task/@scheduled handler was not imported before the provider booted.
Fix: Ensure the handler module is imported during application setup:

    import my_app.tasks.handlers  # side effect: registers handlers

    async with Application.boot(...) as app:
        ...

Debug Tips
- Set LEX_LOG_LEVEL=debug to see queue operations, handler registrations, and worker state
- Check app.provider.get_worker_stats() to inspect pool health
- Check app.provider.get_scheduled_jobs() to verify scheduler state
- Enable MetricsMiddleware to collect task duration/retry statistics
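
While narrowing down a slow or failing handler, rough duration and failure counts can also be gathered by hand. The sketch below is a stand-in under stated assumptions, not the library's MetricsMiddleware: `TaskStats` and its `timed` decorator are hypothetical helpers that wrap any async function using only the standard library.

```python
import asyncio
import time
from collections import defaultdict
from functools import wraps


class TaskStats:
    """Hypothetical in-process collector; not the library's MetricsMiddleware."""

    def __init__(self) -> None:
        self.durations = defaultdict(list)  # task name -> seconds per call
        self.failures = defaultdict(int)    # task name -> calls that raised

    def timed(self, name):
        """Decorator that records wall-clock duration and failures for a handler."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    self.failures[name] += 1
                    raise
                finally:
                    # Runs on success and on failure, so every call is timed.
                    self.durations[name].append(time.perf_counter() - start)
            return wrapper
        return decorator


stats = TaskStats()


@stats.timed("demo-task")
async def demo_task() -> str:
    await asyncio.sleep(0.01)  # simulate a small amount of work
    return "done"


result = asyncio.run(demo_task())
print(result, len(stats.durations["demo-task"]), stats.failures["demo-task"])
```

How such a wrapper should be ordered relative to @task or @scheduled is not specified here, so treat any combination with those decorators as something to verify in your own setup.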