class Agent(Module, metaclass=AutoParams):
"""Agent is a Module type that uses language models to solve tasks.
An Agent can perform actions in an environment using tools calls.
For an Agent, a tool is any callable object.
An Agent can handle multimodal inputs and outputs.
"""
# Configure AutoParams to use docstring as 'description' parameter
_autoparams_use_docstring_for = "description"
# Configure AutoParams to use class name as 'name' parameter
_autoparams_use_classname_for = "name"
_supported_outputs: List[str] = [
"reasoning_structured",
"reasoning_text_generation",
"structured",
"text_generation",
"audio_generation",
"audio_text_generation",
"tool_responses",
]
def __init__( # noqa: C901
self,
name: str,
model: Union[ChatCompletionModel, ModelGateway, LM],
*,
system_message: Optional[str] = None,
instructions: Optional[str] = None,
expected_output: Optional[str] = None,
examples: Optional[Union[str, List[Union[Example, Mapping[str, Any]]]]] = None,
system_extra_message: Optional[str] = None,
guardrails: Optional[Dict[str, Callable]] = None,
message_fields: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None,
templates: Optional[Dict[str, str]] = None,
context_cache: Optional[str] = None,
prefilling: Optional[str] = None,
generation_schema: Optional[msgspec.Struct] = None,
typed_parser: Optional[str] = None,
response_mode: Optional[str] = None,
tools: Optional[List[Callable]] = None,
mcp_servers: Optional[List[Mapping[str, Any]]] = None,
signature: Optional[Union[str, Signature]] = None,
description: Optional[str] = None,
annotations: Optional[Mapping[str, type]] = None,
):
"""Initialize the Agent module.
Args:
name:
Agent name in snake case format.
model:
Chat Completation Model client.
system_message:
The Agent behaviour.
instructions:
What the Agent should do.
expected_output:
What the response should be like.
examples:
Examples of inputs, reasoning and outputs.
system_extra_message:
An extra message in system prompt.
guardrails:
Dictionary mapping guardrail types to callables.
Valid keys: "input", "output"
!!! example
guardrails={"input": input_checker, "output": output_checker}
message_fields:
Dictionary mapping Message field names to their paths in the Message object.
Valid keys: "task_inputs", "task_multimodal_inputs", "messages",
"context_inputs", "model_preference", "vars"
!!! example
message_fields={
"task_inputs": "input.user",
"task_multimodal_inputs": {"audio": "audio.user"},
"messages": "messages.history",
"context_inputs": "context.data",
"model_preference": "model.preference",
"vars": "vars.data"
}
Field descriptions:
- task_inputs: Field path for task input (str, dict, or tuple)
- task_multimodal_inputs: Map datatype (image, video, audio, file)
to field paths
- messages: Field path for list of chats in ChatML format
- context_inputs: Field path for context (str or list of str)
- model_preference: Field path for model preference (str, only valid
with ModelGateway)
- vars: Field path for inputs to templates and tools (str)
config:
Dictionary with configuration options.
Valid keys: "verbose", "return_messages", "tool_choice",
"stream", "image_block_kwargs", "video_block_kwargs", "include_date"
!!! example
config={
"verbose": True,
"return_messages": False,
"tool_choice": "auto",
"stream": False,
"image_block_kwargs": {"detail": "high"},
"video_block_kwargs": {"format": "mp4"},
"include_date": False
}
Configuration options:
- verbose: Print model output and tool calls to console (bool)
- return_messages: Return dict with messages and response (bool)
- tool_choice: Control tool selection ("auto", "required", or function name)
- stream: Transmit response on-the-fly (bool)
- image_block_kwargs: Dict of kwargs to pass to ChatBlock.image
(e.g., {"detail": "high"})
- video_block_kwargs: Dict of kwargs to pass to ChatBlock.video
(e.g., {"format": "mp4"})
- include_date: Include current date with weekday in system prompt
(bool). Format: "Weekday, Month DD, YYYY"
templates:
Dictionary mapping template types to Jinja template strings.
Valid keys: "task", "response", "context", "system_prompt"
!!! example
templates={
"task": "Who was {{person}}?",
"response": "{{final_answer}}",
"context": "Context: {{context}}",
"system_prompt": "Custom system prompt: ..."
}
Template descriptions:
- task: Formats the task/prompt sent to the model
- response: Formats the model's response
- context: Formats context_inputs (does NOT apply to context_cache)
- system_prompt: Overrides the default system prompt
template. If not provided, uses SYSTEM_PROMPT_TEMPLATE.
Available variables: system_message, instructions,
expected_output, examples, system_extra_message,
current_date (if include_date=True)
context_cache:
A fixed context.
prefilling:
Forces an initial message from the model. From that message it
will continue its response from there.
generation_schema:
Schema that defines how the output should be structured.
typed_parser:
Converts the model raw output into a typed-dict. Supported parser:
`typed_xml`.
response_mode:
What the response should be.
* `plain_response` (default): Returns the final agent response directly.
* other: Write on field in Message object.
tools:
A list of callable objects.
mcp_servers:
List of MCP (Model Context Protocol) server configurations.
Each config should contain:
- name: Namespace for tools from this server
- transport: "stdio" or "http"
- For stdio: command, args, cwd, env
- For http: base_url, headers
- Optional: include_tools, exclude_tools, tool_config
!!! example
mcp_servers=[{
"name": "fs",
"transport": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem"],
"include_tools": ["read_file", "write_file"],
"tool_config": {"read_file": {"inject_vars": ["context"]}}
}]
signature:
A DSPy-based signature. A signature creates a task_template,
a generation_schema, instructions and examples (both if passed).
Can be combined with standard generation_schemas like `ReAct` and
`ChainOfThought`. Can also be combined with `typed_parser`.
description:
The Agent description. It's useful when using an agent-as-a-tool.
annotations
Define the input and output annotations to use the agent-as-a-function.
"""
if annotations is None:
annotations = {"message": str, "return": str}
# Validate that signature and custom annotations are not both provided
if signature is not None and annotations != {"message": str, "return": str}:
raise ValueError(
"Cannot specify both 'signature' and custom 'annotations'. "
"When using a signature, annotations are generated automatically "
"from the signature inputs. Remove the 'annotations' parameter."
)
super().__init__()
self.set_name(name)
self.set_description(description)
# Only set annotations if signature is not provided
# (signature will set annotations automatically in _set_signature)
if signature is None:
self.set_annotations(annotations)
else:
# Set default temporarily, will be overridden by _set_signature
self.set_annotations({"message": str, "return": str})
self._set_config(config)
stream = config.get("stream", False) if config else False
if stream is True:
if generation_schema is not None:
raise ValueError("`generation_schema` is not `stream=True` compatible")
if guardrails is not None and "output" in guardrails:
raise ValueError(
"`guardrails['output']` is not `stream=True` compatible"
)
if templates is not None and templates.get("response") is not None:
raise ValueError(
"`templates['response']` is not `stream=True` compatible"
)
if typed_parser is not None:
raise ValueError("`typed_parser` is not `stream=True` compatible")
self._set_context_cache(context_cache)
self._set_guardrails(guardrails)
self._set_message_fields(message_fields)
self._set_model(model)
self._set_prefilling(prefilling)
self._set_system_extra_message(system_extra_message)
self._set_response_mode(response_mode)
self._set_templates(templates)
self._set_tools(tools, mcp_servers)
if signature is not None:
signature_params = dotdict(
signature=signature,
examples=examples,
instructions=instructions,
system_message=system_message,
typed_parser=typed_parser,
)
if generation_schema is not None:
signature_params.generation_schema = generation_schema
self._set_signature(**signature_params)
else:
self._set_typed_parser(typed_parser)
self._set_examples(examples)
self._set_generation_schema(generation_schema)
self._set_expected_output(expected_output)
self._set_instructions(instructions)
self._set_system_message(system_message)
def forward(
self, message: Optional[Union[str, Mapping[str, Any], Message]] = None, **kwargs
) -> Union[str, Mapping[str, None], ModelStreamResponse, Message]:
"""Execute the agent with the given message.
Args:
message: The input message, which can be:
- str: Direct task input (used as task_inputs)
- Message: Message object with fields mapped via message_fields.
Requires message_fields configuration, e.g.:
message_fields={"task_inputs": "input.user"}
- dict: Task inputs as a dictionary
- None: When using named task arguments (see below)
**kwargs: Can include:
- Reserved kwargs (runtime overrides for message_fields):
- task_multimodal_inputs: Override multimodal inputs
- messages: Override chat messages (chat history)
- context_inputs: Override context
- model_preference: Override model preference
- vars: Override template/tool variables
- Named task arguments: When message=None and a task template is
configured, any other kwargs are treated as task inputs.
Example: agent(name="Vilson", age=27)
This is useful when using agents as tools with typed annotations.
Returns:
Agent response (str, Message, or ModelStreamResponse depending on
configuration)
Raises:
ValueError: If both message and named task arguments are provided,
or if named arguments are used without a task template.
Examples:
>>> # String input
>>> agent("What is the weather?")
>>> # Dict input
>>> agent({"city": "Natal"})
>>> # Message input (requires message_fields configuration)
>>> agent_with_message = Agent(
... model=model,
... message_fields={"task_inputs": "user.query"}
... )
>>> msg = Message()
>>> msg.set("user.query", "Hello")
>>> agent_with_message(msg)
>>> # Named arguments (requires task template)
>>> agent = Agent(
... model=model,
... templates={"task": "Greet {{name}} who is {{age}} years old"},
... )
>>> agent(name="Vilson", age=27)
"""
inputs = self._prepare_task(message, **kwargs)
model_response = self._execute_model(prefilling=self.prefilling, **inputs)
response = self._process_model_response(message, model_response, **inputs)
return response
async def aforward(
self, message: Optional[Union[str, Mapping[str, Any], Message]] = None, **kwargs
) -> Union[str, Mapping[str, None], ModelStreamResponse, Message]:
"""Async version of forward."""
inputs = await self._aprepare_task(message, **kwargs)
model_response = await self._aexecute_model(
prefilling=self.prefilling, **inputs
)
response = await self._aprocess_model_response(
message, model_response, **inputs
)
return response
def _execute_model(
self,
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
prefilling: Optional[str] = None,
model_preference: Optional[str] = None,
) -> Union[ModelResponse, ModelStreamResponse]:
model_execution_params = self._prepare_model_execution(
messages=messages,
prefilling=prefilling,
model_preference=model_preference,
vars=vars,
)
if self.guardrails.get("input"):
self._execute_input_guardrail(model_execution_params)
if self.config.get("verbose", False):
cprint(f"[{self.name}][call_model]", bc="br1", ls="b")
model_response = self.lm(**model_execution_params)
return model_response
async def _aexecute_model(
self,
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
prefilling: Optional[str] = None,
model_preference: Optional[str] = None,
) -> Union[ModelResponse, ModelStreamResponse]:
model_execution_params = self._prepare_model_execution(
messages=messages,
prefilling=prefilling,
model_preference=model_preference,
vars=vars,
)
if self.guardrails.get("input"):
await self._aexecute_input_guardrail(model_execution_params)
if self.config.get("verbose", False):
cprint(f"[{self.name}][call_model]", bc="br1", ls="b")
model_response = await self.lm.acall(**model_execution_params)
return model_response
def _prepare_model_execution(
self,
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
prefilling: Optional[str] = None,
model_preference: Optional[str] = None,
) -> Mapping[str, Any]:
system_prompt = self.get_system_prompt(vars)
tool_schemas = self.tool_library.get_tool_json_schemas()
if not tool_schemas:
tool_schemas = None
tool_choice = self.config.get("tool_choice")
if is_subclass_of(self.generation_schema, ToolFlowControl) and tool_schemas:
tools_template = self.generation_schema.tools_template
inputs = {"tool_schemas": tool_schemas, "tool_choice": tool_choice}
flow_control_tools = self._format_template(inputs, tools_template)
if system_prompt:
system_prompt = flow_control_tools + "\n\n" + system_prompt
else:
system_prompt = flow_control_tools
tool_schemas = None # Disable tool_schemas to controlflow preference
tool_choice = None # Disable tool_choice to controlflow preference
model_execution_params = dotdict(
messages=deepcopy(messages),
system_prompt=system_prompt or None,
prefilling=prefilling,
stream=self.config.get("stream", False),
tool_schemas=tool_schemas,
tool_choice=tool_choice,
generation_schema=self.generation_schema,
typed_parser=self.typed_parser,
)
if model_preference:
model_execution_params.model_preference = model_preference
return model_execution_params
def _prepare_input_guardrail_execution(
self, model_execution_params: Mapping[str, Any]
) -> Mapping[str, Any]:
messages = model_execution_params.get("messages")
last_message = messages[-1]
if isinstance(last_message.get("content"), list):
if last_message.get("content")[0]["type"] == "image_url":
data = [last_message]
else: # audio, file
data = last_message.get("content")[-1] # text input
else:
data = last_message.get("content")
guardrail_params = {"data": data}
return guardrail_params
def _process_model_response(
self,
message: Union[str, Mapping[str, Any], Message],
model_response: Union[ModelResponse, ModelStreamResponse],
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Union[str, Mapping[str, Any], Message, ModelStreamResponse]:
if "tool_call" in model_response.response_type:
model_response, messages = self._process_tool_call_response(
model_response, messages, vars, model_preference
)
elif is_subclass_of(self.generation_schema, ToolFlowControl):
model_response, messages = self._process_tool_flow_control_response(
model_response, messages, vars, model_preference
)
if isinstance(model_response, (ModelResponse, ModelStreamResponse)):
raw_response = self._extract_raw_response(model_response)
response_type = model_response.response_type
else: # returns tool result as response or tool call as response
raw_response = model_response
response_type = "tool_responses"
if response_type in self._supported_outputs:
response = self._prepare_response(
raw_response, response_type, messages, message, vars
)
return response
else:
raise ValueError(f"Unsupported `response_type={response_type}`")
async def _aprocess_model_response(
self,
message: Union[str, Mapping[str, Any], Message],
model_response: Union[ModelResponse, ModelStreamResponse],
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Union[str, Mapping[str, Any], Message, ModelStreamResponse]:
if "tool_call" in model_response.response_type:
model_response, messages = await self._aprocess_tool_call_response(
model_response, messages, vars, model_preference
)
elif is_subclass_of(self.generation_schema, ToolFlowControl):
(
model_response,
messages,
) = await self._aprocess_tool_flow_control_response(
model_response, messages, vars, model_preference
)
if isinstance(model_response, (ModelResponse, ModelStreamResponse)):
raw_response = self._extract_raw_response(model_response)
response_type = model_response.response_type
else: # returns tool result as response or tool call as response
raw_response = model_response
response_type = "tool_responses"
if response_type in self._supported_outputs:
response = await self._aprepare_response(
raw_response, response_type, messages, message, vars
)
return response
else:
raise ValueError(f"Unsupported `response_type={response_type}`")
def _process_tool_flow_control_response(
self,
model_response: Union[ModelResponse, ModelStreamResponse],
messages: Mapping[str, Any],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Tuple[Union[str, Mapping[str, Any], ModelStreamResponse], Mapping[str, Any]]:
"""Handle tool flow control responses using the ToolFlowControl interface."""
flow_control = self.generation_schema
while True:
raw_response = self._extract_raw_response(model_response)
# Use ToolFlowControl interface via generation_schema
flow_result = flow_control.extract_flow_result(raw_response)
if flow_result.is_complete:
return model_response, messages
if self.config.get("verbose", False) and flow_result.reasoning:
cprint(
f"[{self.name}][tool_calls_reasoning] {flow_result.reasoning}",
bc="br2",
ls="b",
)
if flow_result.tool_calls:
tool_results = self._process_tool_call(
flow_result.tool_calls, messages, vars
)
if tool_results.return_directly:
tool_calls = tool_results.to_dict().pop("return_directly")
tool_calls["reasoning"] = flow_result.reasoning
tool_responses = dotdict(tool_responses=tool_calls)
return tool_responses, messages
# Use interface to inject results
raw_response = flow_control.inject_results(raw_response, tool_results)
# Use interface to build history
messages = flow_control.build_history(raw_response, messages)
model_response = self._execute_model(
messages=messages, model_preference=model_preference, vars=vars
)
async def _aprocess_tool_flow_control_response(
self,
model_response: Union[ModelResponse, ModelStreamResponse],
messages: Mapping[str, Any],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Tuple[Union[str, Mapping[str, Any], ModelStreamResponse], Mapping[str, Any]]:
"""Async version of _process_tool_flow_control_response.
Handle tool flow control responses using the ToolFlowControl interface.
"""
flow_control = self.generation_schema
while True:
raw_response = self._extract_raw_response(model_response)
# Use ToolFlowControl interface via generation_schema (async)
flow_result = await flow_control.aextract_flow_result(raw_response)
if flow_result.is_complete:
return model_response, messages
if self.config.get("verbose", False) and flow_result.reasoning:
cprint(
f"[{self.name}][tool_calls_reasoning] {flow_result.reasoning}",
bc="br2",
ls="b",
)
if flow_result.tool_calls:
tool_results = await self._aprocess_tool_call(
flow_result.tool_calls, messages, vars
)
if tool_results.return_directly:
tool_calls = tool_results.to_dict().pop("return_directly")
tool_calls["reasoning"] = flow_result.reasoning
tool_responses = dotdict(tool_responses=tool_calls)
return tool_responses, messages
# Use interface to inject results (async version)
raw_response = await flow_control.ainject_results(
raw_response, tool_results
)
# Use interface to build history (async version)
messages = await flow_control.abuild_history(raw_response, messages)
model_response = await self._aexecute_model(
messages=messages, model_preference=model_preference, vars=vars
)
def _process_tool_call_response(
self,
model_response: Union[ModelResponse, ModelStreamResponse],
messages: Mapping[str, Any],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Tuple[Union[str, Mapping[str, Any], ModelStreamResponse], Mapping[str, Any]]:
"""ToolCall example:
[{'role': 'assistant', 'tool_responses': [{'id': 'call_1YL',
'type': 'function', 'function': {'arguments': '{"order_id":"order_12345"}',
'name': 'get_delivery_date'}}]}, {'role': 'tool', 'tool_call_id': 'call_HA',
'content': '2024-10-15'}].
"""
while True:
if model_response.response_type == "tool_call":
raw_response = model_response.data
reasoning = raw_response.reasoning
if self.config.get("verbose", False):
if reasoning:
repr_str = f"[{self.name}][tool_calls_reasoning] {reasoning}"
cprint(repr_str, bc="br2", ls="b")
tool_callings = raw_response.get_calls()
tool_results = self._process_tool_call(tool_callings, messages, vars)
if tool_results.return_directly:
tool_calls = tool_results.to_dict()
tool_calls.pop("return_directly")
tool_calls["reasoning"] = reasoning
tool_responses = dotdict(tool_responses=tool_calls)
return tool_responses, messages
id_results = {
call.id: call.result or call.error
for call in tool_results.tool_calls
}
raw_response.insert_results(id_results)
tool_responses_message = raw_response.get_messages()
messages.extend(tool_responses_message)
else:
return model_response, messages
model_response = self._execute_model(
messages=messages, model_preference=model_preference, vars=vars
)
async def _aprocess_tool_call_response(
self,
model_response: Union[ModelResponse, ModelStreamResponse],
messages: Mapping[str, Any],
vars: Mapping[str, Any],
model_preference: Optional[str] = None,
) -> Tuple[Union[str, Mapping[str, Any], ModelStreamResponse], Mapping[str, Any]]:
"""Async version of _process_tool_call_response.
ToolCall example: [{'role': 'assistant', 'tool_responses': [{'id': 'call_1YL',
'type': 'function', 'function': {'arguments': '{"order_id":"order_12345"}',
'name': 'get_delivery_date'}}]}, {'role': 'tool', 'tool_call_id': 'call_HA',
'content': '2024-10-15'}].
"""
while True:
if model_response.response_type == "tool_call":
raw_response = model_response.data
reasoning = raw_response.reasoning
if self.config.get("verbose", False):
if reasoning:
repr_str = f"[{self.name}][tool_calls_reasoning] {reasoning}"
cprint(repr_str, bc="br2", ls="b")
tool_callings = raw_response.get_calls()
tool_results = await self._aprocess_tool_call(
tool_callings, messages, vars
)
if tool_results.return_directly:
tool_calls = tool_results.to_dict()
tool_calls.pop("return_directly")
tool_calls["reasoning"] = reasoning
tool_responses = dotdict(tool_responses=tool_calls)
return tool_responses, messages
id_results = {
call.id: call.result or call.error
for call in tool_results.tool_calls
}
raw_response.insert_results(id_results)
tool_responses_message = raw_response.get_messages()
messages.extend(tool_responses_message)
else:
return model_response, messages
model_response = await self._aexecute_model(
messages=messages, model_preference=model_preference, vars=vars
)
def _process_tool_call(
self,
tool_callings: Mapping[str, Any],
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
) -> ToolResponses:
if self.config.get("verbose", False):
for call in tool_callings:
repr_str = f"[{self.name}][tool_call] {call[1]}: {call[2]}"
cprint(repr_str, bc="br2", ls="b")
tool_results = self.tool_library(
tool_callings=tool_callings,
messages=messages,
vars=vars,
)
if self.config.get("verbose", False):
repr_str = f"[{self.name}][tool_responses]"
if tool_results.return_directly:
repr_str += " return directly"
cprint(repr_str, bc="br1", ls="b")
for call in tool_results.tool_calls:
result = call.result or call.error or ""
repr_str = f"[{self.name}][tool_response] {call.name}: {result}"
cprint(repr_str, ls="b")
return tool_results
async def _aprocess_tool_call(
self,
tool_callings: Mapping[str, Any],
messages: List[Mapping[str, Any]],
vars: Mapping[str, Any],
) -> ToolResponses:
"""Async version of _process_tool_call."""
if self.config.get("verbose", False):
for call in tool_callings:
repr_str = f"[{self.name}][tool_call] {call[1]}: {call[2]}"
cprint(repr_str, bc="br2", ls="b")
tool_results = await self.tool_library.acall(
tool_callings=tool_callings,
messages=messages,
vars=vars,
)
if self.config.get("verbose", False):
repr_str = f"[{self.name}][tool_responses]"
if tool_results.return_directly:
repr_str += " return directly"
cprint(repr_str, bc="br1", ls="b")
for call in tool_results.tool_calls:
result = call.result or call.error or ""
repr_str = f"[{self.name}][tool_response] {call.name}: {result}"
cprint(repr_str, ls="b")
return tool_results
def _prepare_response(
self,
raw_response: Union[str, Mapping[str, Any], ModelStreamResponse],
response_type: str,
messages: List[Mapping[str, Any]],
message: Union[str, Mapping[str, Any], Message],
vars: Mapping[str, Any],
) -> Union[str, Mapping[str, Any], ModelStreamResponse]:
formatted_response = None
if not isinstance(raw_response, ModelStreamResponse):
if response_type == "text_generation" or "structured" in response_type:
if self.config.get("verbose", False):
cprint(f"[{self.name}][response] {raw_response}", bc="y", ls="b")
if self.guardrails.get("output"):
self._execute_output_guardrail(raw_response)
if self.templates.get("response"):
if isinstance(raw_response, str):
pre_response = self._format_response_template(vars)
formatted_response = self._format_template(
raw_response, pre_response
)
elif isinstance(raw_response, dict):
raw_response.update(vars)
formatted_response = self._format_response_template(
raw_response
)
result = formatted_response or raw_response
if self.config.get("return_messages", False):
if response_type == "tool_responses":
result.messages = messages
else:
result = dotdict(response=result, messages=messages)
return self._define_response_mode(result, message)
async def _aprepare_response(
self,
raw_response: Union[str, Mapping[str, Any], ModelStreamResponse],
response_type: str,
messages: List[Mapping[str, Any]],
message: Union[str, Mapping[str, Any], Message],
vars: Mapping[str, Any],
) -> Union[str, Mapping[str, Any], ModelStreamResponse]:
"""Async version of _prepare_response with async output guardrail support."""
formatted_response = None
if not isinstance(raw_response, ModelStreamResponse):
if response_type == "text_generation" or "structured" in response_type:
if self.config.get("verbose", False):
cprint(f"[{self.name}][response] {raw_response}", bc="y", ls="b")
if self.guardrails.get("output"):
await self._aexecute_output_guardrail(raw_response)
if self.templates.get("response"):
if isinstance(raw_response, str):
pre_response = self._format_response_template(vars)
formatted_response = self._format_template(
raw_response, pre_response
)
elif isinstance(raw_response, dict):
raw_response.update(vars)
formatted_response = self._format_response_template(
raw_response
)
result = formatted_response or raw_response
if self.config.get("return_messages", False):
if response_type == "tool_responses":
result.messages = messages
else:
result = dotdict(response=result, messages=messages)
return self._define_response_mode(result, message)
def _prepare_output_guardrail_execution(
self, model_response: Union[str, Mapping[str, Any]]
) -> Mapping[str, Any]:
if isinstance(model_response, str):
data = model_response
else:
data = str(model_response)
guardrail_params = {"data": data}
return guardrail_params
def _prepare_task( # noqa: C901
self, message: Optional[Union[str, Message, Mapping[str, Any]]] = None, **kwargs
) -> Mapping[str, Any]:
"""Prepare model input in ChatML format and execution params."""
# Extract reserved kwargs
vars = kwargs.pop("vars", {})
messages = kwargs.pop("messages", [])
model_preference = kwargs.pop("model_preference", None)
# Get remaining kwargs (potential task inputs)
remaining_kwargs = {
k: v for k, v in kwargs.items() if k not in _RESERVED_KWARGS
}
# Handle named task arguments
if message is None and remaining_kwargs:
if not self.templates.get("task"):
raise ValueError(
f"Named task arguments require a 'task' template to be configured. "
f"Received kwargs: {list(remaining_kwargs.keys())}. "
f"Either configure a task template or pass arguments as: "
f"agent({{'key': 'value'}}) or agent(Message(...))"
)
# Convert named kwargs to dict for template rendering
message = remaining_kwargs
# Clear kwargs to avoid passing them down
for key in remaining_kwargs:
kwargs.pop(key)
elif message is not None and remaining_kwargs:
raise ValueError(
f"Cannot pass both 'message' argument and named task arguments. "
f"Received message={type(message).__name__} and "
f"kwargs={list(remaining_kwargs.keys())}. "
f"Use either agent(message) or agent(key1=value1, key2=value2)"
)
# Extract vars from Message if not provided
if not vars and isinstance(message, Message) and self.vars is not None:
vars = message.get(self.vars, {})
# Extract messages from Message if not provided
if (
messages == []
and isinstance(message, Message)
and self.messages is not None
):
messages = self._get_content_from_message(self.messages, message)
content = self._process_task_inputs(message, vars=vars, **kwargs)
if content is None and messages == []:
raise ValueError(
"No task input provided. Expected one of:\n"
" - agent('your text')\n"
" - agent({'key': 'value'})\n"
" - agent(message=Message(...))\n"
" - agent(param1=..., param2=...) with task template configured"
)
if content is not None:
chat_content = [ChatBlock.user(content)]
if messages == []:
messages = chat_content
else:
messages.extend(chat_content)
if model_preference is None and isinstance(message, Message):
model_preference = self.get_model_preference_from_message(message)
return {
"messages": messages,
"model_preference": model_preference,
"vars": vars,
}
async def _aprepare_task( # noqa: C901
self, message: Optional[Union[str, Message, Mapping[str, Any]]] = None, **kwargs
) -> Mapping[str, Any]:
"""Async version of _prepare_task.
Prepare model input in ChatML format and execution params.
"""
# Extract reserved kwargs
vars = kwargs.pop("vars", {})
messages = kwargs.pop("messages", [])
model_preference = kwargs.pop("model_preference", None)
# Get remaining kwargs (potential task inputs)
remaining_kwargs = {
k: v for k, v in kwargs.items() if k not in _RESERVED_KWARGS
}
# Handle named task arguments
if message is None and remaining_kwargs:
if not self.templates.get("task"):
raise ValueError(
f"Named task arguments require a 'task' template to be configured. "
f"Received kwargs: {list(remaining_kwargs.keys())}. "
f"Either configure a task template or pass arguments as: "
f"agent({{'key': 'value'}}) or agent(Message(...))"
)
# Convert named kwargs to dict for template rendering
message = remaining_kwargs
# Clear kwargs to avoid passing them down
for key in remaining_kwargs:
kwargs.pop(key)
elif message is not None and remaining_kwargs:
raise ValueError(
f"Cannot pass both 'message' argument and named task arguments. "
f"Received message={type(message).__name__} and "
f"kwargs={list(remaining_kwargs.keys())}. "
f"Use either agent(message) or agent(key1=value1, key2=value2)"
)
# Extract vars from Message if not provided
if not vars and isinstance(message, Message) and self.vars is not None:
vars = message.get(self.vars, {})
# Extract messages from Message if not provided
if (
messages == []
and isinstance(message, Message)
and self.messages is not None
):
messages = self._get_content_from_message(self.messages, message)
content = await self._aprocess_task_inputs(message, vars=vars, **kwargs)
if content is None and messages == []:
raise ValueError(
"No task input provided. Expected one of:\n"
" - agent('your text')\n"
" - agent({'key': 'value'})\n"
" - agent(message=Message(...))\n"
" - agent(param1=..., param2=...) with task template configured"
)
if content is not None:
chat_content = [ChatBlock.user(content)]
if messages == []:
messages = chat_content
else:
messages.extend(chat_content)
# messages is already set when content is None
if model_preference is None and isinstance(message, Message):
model_preference = self.get_model_preference_from_message(message)
return {
"messages": messages,
"model_preference": model_preference,
"vars": vars,
}
def _process_task_inputs( # noqa: C901
self,
message: Union[str, Message, Mapping[str, Any]],
vars: Mapping[str, Any],
**kwargs,
) -> Optional[Union[str, Mapping[str, Any]]]:
content = ""
context_content = self._context_manager(message, vars=vars, **kwargs)
if context_content:
content += context_content
if isinstance(message, Message):
task_inputs = self._extract_message_values(self.task_inputs, message)
else:
task_inputs = message
if task_inputs is None and self.templates.get("task") is None:
return None
if self.templates.get("task"):
if task_inputs:
if isinstance(task_inputs, str):
pre_task = self._format_task_template(vars)
task_content = self._format_template(task_inputs, pre_task)
elif isinstance(task_inputs, dict):
task_inputs.update(vars)
task_content = self._format_task_template(task_inputs)
# It's possible to use `task_template` as the default task message
# if no `task_inputs` is selected. This can be useful for multimodal
# models that require a text message to be sent along with the data
elif vars:
task_content = self._format_task_template(vars)
else:
task_content = self.templates.get("task")
else:
task_content = task_inputs
if isinstance(task_content, Mapping): # dict -> str
task_content = "\n".join(f"{k}: {v}" for k, v in task_content.items())
task_content = apply_xml_tags("task", task_content)
content += task_content
content = content.strip() # Remove whitespace
multimodal_content = self._process_task_multimodal_inputs(message, **kwargs)
if multimodal_content:
multimodal_content.append(ChatBlock.text(content))
return multimodal_content
return content
async def _aprocess_task_inputs( # noqa: C901
self,
message: Union[str, Message, Mapping[str, Any]],
vars: Mapping[str, Any],
**kwargs,
) -> Optional[Union[str, Mapping[str, Any]]]:
"""Async version of _process_task_inputs."""
content = ""
context_content = self._context_manager(message, vars=vars, **kwargs)
if context_content:
content += context_content
if isinstance(message, Message):
task_inputs = self._extract_message_values(self.task_inputs, message)
else:
task_inputs = message
if task_inputs is None and self.templates.get("task") is None:
return None
if self.templates.get("task"):
if task_inputs:
if isinstance(task_inputs, str):
pre_task = self._format_task_template(vars)
task_content = self._format_template(task_inputs, pre_task)
elif isinstance(task_inputs, dict):
task_inputs.update(vars)
task_content = self._format_task_template(task_inputs)
# It's possible to use `task_template` as the default task message
# if no `task_inputs` is selected. This can be useful for multimodal
# models that require a text message to be sent along with the data
elif vars:
task_content = self._format_task_template(vars)
else:
task_content = self.templates.get("task")
else:
task_content = task_inputs
if isinstance(task_content, Mapping): # dict -> str
task_content = "\n".join(f"{k}: {v}" for k, v in task_content.items())
task_content = apply_xml_tags("task", task_content)
content += task_content
content = content.strip() # Remove whitespace
multimodal_content = await self._aprocess_task_multimodal_inputs(
message, **kwargs
)
if multimodal_content:
multimodal_content.append(ChatBlock.text(content))
return multimodal_content
return content
def _context_manager( # noqa: C901
self,
message: Union[str, Message, Mapping[str, Any]],
vars: Mapping[str, Any],
**kwargs,
) -> Optional[str]:
"""Mount context."""
context_content = ""
if self.context_cache: # Fixed Context Cache
context_content += self.context_cache
context_inputs = None
runtime_context_inputs = kwargs.pop("context_inputs", None)
if runtime_context_inputs is not None:
context_inputs = runtime_context_inputs
elif isinstance(message, Message):
context_inputs = self._extract_message_values(self.context_inputs, message)
if context_inputs is not None:
if self.templates.get("context"):
if isinstance(context_inputs, Mapping):
context_inputs.update(vars)
msg_context = self._format_template(
context_inputs, self.templates.get("context")
)
else:
pre_msg_context = self._format_template(
vars, self.templates.get("context")
)
msg_context = self._format_template(context_inputs, pre_msg_context)
elif isinstance(context_inputs, str):
msg_context = context_inputs
elif isinstance(context_inputs, list):
msg_context = " ".join(str(v) for v in context_inputs if v is not None)
elif isinstance(context_inputs, dict):
msg_context = "\n".join(
f"{k}: {v if not isinstance(v, list) else ', '.join(v)}"
for k, v in context_inputs.items()
)
context_content += msg_context
if context_content:
if vars:
context_content = self._format_template(vars, context_content)
return apply_xml_tags("context", context_content) + "\n\n"
return None
def _process_task_multimodal_inputs(
self, message: Union[str, Message, Mapping[str, Any]], **kwargs
) -> Optional[List[Mapping[str, Any]]]:
"""Processes multimodal inputs (image, audio, video, file) via kwargs or
message.
Returns a list of multimodal content in ChatML format.
"""
multimodal_paths = None
task_multimodal_inputs = kwargs.get("task_multimodal_inputs", None)
if task_multimodal_inputs is not None:
multimodal_paths = task_multimodal_inputs
elif isinstance(message, Message) and self.task_multimodal_inputs is not None:
multimodal_paths = self._extract_message_values(
self.task_multimodal_inputs, message
)
if multimodal_paths is None:
return None
content = []
formatters = {
"image": self._format_image_input,
"audio": self._format_audio_input,
"video": self._format_video_input,
"file": self._format_file_input,
}
for media_type, formatter in formatters.items():
media_sources = multimodal_paths.get(media_type, [])
if not isinstance(media_sources, list):
media_sources = [media_sources]
for media_source in media_sources:
if media_source is not None:
formatted_input = formatter(media_source)
if formatted_input:
content.append(formatted_input)
return content
async def _aprocess_task_multimodal_inputs(
self, message: Union[str, Message, Mapping[str, Any]], **kwargs
) -> Optional[List[Mapping[str, Any]]]:
"""Async version of _process_task_multimodal_inputs.
Processes multimodal inputs (image, audio, video, file) via kwargs or message.
Returns a list of multimodal content in ChatML format.
"""
multimodal_paths = None
task_multimodal_inputs = kwargs.get("task_multimodal_inputs", None)
if task_multimodal_inputs is not None:
multimodal_paths = task_multimodal_inputs
elif isinstance(message, Message) and self.task_multimodal_inputs is not None:
multimodal_paths = self._extract_message_values(
self.task_multimodal_inputs, message
)
if multimodal_paths is None:
return None
content = []
formatters = {
"image": self._aformat_image_input,
"audio": self._aformat_audio_input,
"video": self._aformat_video_input,
"file": self._aformat_file_input,
}
for media_type, formatter in formatters.items():
media_sources = multimodal_paths.get(media_type, [])
if not isinstance(media_sources, list):
media_sources = [media_sources]
for media_source in media_sources:
if media_source is not None:
formatted_input = await formatter(media_source)
if formatted_input:
content.append(formatted_input)
return content
def _format_image_input(self, image_source: str) -> Optional[Mapping[str, Any]]:
"""Formats the image input for the model."""
img = Image(image_source, **self.config.get("image_block_kwargs", {}))
return img()
def _format_video_input(self, video_source: str) -> Optional[Mapping[str, Any]]:
"""Formats the video input for the model."""
# URLs: don't force encode (keep URL), local files: encode
is_url = video_source.startswith("http")
vid = Video(
video_source,
force_encode=not is_url,
**self.config.get("video_block_kwargs", {}),
)
return vid()
def _format_audio_input(self, audio_source: str) -> Optional[Mapping[str, Any]]:
"""Formats the audio input for the model."""
aud = Audio(audio_source)
return aud()
def _format_file_input(self, file_source: str) -> Optional[Mapping[str, Any]]:
"""Formats the file input for the model."""
f = File(file_source)
return f()
async def _aformat_image_input(
self, image_source: str
) -> Optional[Mapping[str, Any]]:
"""Async version of _format_image_input."""
img = Image(image_source, **self.config.get("image_block_kwargs", {}))
return await img.acall()
async def _aformat_video_input(
self, video_source: str
) -> Optional[Mapping[str, Any]]:
"""Async version of _format_video_input."""
# URLs: don't force encode (keep URL), local files: encode
is_url = video_source.startswith("http")
vid = Video(
video_source,
force_encode=not is_url,
**self.config.get("video_block_kwargs", {}),
)
return await vid.acall()
async def _aformat_audio_input(
self, audio_source: str
) -> Optional[Mapping[str, Any]]:
"""Async version of _format_audio_input."""
aud = Audio(audio_source)
return await aud.acall()
async def _aformat_file_input(
self, file_source: str
) -> Optional[Mapping[str, Any]]:
"""Async version of _format_file_input."""
f = File(file_source)
return await f.acall()
def inspect_model_execution_params(
self, message: Optional[Union[str, Mapping[str, Any], Message]] = None, **kwargs
) -> Mapping[str, Any]:
"""Debug model input parameters.
Accepts the same arguments as forward() to inspect what would be sent to
the model.
"""
inputs = self._prepare_task(message, **kwargs)
model_execution_params = self._prepare_model_execution(
prefilling=self.prefilling, **inputs
)
return model_execution_params
def _set_context_inputs(
self, context_inputs: Optional[Union[str, List[str]]] = None
):
if isinstance(context_inputs, (str, list)) or context_inputs is None:
if isinstance(context_inputs, str) and context_inputs == "":
raise ValueError(
"`context_inputs` requires a string not empty"
f"given `{context_inputs}`"
)
if isinstance(context_inputs, list) and not context_inputs:
raise ValueError(
"`context_inputs` requires a list not empty"
f"given `{context_inputs}`"
)
self.register_buffer("context_inputs", context_inputs)
else:
raise TypeError(
"`context_inputs` requires a string, list or None"
f"given `{type(context_inputs)}`"
)
def _set_context_cache(self, context_cache: Optional[str] = None):
if isinstance(context_cache, str) or context_cache is None:
self.register_buffer("context_cache", context_cache)
else:
raise TypeError(
"`context_cache` requires a string or None"
f"given `{type(context_cache)}`"
)
def _set_prefilling(self, prefilling: Optional[str] = None):
if isinstance(prefilling, str) or prefilling is None:
self.register_buffer("prefilling", prefilling)
else:
raise TypeError(
f"`prefilling` requires a string or Nonegiven `{type(prefilling)}`"
)
def _set_tools(
self,
tools: Optional[List[Callable]] = None,
mcp_servers: Optional[List[Mapping[str, Any]]] = None,
):
self.tool_library = ToolLibrary(
self.get_module_name(), tools or [], mcp_servers=mcp_servers
)
def _set_generation_schema(
self, generation_schema: Optional[msgspec.Struct] = None
):
if generation_schema is None or is_subclass_of(
generation_schema, msgspec.Struct
):
self.register_buffer("generation_schema", generation_schema)
else:
raise TypeError(
"`generation_schema` need be a `msgspec.Struct` or None "
f"given `{type(generation_schema)}`"
)
def _set_model(self, model: Union[ChatCompletionModel, ModelGateway, LM]):
if isinstance(model, LM): # If already LM, use directly
self.lm = model
else: # LM will validate model type
self.lm = LM(model)
@property
def model(self):
"""Access underlying model for convenience.
Returns:
The wrapped model instance
"""
return self.lm.model
@model.setter
def model(self, value: Union[ChatCompletionModel, ModelGateway, LM]):
"""Update the agent's model.
Args:
value: New model (can be Model or LM)
"""
self._set_model(value)
def _set_system_message(self, system_message: Optional[str] = None):
if isinstance(system_message, str) or system_message is None:
if isinstance(system_message, str):
system_message = cleandoc(system_message)
if (
hasattr(self.generation_schema, "system_message")
and self.generation_schema.system_message is not None
):
if system_message is None:
system_message = self.generation_schema.system_message
else:
system_message = (
self.generation_schema.system_message + system_message
)
self.system_message = Parameter(system_message, PromptSpec.SYSTEM_MESSAGE)
else:
raise TypeError(
"`system_message` requires a string or None "
f"given `{type(system_message)}`"
)
def _set_instructions(self, instructions: Optional[str] = None):
if isinstance(instructions, str) or instructions is None:
if isinstance(instructions, str):
instructions = cleandoc(instructions)
typed_parser_cls = typed_parser_registry.get(self.typed_parser, None)
if typed_parser_cls is not None:
instructions = self._format_template(
{"instructions": instructions}, typed_parser_cls.template
)
self.instructions = Parameter(instructions, PromptSpec.INSTRUCTIONS)
else:
raise TypeError(
f"`instructions` requires a string or None given `{type(instructions)}`"
)
def _set_expected_output(self, expected_output: Optional[str] = None):
if isinstance(expected_output, str) or expected_output is None: # TODO
if isinstance(expected_output, str):
expected_output = cleandoc(expected_output)
expected_output_temp = ""
if expected_output:
expected_output_temp += expected_output
typed_parser_cls = typed_parser_registry.get(self.typed_parser, None)
if typed_parser_cls is not None: # Schema as expected output
response_format = response_format_from_msgspec_struct(
self.generation_schema
)
schema = typed_parser_cls.schema_from_response_format(response_format)
content = {"expected_outputs": schema}
rendered = self._format_template(content, EXPECTED_OUTPUTS_TEMPLATE)
expected_output_temp += rendered
self.expected_output = Parameter(
expected_output_temp or None, PromptSpec.EXPECTED_OUTPUT
)
else:
raise TypeError(
"`expected_output` requires a string or None "
f"given `{type(expected_output)}`"
)
def _set_examples(
self,
examples: Optional[Union[str, List[Union[Example, Mapping[str, Any]]]]] = None,
):
if isinstance(examples, (str, list)) or examples is None:
if isinstance(examples, str):
examples = cleandoc(examples)
if isinstance(examples, list):
typed_parser_cls = typed_parser_registry.get(self.typed_parser, None)
collection = ExampleCollection(examples)
if typed_parser_cls is not None:
serialize_func = typed_parser_cls.encode
else:
serialize_func = msgspec_dumps
examples = collection.get_formatted(serialize_func, serialize_func)
self.examples = Parameter(examples, PromptSpec.EXAMPLES)
else:
raise TypeError(
f"`examples` requires a List[Example] or None given `{type(examples)}`"
)
def _set_messages(self, messages: Optional[str] = None):
if isinstance(messages, str) or messages is None:
self.register_buffer("messages", messages)
else:
raise TypeError(
f"`messages` requires a string or None given `{type(messages)}`"
)
def _set_config(self, config: Optional[Dict[str, Any]] = None):
"""Set agent configuration.
Args:
config:
Dictionary with configuration options.
Valid keys: "verbose", "return_messages", "tool_choice",
"stream", "image_block_kwargs", "video_block_kwargs", "include_date"
Raises:
TypeError:
If config is not a dict or None.
ValueError:
If invalid keys are provided.
"""
# Define valid keys for Agent
valid_keys = {
"verbose",
"return_messages",
"tool_choice",
"stream",
"image_block_kwargs",
"video_block_kwargs",
"include_date",
"execution", # Added for execution settings
}
if config is None:
self.register_buffer("config", {})
return
if not isinstance(config, dict):
raise TypeError(f"`config` must be a dict or None, given `{type(config)}`")
invalid_keys = set(config.keys()) - valid_keys
if invalid_keys:
raise ValueError(
f"Invalid config keys: {invalid_keys}. Valid keys are: {valid_keys}"
)
if "image_block_kwargs" in config:
if not isinstance(config["image_block_kwargs"], dict):
raise TypeError(
f"`image_block_kwargs` must be a dict, "
f"given `{type(config['image_block_kwargs'])}`"
)
if "video_block_kwargs" in config:
if not isinstance(config["video_block_kwargs"], dict):
raise TypeError(
f"`video_block_kwargs` must be a dict, "
f"given `{type(config['video_block_kwargs'])}`"
)
self.register_buffer("config", config.copy())
def _set_system_extra_message(self, system_extra_message: Optional[str] = None):
if isinstance(system_extra_message, str) or system_extra_message is None:
if isinstance(system_extra_message, str):
system_extra_message = cleandoc(system_extra_message)
self.register_buffer("system_extra_message", system_extra_message)
else:
raise TypeError(
"`system_extra_message` requires a string or None "
f"given `{type(system_extra_message)}`"
)
def _set_vars(self, vars: Optional[str] = None):
if isinstance(vars, str) or vars is None:
self.register_buffer("vars", vars)
else:
raise TypeError(f"`vars` requires a string or None given `{type(vars)}`")
def _set_message_fields(self, message_fields: Optional[Dict[str, Any]] = None):
"""Set message field mappings for Agent.
Args:
message_fields: Dictionary mapping field names to their values.
Valid keys: "task_inputs", "task_multimodal_inputs", "messages",
"context_inputs", "model_preference", "vars"
Raises:
TypeError: If message_fields is not a dict or None
ValueError: If invalid keys are provided
"""
# Define valid keys for Agent class
valid_keys = {
"task_inputs",
"task_multimodal_inputs",
"messages",
"context_inputs",
"model_preference",
"vars",
}
if message_fields is None:
# Set all fields to None
self._set_task_inputs(None)
self._set_task_multimodal_inputs(None)
self._set_model_preference(None)
self._set_context_inputs(None)
self._set_messages(None)
self._set_vars(None)
return
if not isinstance(message_fields, dict):
raise TypeError(
f"`message_fields` must be a dict or None, given "
f"`{type(message_fields)}`"
)
# Validate keys
invalid_keys = set(message_fields.keys()) - valid_keys
if invalid_keys:
raise ValueError(
f"Invalid message_fields keys: {invalid_keys}. "
f"Valid keys are: {valid_keys}"
)
# Set each field using its setter, defaulting to None if not provided
self._set_task_inputs(message_fields.get("task_inputs"))
self._set_task_multimodal_inputs(message_fields.get("task_multimodal_inputs"))
self._set_model_preference(message_fields.get("model_preference"))
self._set_context_inputs(message_fields.get("context_inputs"))
self._set_messages(message_fields.get("messages"))
self._set_vars(message_fields.get("vars"))
def _set_typed_parser(self, typed_parser: Optional[str] = None):
if isinstance(typed_parser, str) or typed_parser is None:
if (
isinstance(typed_parser, str)
and typed_parser not in typed_parser_registry
):
raise ValueError(
f"`typed_parser` supports only `{typed_parser_registry.keys()}`"
f" given `{typed_parser}`"
)
self.register_buffer("typed_parser", typed_parser)
else:
raise TypeError(
f"`typed_parser` requires a str given `{type(typed_parser)}`"
)
def _set_signature(
self,
*,
signature: Optional[Union[str, Signature]] = None,
examples: Optional[List[Example]] = None,
generation_schema: Optional[msgspec.Struct] = None,
instructions: Optional[str] = None,
system_message: Optional[str] = None,
typed_parser: Optional[str] = None,
):
if signature is not None:
typed_parser_cls = typed_parser_registry.get(typed_parser, None)
examples = examples or []
output_descriptions = None
signature_instructions = None
if isinstance(signature, str):
input_str_signature, output_str_signature = signature.split("->")
inputs_info = StructFactory._parse_annotations(input_str_signature)
outputs_info = StructFactory._parse_annotations(output_str_signature)
elif issubclass(signature, Signature):
output_str_signature = signature.get_str_signature().split("->")[-1]
inputs_info = signature.get_inputs_info()
outputs_info = signature.get_outputs_info()
output_descriptions = signature.get_output_descriptions()
signature_instructions = signature.get_instructions()
signature_examples = SignatureFactory.get_examples_from_signature(
signature
)
if signature_examples:
examples.extend(signature_examples)
else:
raise TypeError(
"`signature` requires a string, `Signature` or None "
f"given `{type(signature)}`"
)
# typed_parser
self._set_typed_parser(typed_parser)
# task template - add to templates dict, overriding if present
task_template = SignatureFactory.get_task_template_from_signature(
inputs_info
)
self.templates["task"] = task_template
# instructions
self._set_instructions(instructions or signature_instructions)
# generation schema
signature_output_struct = StructFactory.from_signature(
output_str_signature, "Outputs", output_descriptions
)
fused_output_struct = None
if generation_schema is not None:
signature_as_type = cast(Type[msgspec.Struct], signature_output_struct)
if is_optional_field(generation_schema, "final_answer"):
signature_as_type = Optional[signature_output_struct] # type: ignore
# Merge parent annotations with new final_answer annotation
merged_annotations = {
**generation_schema.__annotations__,
"final_answer": signature_as_type,
}
fused_output_struct = type(
"Output",
(generation_schema,),
{"__annotations__": merged_annotations},
)
self._set_generation_schema(fused_output_struct or signature_output_struct)
# system message
self._set_system_message(system_message)
# expected output
expected_output = SignatureFactory.get_expected_output_from_signature(
inputs_info, outputs_info, typed_parser_cls
)
self._set_expected_output(expected_output)
# examples
self._set_examples(examples)
# Generate and set annotations from signature inputs
generated_annotations = generate_annotations_from_signature(
inputs_info, signature
)
self.set_annotations(generated_annotations)
def get_system_prompt(self, vars: Optional[Mapping[str, Any]] = None) -> str:
"""Render the system prompt using the Jinja template.
Returns an empty string if no segments are provided.
"""
template_inputs = dotdict(
system_message=self.system_message.data,
instructions=self.instructions.data,
expected_output=self.expected_output.data,
examples=self.examples.data,
system_extra_message=self.system_extra_message,
)
if self.config.get("include_date", False):
now = datetime.now(tz=timezone.utc)
# Format: "Monday, December 09, 2025"
template_inputs.current_date = now.strftime("%A, %B %d, %Y")
system_prompt = self._format_template(
template_inputs, self.system_prompt_template
)
if vars: # Runtime inputs to system template
system_prompt = self._format_template(vars, system_prompt)
return system_prompt
@property
def system_prompt_template(self) -> str:
"""Get the system prompt template.
Returns the custom template if provided in templates dict,
otherwise returns the default SYSTEM_PROMPT_TEMPLATE.
"""
return self.templates.get("system_prompt", SYSTEM_PROMPT_TEMPLATE)