Skip to content
GitHub

API Reference

Observes agent execution steps for logging and monitoring.
on_step
async def on_step(
    step: int,
    action: str,
    result: Any
) -> None
on_complete
async def on_complete(
    total_steps: int,
    final_result: Any
) -> None

Generates an execution plan from a goal.
plan
async def plan(
    goal: str,
    context: dict[str, Any]
) -> list[str]

Base class for agent reasoning strategies.

Strategies implement the reasoning loop that drives an agent’s behavior.

execute
async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: LLMClientProtocol,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute the reasoning strategy.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `message` | `str` | The user's input message. |
| `tools` | `list[ToolProtocol]` | Tools available to the agent. |
| `history` | `list[dict[str, Any]]` | Conversation history as a list of message dicts. |
| `llm` | `LLMClientProtocol` | LLM client implementing `LLMClientProtocol`. |
| `**kwargs` | `Any` | Additional strategy-specific parameters. |
Returns
TypeDescription
Result[AgentResponse, AgentError]Ok(AgentResponse) on success, Err(AgentError) on failure.

Abstract base class for class-based agent tools.

Subclass this to create tools with more complex behavior than simple function wrappers.

Example

```python
class OrderLookupTool(AbstractTool):
    def __init__(self, order_service: OrderService):
        self.order_service = order_service

    @property
    def name(self) -> str:
        return "lookup_order"

    @property
    def description(self) -> str:
        return "Look up an order by its ID"

    @property
    def parameters_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "order_id": {"type": "string"}
            },
            "required": ["order_id"]
        }

    async def execute(self, **kwargs: Any) -> Any:
        order_id = kwargs.get("order_id")
        return await self.order_service.find(order_id)
```
name
property name() -> str

Unique tool identifier. Must be implemented by subclass.

description
property description() -> str

Human-readable description for the LLM. Must be implemented by subclass.

parameters_schema
property parameters_schema() -> dict[str, Any]

JSON Schema describing the tool’s parameters. Must be implemented by subclass.

execute
async def execute(**kwargs: Any) -> Any

Execute the tool with the given arguments. Must be implemented by subclass.


Wraps an ``AgentProtocol`` as a ``ToolProtocol``.

The adapter satisfies the ToolProtocol contract so that any agent can be injected into another agent’s tool list. When execute() is called, the adapter delegates to the provided AgentExecutorProtocol.run() with the given message.

Attributes:

  • name: `delegate_to_{agent.name}` — uniquely identifies this delegation tool.
  • description: Derived from the wrapped agent’s system prompt, truncated to `_DESCRIPTION_MAX_CHARS`.
  • parameters_schema: Accepts a single `message` string.

__init__
def __init__(
    agent: AgentProtocol,
    executor: AgentExecutorProtocol,
    *,
    session_id: str | None = None,
    user_id: str | None = None
) -> None

Initialize the agent-to-tool adapter.

Parameters
ParameterTypeDescription
`agent`AgentProtocolThe agent to expose as a tool.
`executor`AgentExecutorProtocolThe executor used to run the agent.
`session_id`str | NoneOptional session ID to pass through to the executor.
`user_id`str | NoneOptional user ID for governance tracking.
name
property name() -> str

Unique tool identifier derived from the wrapped agent name.

description
property description() -> str

Human-readable description for LLM tool selection.

parameters_schema
property parameters_schema() -> dict[str, Any]

JSON Schema for the delegation tool parameters.

execute
async def execute(**kwargs: Any) -> Any

Execute by delegating to the wrapped agent.

Parameters
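| Parameter | Type | Description |
| --- | --- | --- |
| `**kwargs` | `Any` | Tool arguments. Per the tool's parameters schema, a single `message` string that is passed on to the wrapped agent. |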
Returns
TypeDescription
AnyThe agent's response message string on success, or an error description string on failure.

Base class for AI agents.

Agents declare their identity, capabilities (tools), and persona (system prompt). The AgentExecutorImpl uses this to drive the reasoning loop.

An optional memory object can be attached to give the agent access to episodic and semantic memories across conversation turns. When present it is passed through to the reasoning strategy so that relevant context can be retrieved before each LLM call.

Example

```python
from lexigram.ai.agents import AgentBase, tool


@tool
async def lookup_order(order_id: str) -> dict:
    """Look up an order by its ID."""
    return {"order_id": order_id, "status": "shipped"}


class OrderAgent(AgentBase):
    name = "order_agent"
    system_prompt = "You are a helpful order support agent."

    @property
    def tools(self):
        return [lookup_order]
```
tools
property tools() -> list[ToolProtocol]

Tools available to this agent. Must be implemented in subclass.

__init__
def __init__(memory: MemoryProtocol | None = None) -> None

Validate agent configuration and store optional memory.

Parameters
ParameterTypeDescription
`memory`MemoryProtocol | NoneOptional conversation memory backend. When provided it is passed to the reasoning strategy so that relevant past context can be retrieved before each LLM call. Defaults to None (no memory, backward compatible).
memory
property memory() -> MemoryProtocol | None

Optional memory backend attached to this agent.

Returns
TypeDescription
MemoryProtocol | NoneThe memory backend instance, or None if no memory is configured.
builder
def builder(
    cls,
    name: str
) -> AgentBuilder

Create a fluent builder for constructing an agent.

Parameters
ParameterTypeDescription
`name`strUnique agent identifier.
Returns
TypeDescription
AgentBuilderAn ``AgentBuilder`` instance pre-configured with the given name.

Fluent builder for creating agents programmatically.

Produces a lightweight agent instance that can be passed straight to AgentExecutorImpl.run().

Example

agent = (
AgentBuilder("order_agent")
.with_system_prompt("You are a helpful order support agent.")
.with_tools(lookup_order, search_products)
.with_strategy("react", max_iterations=10)
.with_memory(memory_backend)
.with_guards(pii_guard, toxicity_guard)
.with_governance(budget_limit=10.0)
.build()
)
agent = (
AgentBuilder("order_agent")
.with_system_prompt("You are a helpful order support agent.")
.with_tools(lookup_order, search_products)
.with_strategy("react", max_iterations=10)
.with_memory(memory_backend)
.with_guards(pii_guard, toxicity_guard)
.with_governance(budget_limit=10.0)
.build()
)
__init__
def __init__(name: str) -> None
with_system_prompt
def with_system_prompt(prompt: str) -> AgentBuilder

Set the agent’s system prompt.

with_tools
def with_tools(*tools: ToolProtocol) -> AgentBuilder

Add tools to the agent.

with_strategy
def with_strategy(
    strategy: str,
    **kwargs: Any
) -> AgentBuilder

Set the reasoning strategy by name.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `strategy` | `str` | Strategy name (e.g. `"react"`, `"simple"`). |
| `**kwargs` | `Any` | Keyword arguments forwarded to the strategy constructor (e.g. `max_iterations=10`). |
with_memory
def with_memory(memory: MemoryProtocol) -> AgentBuilder

Attach a memory backend to the agent.

Parameters
ParameterTypeDescription
`memory`MemoryProtocolA memory backend instance implementing ``MemoryProtocol``.
with_guards
def with_guards(*guards: ToolProtocol) -> AgentBuilder

Add content-safety guards to the agent.

Parameters
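| Parameter | Type | Description |
| --- | --- | --- |
| `*guards` | `ToolProtocol` | One or more guards to add to the agent (e.g. `pii_guard`, `toxicity_guard`). |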
with_guard_pipeline
def with_guard_pipeline(pipeline: GuardPipelineProtocol) -> AgentBuilder

Set a pre-built guard pipeline.

Parameters
ParameterTypeDescription
`pipeline`GuardPipelineProtocolA fully configured ``GuardPipelineProtocol``.
with_governance
def with_governance(**kwargs: Any) -> AgentBuilder

Configure governance parameters.

Parameters
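| Parameter | Type | Description |
| --- | --- | --- |
| `**kwargs` | `Any` | Governance settings passed as keyword arguments (e.g. `budget_limit=10.0`). |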
with_temperature
def with_temperature(temperature: float) -> AgentBuilder

Set the LLM temperature for this agent.

Parameters
ParameterTypeDescription
`temperature`floatSampling temperature (0.0–2.0).
build
def build() -> AgentProtocol

Build and return the agent.

Raises
ExceptionDescription
ValueErrorIf the agent name is empty.

Payload fired when an agent finishes a run (success or error).

Attributes: agent_name: Name of the agent that completed its run.


Configuration for the agent system.

Attributes:

  • max_iterations: Maximum reasoning iterations per execution.
  • default_temperature: Default temperature for LLM calls.
  • default_max_tokens: Default max tokens for LLM responses.
  • tool_max_retries: Maximum retry attempts for transient tool errors.
  • enable_tracing: Enable OpenTelemetry tracing.
  • enable_metrics: Enable Prometheus metrics.

Example

config = AgentConfig(
max_iterations=10,
default_temperature=0.7
)

Runs an agent with full infrastructure integration.

Wraps agent strategy execution with:

  1. Governance — budget/rate limit checks
  2. Memory — conversation history load/save (legacy + working memory)
  3. Metrics — execution duration, tokens, tool calls
  4. Tracing — distributed spans per execution and tool call
  5. Events — domain events for agent lifecycle
  6. Resilience — circuit breakers on tool calls (via ToolRegistry)
  7. Sessions — stateful multi-turn conversation management
  8. Skills — composable skill execution and discovery

Usage

executor = AgentExecutorImpl(llm=llm_client)
result = await executor.run(
agent=my_agent,
message="Where is my order?",
session_id="session-123",
)
executor = AgentExecutorImpl(llm=llm_client)
result = await executor.run(
agent=my_agent,
message="Where is my order?",
session_id="session-123",
)
__init__
def __init__(
    llm: LLMClientProtocol | None = None,
    memory: MemoryProtocol | None = None,
    working_memory: WorkingMemoryProtocol | None = None,
    session_manager: SessionManagerProtocol | None = None,
    skill_executor: SkillExecutorProtocol | None = None,
    skill_registry: SkillRegistryProtocol | None = None,
    observability: AgentObservability | None = None,
    safety: AgentSafetyInfra | None = None,
    governance: AIGovernanceProtocol | None = None,
    guard_pipeline: GuardPipelineProtocol | None = None,
    metrics: AgentMetrics | None = None,
    tracer: AgentTracer | None = None,
    event_bus: EventBusProtocol | None = None
) -> None

Initialize the agent executor.

Parameters
ParameterTypeDescription
`llm`LLMClientProtocol | NoneLLM client for agent reasoning.
`memory`MemoryProtocol | NoneConversation memory for multi-turn sessions (legacy).
`working_memory`WorkingMemoryProtocol | NoneWorking memory for context assembly.
`session_manager`SessionManagerProtocol | NoneSession manager for stateful conversations.
`skill_executor`SkillExecutorProtocol | NoneSkill executor for running skills.
`skill_registry`SkillRegistryProtocol | NoneSkill registry for discovering available skills.
`observability`AgentObservability | NoneComposite for metrics, tracer, and event bus.
`safety`AgentSafetyInfra | NoneComposite for governance and guard pipeline.
`governance`AIGovernanceProtocol | NoneBackward-compatible direct governance dependency.
`guard_pipeline`GuardPipelineProtocol | NoneBackward-compatible direct guard pipeline dependency.
`metrics`AgentMetrics | NoneBackward-compatible direct metrics dependency.
`tracer`AgentTracer | NoneBackward-compatible direct tracer dependency.
`event_bus`EventBusProtocol | NoneBackward-compatible direct event bus dependency.
run
async def run(
    agent: AgentProtocol,
    message: str,
    session_id: str | None = None,
    user_id: str | None = None,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute an agent with full infrastructure integration.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `agent` | `AgentProtocol` | The agent to execute. |
| `message` | `str` | User's input message. |
| `session_id` | `str \| None` | Session ID for multi-turn memory. |
| `user_id` | `str \| None` | User ID for governance tracking. |
| `**kwargs` | `Any` | Additional parameters passed to the strategy. |
Returns
TypeDescription
Result[AgentResponse, AgentError]``Ok(AgentResponse)`` on success, ``Err(AgentError)`` on failure.
astream
async def astream(
    agent: AgentProtocol,
    message: str,
    session_id: str | None = None,
    user_id: str | None = None,
    **kwargs: Any
)

Stream agent execution events.

Yields AgentEvent objects as the agent executes, enabling real-time monitoring of thoughts, tool calls, and messages.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `agent` | `AgentProtocol` | The agent to execute. |
| `message` | `str` | User's input message. |
| `session_id` | `str \| None` | Session ID for multi-turn memory. |
| `user_id` | `str \| None` | User ID for governance tracking. |
| `**kwargs` | `Any` | Additional parameters passed to the strategy. |
Yields
| Type | Description |
| --- | --- |
| `AgentEvent` | Event objects with `type`, `data`, and `run_id`. |

Complete response from an agent execution.

Contains the final message, the full reasoning trace (steps), all tool calls made, token usage, cost, and timing metadata.

Note: ToolCall and ReasoningStep are defined in lexigram-ai-agents and imported here for use in this type’s field annotations.

tool_call_count
property tool_call_count() -> int

Number of tool calls made.

step_count
property step_count() -> int

Number of reasoning steps taken.

successful_tool_calls
property successful_tool_calls() -> list[Any]

Tool calls that completed without error.

failed_tool_calls
property failed_tool_calls() -> list[Any]

Tool calls that failed.

to_dict
def to_dict() -> dict[str, Any]

Serialize to a JSON-compatible dict.


Emitted when an agent run finishes (success or failure).

Consumed by: audit, analytics, cost tracking, safety review.


Payload fired when an agent begins executing a run.

Attributes: agent_name: Name of the agent that started (e.g. "order_agent").


Registry of agent reasoning strategy implementations.

Usage

registry = AgentStrategyRegistry.with_defaults()
strategy = registry.instantiate("react", max_iterations=10)
registry = AgentStrategyRegistry.with_defaults()
strategy = registry.instantiate("react", max_iterations=10)
__init__
def __init__() -> None
with_defaults
def with_defaults(cls) -> AgentStrategyRegistry

Create a registry pre-populated with the built-in strategies.

Returns
TypeDescription
AgentStrategyRegistryA new registry with ``react``, ``plan-execute``, and ``reflexion`` strategies registered.
default_strategies
def default_strategies() -> dict[str, type]

Return built-in strategy key → class mapping.

Returns
TypeDescription
dict[str, type]Dict mapping strategy name to its class.

Payload fired each time an agent dispatches a tool call.

Attributes: agent_name: Name of the agent that issued the tool call. tool_name: Name of the tool that was called.


Agent system module for Lexigram applications.

Provides agent execution, tool registry, and strategy support.

Usage

```python
from lexigram.ai.agents import AgentsModule
from lexigram.ai.agents.config import AgentConfig


@module(
    imports=[AgentsModule.configure(AgentConfig(...))]
)
class AppModule(Module):
    pass
```
configure
def configure(
    cls,
    config: AgentConfig | None = None,
    enable_multi_agent: bool = False
) -> DynamicModule

Create an AgentsModule with explicit configuration.

Parameters
ParameterTypeDescription
`config`AgentConfig | NoneAgentConfig or ``None`` for defaults.
`enable_multi_agent`boolEnable multi-agent orchestration support, allowing agents to delegate tasks to peer agents. Defaults to ``False``.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.
stub
def stub(
    cls,
    config: AgentConfig | None = None
) -> DynamicModule

Create an AgentsModule suitable for unit and integration testing.

Uses in-memory or no-op agent implementations with minimal side effects. Multi-agent orchestration is disabled by default to keep the test container lightweight.

Parameters
ParameterTypeDescription
`config`AgentConfig | NoneOptional AgentConfig override. Uses safe test defaults when ``None``.
Returns
TypeDescription
DynamicModuleA DynamicModule descriptor.

Registers agent infrastructure with full Lexigram integration.

Registers:

  • ToolRegistry — tool storage with module visibility
  • AgentExecutorImpl — execution engine with governance, memory, metrics, tracing, and events
  • AgentMetrics — agent metrics collector
  • AgentTracer — agent distributed tracing

Auto-discovers at boot:

  • LLMClientProtocol (required)
  • AIGovernanceManager (optional — budget/rate control)
  • ConversationBuffer (optional — multi-turn memory)
  • MetricsRecorderProtocol (optional — from lexigram-monitor)
  • TracerProtocol (optional — from lexigram-monitor)
  • EventBusProtocol (optional — from lexigram-events)
  • CompiledModuleGraph (optional — for tool visibility)
__init__
def __init__(
    config: AgentConfig | None = None,
    enable_multi_agent: bool = False
) -> None

Initialise the provider.

Parameters
ParameterTypeDescription
`config`AgentConfig | NoneAgent configuration. Defaults to ``AgentConfig()``.
`enable_multi_agent`boolEnable multi-agent orchestration support.
from_config
def from_config(
    cls,
    config: AgentConfig,
    **context: object
) -> AgentsProvider

Create an AgentsProvider from the resolved config.

register
async def register(container: ContainerRegistrarProtocol) -> None

Register agent infrastructure.

boot
async def boot(container: ContainerResolverProtocol) -> None

Boot agent infrastructure — resolve all integrations.

shutdown
async def shutdown() -> None

Shutdown agent infrastructure.

health_check
async def health_check(timeout: float = 5.0) -> HealthCheckResult

Agent system health check.


A crew of agents executing tasks.

Attributes:

  • agents: List of agents participating in this crew.
  • tasks: List of tasks to be executed.
  • process: Execution process (sequential or hierarchical). Defaults to SEQUENTIAL.

builder
def builder(cls) -> CrewBuilder

Create a CrewBuilder for fluent crew construction.

Returns
TypeDescription
CrewBuilderA CrewBuilder instance.

Example

crew = (
Crew.builder()
.add_agent(agent1)
.add_task(task1)
.process(Process.SEQUENTIAL)
.build()
)
crew = (
Crew.builder()
.add_agent(agent1)
.add_task(task1)
.process(Process.SEQUENTIAL)
.build()
)

Fluent builder for creating crews programmatically.

Example

crew = (
CrewBuilder()
.add_agent(researcher)
.add_agent(reporter)
.add_task(CrewTask(name="research", description="Research X"))
.add_task(CrewTask(name="report", description="Write report"))
.process(Process.SEQUENTIAL)
.build()
)
crew = (
CrewBuilder()
.add_agent(researcher)
.add_agent(reporter)
.add_task(CrewTask(name="research", description="Research X"))
.add_task(CrewTask(name="report", description="Write report"))
.process(Process.SEQUENTIAL)
.build()
)
__init__
def __init__() -> None
add_agent
def add_agent(agent: Any) -> Self

Add an agent to the crew.

add_task
def add_task(task: CrewTask) -> Self

Add a task to the crew.

process
def process(process: Process) -> Self

Set the execution process.

build
def build() -> Crew

Build and return the crew.


Result of executing a crew.

Executes crew tasks using the specified process.

Supports sequential (tasks run one after another) and hierarchical (manager assigns tasks to workers) execution modes.

run
async def run(
    crew: Crew,
    input_text: str,
    context: dict[str, Any] | None = None
) -> CrewExecutionResult

Execute the crew with the given input.

Parameters
ParameterTypeDescription
`crew`CrewThe crew to execute.
`input_text`strThe input query or task description.
`context`dict[str, Any] | NoneOptional context dict for passing prior task outputs.
Returns
TypeDescription
CrewExecutionResultCrewExecutionResult containing all task results.

A task to be executed by a crew member.

Attributes:

  • name: Task identifier.
  • description: What this task should accomplish.
  • expected_output: Optional description of the expected output format.
  • context_tasks: List of task names whose outputs should be available as context when executing this task.


Plan and Execute reasoning strategy.

Decomposes complex tasks into explicit plans, executes each step sequentially, and synthesizes the results into a final answer.

This strategy is best for:

  • Multi-step research tasks
  • Tasks requiring ordered operations
  • Complex queries that benefit from explicit decomposition

Example

strategy = PlanAndExecuteStrategy(max_steps=8, max_replans=2)
result = await strategy.execute(
message="Compare the revenue of Apple and Google",
tools=[search_tool, calculator_tool],
history=[],
llm=llm_client,
)
strategy = PlanAndExecuteStrategy(max_steps=8, max_replans=2)
result = await strategy.execute(
message="Compare the revenue of Apple and Google",
tools=[search_tool, calculator_tool],
history=[],
llm=llm_client,
)
__init__
def __init__(
    max_steps: int = 10,
    max_replans: int = 2,
    tool_timeout: float = 30.0,
    observation_max_chars: int = 10000,
    llm_timeout: float = 120.0
) -> None

Initialize the Plan and Execute strategy.

Parameters
ParameterTypeDescription
`max_steps`intMaximum number of plan steps.
`max_replans`intMaximum replanning attempts on step failure.
`tool_timeout`floatPer-tool execution timeout in seconds.
`observation_max_chars`intMax characters for tool output before truncation.
`llm_timeout`floatPer-LLM-call timeout in seconds.
execute
async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: LLMClientProtocol,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute the Plan-and-Execute reasoning loop.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `message` | `str` | The user's input message. |
| `tools` | `list[ToolProtocol]` | Tools available to the agent. |
| `history` | `list[dict[str, Any]]` | Conversation history as a list of message dicts. |
| `llm` | `LLMClientProtocol` | LLM client implementing `LLMClientProtocol`. |
| `**kwargs` | `Any` | Additional parameters (`system_prompt`, etc.). |
Returns
TypeDescription
Result[AgentResponse, AgentError]``Ok(AgentResponse)`` with the final synthesized answer and full reasoning trace. ``Err(AgentError)`` on failure.

Status of a plan step.

Execution process for crew tasks.

  • Sequential: tasks run one after another; later tasks see prior outputs.
  • Hierarchical: a manager LLM assigns tasks to workers.


ReAct (Reason + Act) reasoning strategy.

The ReAct strategy follows a Think → Act → Observe loop:

  1. Think — LLM reasons about the current state.
  2. Act — Parse and execute a tool call from the LLM response.
  3. Observe — Feed tool result back as an observation.
  4. Decide — Check if the LLM signals completion.

This is the default strategy for Lexigram agents.

Example

from lexigram.ai.agents import Agent
from lexigram.ai.agents.strategies import ReActStrategy
agent = Agent(
llm=my_llm_client,
strategy=ReActStrategy(max_iterations=10),
)
response = await agent.run("What's the weather?")
from lexigram.ai.agents import Agent
from lexigram.ai.agents.strategies import ReActStrategy
agent = Agent(
llm=my_llm_client,
strategy=ReActStrategy(max_iterations=10),
)
response = await agent.run("What's the weather?")
__init__
def __init__(
    max_iterations: int = 10,
    tool_timeout: float = 30.0,
    observation_max_chars: int = 10000,
    timeout: float = 120.0,
    tool_max_retries: int = 3
) -> None

Initialise the ReAct strategy.

Parameters
ParameterTypeDescription
`max_iterations`intMaximum number of Think→Act→Observe cycles.
`tool_timeout`floatPer-tool execution timeout in seconds.
`observation_max_chars`intMaximum characters for tool output before truncation.
`timeout`floatPer-LLM-call timeout in seconds.
`tool_max_retries`intRetry attempts for transient tool errors (``ConnectionError``, ``OSError``). Each retry waits with exponential back-off (1s, 2s, 4s, …).
execute
async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: LLMClientProtocol,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute the ReAct reasoning loop.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `message` | `str` | The user's input message. |
| `tools` | `list[ToolProtocol]` | Tools available to the agent. |
| `history` | `list[dict[str, Any]]` | Conversation history as a list of message dicts. |
| `llm` | `LLMClientProtocol` | LLM client implementing `LLMClientProtocol`. |
| `**kwargs` | `Any` | Additional parameters: `system_prompt` (str) — optional system prompt prefix; `memory` — optional memory backend for context retrieval. |
Returns
TypeDescription
Result[AgentResponse, AgentError]``Ok(AgentResponse)`` with the final answer and full reasoning trace. ``Err(AgentError)`` on unrecoverable failure.

A single step in the agent's reasoning process.

Each step captures the agent’s thought, the action it decided to take (if any), the tool call (if any), and the observation from the tool result or LLM response.


Reflexion reasoning strategy with self-critique and iterative refinement.

The Reflexion strategy follows a three-phase loop:

  1. Generate — produce an initial response to the user’s request.
  2. Critique — ask the same LLM to evaluate its own response, identifying errors or improvements.
  3. Refine — generate an improved response based on the critique.

The loop repeats up to max_iterations times, stopping early when the critique declares the response optimal (NO_CHANGES_NEEDED).

Example

from lexigram.ai.agents import Agent
from lexigram.ai.agents.strategies import ReflexionStrategy
agent = Agent(
llm=my_llm_client,
strategy=ReflexionStrategy(max_iterations=3),
)
response = await agent.run("Explain quantum entanglement")
__init__
def __init__(
    max_iterations: int = 3,
    temperature_critique: float = 0.3,
    temperature_refine: float = 0.5
) -> None

Initialise the ReflexionStrategy.

Parameters
ParameterTypeDescription
`max_iterations`intMaximum number of critique–refine cycles (default 3). The strategy stops early when the critique returns ``NO_CHANGES_NEEDED``.
`temperature_critique`floatLLM temperature for the self-critique pass. Lower values yield more deterministic critiques.
`temperature_refine`floatLLM temperature for the refinement pass.
execute
async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: LLMClientProtocol,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute the Reflexion reasoning loop.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `message` | `str` | The user's input message. |
| `tools` | `list[ToolProtocol]` | Tools available to the agent (informational; reflexion does not call tools directly). |
| `history` | `list[dict[str, Any]]` | Conversation history as a list of message dicts. |
| `llm` | `LLMClientProtocol` | LLM client implementing `LLMClientProtocol`. |
| `**kwargs` | `Any` | Additional parameters: `system_prompt` (str) — optional system prompt; `timeout` (float) — per-call timeout in seconds. |
Returns
TypeDescription
Result[AgentResponse, AgentError]``Ok(AgentResponse)`` with the final refined message and the full reasoning trace (initial draft + critique + final response per iteration). ``Err(AgentError)`` if the LLM call fails or is cancelled.

Supervisor strategy that orchestrates multiple sub-agents.

The supervisor uses an LLM to decide which sub-agent should handle each part of a task. Sub-agents are exposed as tools using AgentAsToolAdapter, allowing the supervisor to delegate via the standard tool-calling mechanism.

This enables hierarchical multi-agent patterns:

  • Customer support routing (classify → specialist agent)
  • Research tasks (delegate to searcher, analyzer, summarizer)
  • Quality assurance (generate → review → revise)

Example

strategy = SupervisorStrategy(
sub_agents={"research": research_agent, "writing": writing_agent},
executor=executor,
)
result = await strategy.execute(
message="Write a report about quantum computing",
tools=[], # supervisor uses sub-agents, not tools
history=[],
llm=llm_client,
)
strategy = SupervisorStrategy(
sub_agents={"research": research_agent, "writing": writing_agent},
executor=executor,
)
result = await strategy.execute(
message="Write a report about quantum computing",
tools=[], # supervisor uses sub-agents, not tools
history=[],
llm=llm_client,
)
__init__
def __init__(
    sub_agents: dict[str, AgentProtocol],
    executor: AgentExecutorProtocol,
    *,
    max_delegations: int = 5,
    llm_timeout: float = 120.0
) -> None

Initialize the supervisor strategy.

Parameters
ParameterTypeDescription
`sub_agents`dict[str, AgentProtocol]Named sub-agents available for delegation.
`executor`AgentExecutorProtocolAgent executor used to run sub-agents.
`max_delegations`intMaximum number of delegation rounds.
`llm_timeout`floatTimeout per LLM call in seconds.
execute
async def execute(
    message: str,
    tools: list[ToolProtocol],
    history: list[dict[str, Any]],
    llm: LLMClientProtocol,
    **kwargs: Any
) -> Result[AgentResponse, AgentError]

Execute the supervisor delegation loop.

Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `message` | `str` | The user's input message. |
| `tools` | `list[ToolProtocol]` | Additional tools (merged with agent-as-tool adapters). |
| `history` | `list[dict[str, Any]]` | Conversation history as a list of message dicts. |
| `llm` | `LLMClientProtocol` | LLM client implementing `LLMClientProtocol`. |
| `**kwargs` | `Any` | Additional parameters (`system_prompt`, etc.). |
Returns
TypeDescription
Result[AgentResponse, AgentError]``Ok(AgentResponse)`` with the supervisor's final answer and full delegation trace. ``Err(AgentError)`` on failure.

Result of executing a single crew task.

Emitted when a tool call within an agent run completes.

Consumed by: analytics, tool usage monitoring, safety review.


Record of a single tool invocation during agent execution.

Captures the tool name, arguments, result (or error), and timing for observability and debugging.

succeeded
property succeeded() -> bool

Whether the tool call completed without error.


Registry for atomic agent tools with optional module visibility control.

Tools are stateless, single-purpose functions invoked directly by the agent reasoning loop (ReAct, PlanExecute, etc.). They should be fast (<500ms) and have no side effects beyond their stated purpose.

Boundary rule: Tools do NOT invoke skills. Skills may invoke tools.

Extends Registry for unified introspection, lifecycle hooks, and thread-safe storage while implementing ToolRegistryProtocol.

When a CompiledModuleGraph is provided, tool access is checked against the module visibility map before execution.

Example

registry = ToolRegistry()
registry.register(lookup_order, module_class=OrdersModule)
registry.register(process_refund, module_class=PaymentsModule)
# Without visibility — all tools accessible
result = await registry.execute("lookup_order", order_id="123")
# With visibility — only tools visible to the caller's module
registry.set_module_graph(compiled_graph)
registry.set_caller_module(SupportModule)
result = await registry.execute("lookup_order", order_id="123")
registry = ToolRegistry()
registry.register(lookup_order, module_class=OrdersModule)
registry.register(process_refund, module_class=PaymentsModule)
# Without visibility — all tools accessible
result = await registry.execute("lookup_order", order_id="123")
# With visibility — only tools visible to the caller's module
registry.set_module_graph(compiled_graph)
registry.set_caller_module(SupportModule)
result = await registry.execute("lookup_order", order_id="123")
__init__
def __init__() -> None
register
def register(
    tool: ToolProtocol,
    module_class: type | None = None
) -> None

Register a tool.

Parameters
ParameterTypeDescription
`tool`ToolProtocolTool satisfying ``ToolProtocol``.
`module_class`type | NoneOptional module that owns this tool (for visibility enforcement).
Raises
ExceptionDescription
ValueErrorIf a tool with the same name is already registered.
unregister
def unregister(name: str) -> ToolProtocol | None

Remove and return a tool by name.

get
def get(name: str) -> ToolProtocol | None

Get a tool by name, or None if not found.

list_tools
def list_tools() -> list[ToolProtocol]

List all registered tools.

list_tool_names
def list_tool_names() -> list[str]

List all registered tool names.

list_visible_tools
def list_visible_tools() -> list[ToolProtocol]

List tools visible to the current caller module.

If no module graph or caller is set, returns all tools.

list_visible_tool_names
def list_visible_tool_names() -> list[str]

List names of tools visible to the current caller module.

list_tool_schemas
def list_tool_schemas() -> list[dict[str, Any]]

Generate the tool schema list for LLM function calling.

Returns only tools visible to the current caller module.

set_module_graph
def set_module_graph(graph: CompiledModuleGraphProtocol | None) -> None

Set the compiled module graph for visibility enforcement.

Parameters
ParameterTypeDescription
`graph`CompiledModuleGraphProtocol | NoneA ``CompiledModuleGraph`` from the module compiler.
set_caller_module
def set_caller_module(module_class: type | None) -> None

Set the calling module for visibility checks.

Parameters
ParameterTypeDescription
`module_class`type | NoneThe module class of the agent using this registry. ``None`` means standalone (no visibility restrictions).
execute
async def execute(
    name: str,
    **kwargs: Any
) -> Result[Any, ToolError]

Execute a tool by name with visibility and error handling.

Checks module visibility before execution when a module graph is configured.

Parameters
ParameterTypeDescription
`name`strTool name to execute.
`**kwargs`AnyArguments to pass to the tool.
Returns
TypeDescription
Result[Any, ToolError]``Ok(result)`` on success, ``Err(ToolError)`` on failure.
clear
def clear() -> None

Remove all registered tools.


strategy
def strategy(name: str) -> Callable[[type], type]
Register a class as an agent execution strategy.
Parameters
ParameterTypeDescription
`name`strUnique strategy identifier (e.g. ``"react"``, ``"plan_execute"``).
Returns
TypeDescription
Callable[[type], type]Class decorator that registers the strategy.

Example

@strategy(name="react")
class ReActStrategy:
    async def execute(self, goal: str) -> AgentResponse: ...

tool
def tool(
    func: Callable | None = None,
    *,
    name: str | None = None,
    description: str | None = None
) -> Any
Decorator that converts an async function into an agent tool.

Automatically generates JSON schema from type hints for LLM function calling.

Usage

@tool
async def lookup_order(order_id: str) -> dict:
    """Look up an order by its ID."""
    return await order_service.find(order_id)


@tool(name="search", description="Search local services")
async def search_services(
    query: str,
    category: str | None = None,
    radius_km: float = 5.0,
) -> list[dict]:
    return await search.find(query, category, radius_km)

Invalid agent configuration.

Raised when an agent is constructed with invalid parameters (no tools, no system prompt, invalid strategy, etc.).

__init__
def __init__(
    message: str = 'Agent configuration error',
    *,
    agent_name: str | None = None,
    **kwargs: Any
) -> None

Base exception for all agent errors.
__init__
def __init__(
    message: str = 'Agent error',
    **kwargs: Any
) -> None

Agent execution failed.

Raised when the agent’s reasoning loop encounters an unrecoverable error (LLM failure, strategy crash, etc.).

__init__
def __init__(
    message: str = 'Agent execution failed',
    *,
    agent_name: str | None = None,
    step_number: int | None = None,
    **kwargs: Any
) -> None

Agent exceeded its AI governance budget.

Raised when token usage or cost exceeds configured limits before the agent completes its task.

__init__
def __init__(
    message: str = 'Budget exceeded',
    *,
    budget_type: str | None = None,
    limit: float | None = None,
    used: float | None = None,
    **kwargs: Any
) -> None

Agent exceeded maximum reasoning iterations.

Raised when the agent reaches max_iterations without producing a final response.

__init__
def __init__(
    message: str = 'Maximum iterations exceeded',
    *,
    max_iterations: int | None = None,
    current_iteration: int | None = None,
    **kwargs: Any
) -> None

Reasoning strategy failed.

Raised when the agent’s strategy encounters an error during reasoning (LLM failure, invalid response, etc.).

__init__
def __init__(
    message: str = 'Strategy execution failed',
    *,
    strategy_name: str | None = None,
    **kwargs: Any
) -> None

Agent does not have access to this tool.

Raised when module visibility controls prevent an agent from accessing a specific tool.

__init__
def __init__(
    message: str = 'Tool access denied',
    *,
    tool_name: str | None = None,
    agent_module: str | None = None,
    tool_module: str | None = None,
    **kwargs: Any
) -> None

Base exception for tool errors.
__init__
def __init__(
    message: str = 'Tool error',
    **kwargs: Any
) -> None

Tool execution failed.

Raised when a tool raises an exception during execution.

__init__
def __init__(
    message: str = 'Tool execution failed',
    *,
    tool_name: str | None = None,
    arguments: dict[str, Any] | None = None,
    **kwargs: Any
) -> None

Tool not found in registry.

Raised when the agent tries to call a tool that is not registered in the tool registry.

__init__
def __init__(
    message: str = 'Tool not found',
    *,
    tool_name: str | None = None,
    available_tools: list[str] | None = None,
    **kwargs: Any
) -> None