bat.prebuilt.prebuilt_workflow
PrebuiltWorkflow Objects
class PrebuiltWorkflow(ABC)
Abstract base class for prebuilt workflows. Extend this class to implement custom workflows.
__init__
def __init__(config: AgentConfig, StateType: Type[AgentState], *args,
**kwargs)
Initialize the AgentGraph with a state graph and optional checkpointing and logger. Compile the state graph and set up the logger if the logger_name is provided.
Arguments:
graph_builder (StateGraph) - The state graph builder.
use_checkpoint (bool) - Whether to use checkpointing. Defaults to False.
logger_name (Optional[str]) - The name of the logger to use. Defaults to None.
StateType
@property
def StateType() -> Type[AgentState]
Get the AgentState type used in the workflow.
Returns:
Type[AgentState] - The AgentState type.
graph_builder
@property
def graph_builder() -> StateGraph
Get the state graph builder.
Returns:
StateGraph - The state graph builder.
graph
@property
def graph() -> CompiledStateGraph
Get the compiled state graph.
Returns:
CompiledStateGraph - The compiled state graph.
agent_config
@property
def agent_config() -> AgentConfig
Get the AgentConfig used in the workflow.
Returns:
AgentConfig - The AgentConfig.
as_runnable
def as_runnable() -> RunnableGenerator
Returns a RunnableGenerator for the workflow, wrapping the astream method.
Allows the workflow (e.g., a ReActLoop or CallAgentNode) to be used as a node (subgraph) in a larger graph.
bat.prebuilt.call_agent_node
CallAgentNode Objects
class CallAgentNode(PrebuiltWorkflow)
CallAgentNode implements agent-to-agent communication using the A2A protocol.
This workflow abstracts streaming communication between agents as an internal mini-graph. It handles the complete lifecycle of calling another agent, consuming its streamed responses, and managing state updates.
The internal mini-graph flow is: START → call_agent → consume_stream → router → (consume_stream | cleanup) → END
The loop continues consuming streamed responses from the target agent until either:
- The target agent completes its task
- The target agent requests user input
- An error occurs
Args
config (AgentConfig): Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]): The AgentState schema used in the loop.
loop_name (str): The name of this workflow loop (e.g., "domain_agent_loop").
agent_name (str): The name of the target agent to call (e.g., "SMO Agent").
build_message (Callable[[RunnableConfig, str], Message]): Callback function to build
the request message from the input text.
input (str, optional): A key pointing to a string in the state. Defaults to "question".
The value at this key is used as input to send to the target agent.
output (str, optional): A key pointing to a string in the state. Defaults to "answer".
The value at this key is updated with responses from the target agent.
global_status (str, optional): A key pointing to a string in the state. Defaults to "status".
The value at this key is updated with the overall status of the communication.
agent_input_required (str, optional): A key pointing to a bool in the state. Defaults to "agent_input".
The value at this key is set to True when the target agent requests user input.
agent_status (str, optional): A key pointing to a string in the state. Defaults to "agent_status".
The value at this key is updated with the status from the target agent.
agent_content (str, optional): A key pointing to a string in the state. Defaults to "agent_content".
The value at this key is updated with the content from the target agent.
input_required (str, optional): A key pointing to a bool in the state. Defaults to "input_required".
The value at this key is set to True when user input is required.
recursion_limit (int, optional): Maximum recursion depth for nested calls. Defaults to 50.
Example
from a2a.types import Message
from bat.agent import AgentGraph, AgentState
from bat.prebuilt import CallAgentNode
from langchain_core.runnables import RunnableConfig
from typing import List, Optional
from langchain_core.messages import BaseMessage
class OrchestratorAgentState(AgentState):
agent_input_text: str
agent_output_text: str
agent_status: str
agent_input_required: bool = False
agent_content: Optional[str] = None
input_required: bool = False
...
def build_agent_message(config: RunnableConfig, text: str) -> Message:
"""Build a message to send to the SMO Agent."""
return Message(
role="user",
parts=[{"type": "text", "text": text}],
)
class OrchestratorGraph(AgentGraph):
def setup(self, config: AgentConfig) -> None:
self.call_agent_node = CallAgentNode(
config=config,
StateType=OrchestratorAgentState,
loop_name="domain_agent_loop",
agent_name="SMO Agent",
input="agent_input_text",
output="agent_output_text",
global_status="agent_status",
agent_input_required="agent_input_required",
agent_status="agent_status",
agent_content="agent_content",
input_required="input_required",
build_message=build_agent_message,
)
...
self.graph_builder.add_node("call_agent_node", self.call_agent_node.as_runnable())
__init__
def __init__(config: AgentConfig,
StateType: Type[AgentState],
loop_name: str,
agent_name: str,
build_message: Callable[[RunnableConfig, str], Message],
*,
input: str = "question",
output: str = "answer",
global_status: str = "status",
agent_input_required: str = "agent_input",
agent_status: str = "agent_status",
agent_content: str = "agent_content",
input_required: str = "input_required",
recursion_limit: int = 50) -> None
Initialize the CallAgentNode workflow with the given configuration and parameters.
Arguments:
config (AgentConfig) - Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]) - The AgentState schema used in the loop.
loop_name (str) - The name of this workflow loop (e.g., "domain_agent_loop").
agent_name (str) - The name of the target agent to call (e.g., "SMO Agent"). The agent card will be retrieved from the configuration using this name.
build_message (Callable[[RunnableConfig, str], Message]) - Callback function to build the request message from the input text. Should accept a RunnableConfig and a string, and return an A2A Message object.
input (str, optional) - A key pointing to a string in the state. Defaults to "question". The value at this key is used as input to send to the target agent.
output (str, optional) - A key pointing to a string in the state. Defaults to "answer". The value at this key is updated with responses from the target agent.
global_status (str, optional) - A key pointing to a string in the state. Defaults to "status". The value at this key is updated with the overall status of the communication. Useful to display the current operation to the user.
agent_input_required (str, optional) - A key pointing to a bool in the state. Defaults to "agent_input". The value at this key is set to True when the target agent requests user input.
agent_status (str, optional) - A key pointing to a string in the state. Defaults to "agent_status". The value at this key is updated with the status from the target agent.
agent_content (str, optional) - A key pointing to a string in the state. Defaults to "agent_content". The value at this key is updated with the content from the target agent.
input_required (str, optional) - A key pointing to a bool in the state. Defaults to "input_required". The value at this key is set to True when user input is required.
recursion_limit (int, optional) - Maximum recursion depth for nested calls. Defaults to 50. This prevents infinite loops in agent-to-agent communication.
consume_agent_stream
async def consume_agent_stream(
agent_card: AgentCard,
message: Message) -> AsyncIterable[ClientEvent | Message]
Consume the agent stream from another A2A agent. The following operations are performed:
- Creates an A2A client configured for streaming (120s timeout)
- Sends the request message to the target agent
- Yields each stream item (events or messages)
- Tracks usage metadata from the stream for metrics collection
- Handles errors and ensures proper cleanup
The method automatically extracts and stores usage metadata from each stream item, which can later be retrieved using get_usage_metadata().
Arguments:
agent_card (AgentCard) - The agent card of the target agent, containing connection details and capabilities.
message (Message) - The A2A message to send to the agent.
Yields:
ClientEvent | Message: Stream items from the agent, including status updates, artifacts, and final messages.
Raises:
Exception - If the streaming connection fails or encounters an error.
get_usage_metadata
def get_usage_metadata(
from_timestamp: Optional[float] = None) -> UsageMetadata
Get the aggregated usage metadata collected from the agent communication stream.
Arguments:
from_timestamp (Optional[float]) - If provided, only usage metadata after this timestamp will be considered. If None, all usage metadata will be considered.
Returns:
UsageMetadata - The aggregated usage metadata from all stream events.
bat.prebuilt.react_loop
ReActLoop Objects
class ReActLoop(PrebuiltWorkflow)
ReActLoop implements a ReAct-style loop using a ChatModelClient with associated tools.
The available tools must be registered in the ChatModelClient used to create the ReActLoop. The loop continues until the chat model produces a final answer without any tool calls.
Args
config (AgentConfig): Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]): The AgentState schema used in the loop.
loop_name (str): The name of the loop.
chat_model_client (ChatModelClient): The chat model client to use.
input_key (str, optional): A key pointing to a string or HumanMessage in the state. Defaults to "input".
The value at this key is used as input to the ChatModelClient on the first call of the loop.
output_key (str, optional): A key pointing to a string in the state. Defaults to "output".
The value at this key is set to the final answer from the ChatModelClient when the loop completes.
messages_key (str, optional): A key pointing to a List[BaseMessage] in the state. Defaults to None.
If provided, the value at this key is used as the conversation history for the ChatModelClient.
The conversation history is updated **in-place** with each response from the ChatModelClient.
status_key (str, optional): A key pointing to a string in the state. Defaults to None.
If provided, the value at this key is updated with the current status of the loop.
Useful to beautify the streamed output of the loop.
Example
from bat.agent import AgentGraph, AgentState
from bat.chat_model_client import ChatModelClient
from bat.prebuilt import ReActLoop
from langchain.tools import tool
from langchain_core.messages import BaseMessage
from typing import List, Optional
class RNGClient(ChatModelClient):
...
class RNGAgentState(AgentState):
user_input: str
model_output: str
conversation_history: List[BaseMessage] = []
agent_status: Optional[str] = None
...
@tool
def generate_random_numbers(min: int, max: int, n: int) -> str:
"""Generate n random numbers between min and max (inclusive)."""
...
class RNGAgentGraph(AgentGraph):
def setup(self, config: AgentConfig) -> None:
self.rng_client = RNGClient(tools=[generate_random_numbers])
self.rng_loop = ReActLoop(
config=config,
StateType=RNGAgentState,
loop_name="rng_loop",
chat_model_client=self.rng_client,
input_key="user_input",
output_key="model_output",
messages_key="conversation_history",
status_key="agent_status",
)
...
self.graph_builder.add_node("rng_loop", self.rng_loop.as_runnable())
__init__
def __init__(config: AgentConfig,
StateType: Type[AgentState],
loop_name: str,
chat_model_client: ChatModelClient,
input_key: str = "input",
output_key: str = "output",
messages_key: Optional[str] = None,
status_key: Optional[str] = None) -> None
Initialize the ReActLoop with the given configuration and parameters.
Arguments:
config (AgentConfig) - Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]) - The AgentState schema used in the loop.
loop_name (str) - The name of the loop.
chat_model_client (ChatModelClient) - The chat model client to use.
input_key (str, optional) - A key pointing to a string or HumanMessage in the state. Defaults to "input". The value at this key is used as input to the ChatModelClient on the first call of the loop.
output_key (str, optional) - A key pointing to a string in the state. Defaults to "output". The value at this key is set to the final answer from the ChatModelClient when the loop completes.
messages_key (str, optional) - A key pointing to a List[BaseMessage] in the state. Defaults to None. If provided, the value at this key is used as the conversation history for the ChatModelClient. The conversation history is updated in-place with each response from the ChatModelClient.
status_key (str, optional) - A key pointing to a string in the state. Defaults to None. If provided, the value at this key is updated with the current status of the loop. Useful to beautify the streamed output of the loop.