Skip to main content
Version: v4.0.0 [Denim]

bat.prebuilt.prebuilt_workflow

PrebuiltWorkflow Objects

class PrebuiltWorkflow(ABC)

Abstract base class for prebuilt workflows. Extend this class to implement custom workflows.

__init__

def __init__(config: AgentConfig, StateType: Type[AgentState], *args,
**kwargs)

Initialize the AgentGraph with a state graph and optional checkpointing and logger. Compile the state graph and set up the logger if the logger_name is provided.

Arguments:

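  • config AgentConfig - Configuration for the agent, including checkpointing options.
  • StateType Type[AgentState] - The AgentState type used in the workflow.
  • graph_builder StateGraph - The state graph builder.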
  • use_checkpoint bool - Whether to use checkpointing. Defaults to False.
  • logger_name Optional[str] - The name of the logger to use. Defaults to None.

StateType

@property
def StateType() -> Type[AgentState]

Get the AgentState type used in the workflow.

Returns:

  • Type[AgentState] - The AgentState type.

graph_builder

@property
def graph_builder() -> StateGraph

Get the state graph builder.

Returns:

  • StateGraph - The state graph builder.

graph

@property
def graph() -> CompiledStateGraph

Get the compiled state graph.

Returns:

  • CompiledStateGraph - The compiled state graph.

agent_config

@property
def agent_config() -> AgentConfig

Get the AgentConfig used in the workflow.

Returns:

  • AgentConfig - The AgentConfig.

as_runnable

def as_runnable() -> RunnableGenerator

Returns a RunnableGenerator for the workflow, wrapping the astream method. Allows the workflow (for example, a ReActLoop or CallAgentNode) to be used as a node (subgraph) in a larger graph.

bat.prebuilt.call_agent_node

CallAgentNode Objects

class CallAgentNode(PrebuiltWorkflow)

CallAgentNode implements agent-to-agent communication using the A2A protocol.

This workflow abstracts streaming communication between agents as an internal mini-graph. It handles the complete lifecycle of calling another agent, consuming its streamed responses, and managing state updates.

The internal mini-graph flow is: START → call_agent → consume_stream → router → (consume_stream | cleanup) → END

The loop continues consuming streamed responses from the target agent until either:

  • The target agent completes its task
  • The target agent requests user input
  • An error occurs

Args

config (AgentConfig): Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]): The AgentState schema used in the loop.
loop_name (str): The name of this workflow loop (e.g., "domain_agent_loop").
agent_name (str): The name of the target agent to call (e.g., "SMO Agent").
build_message (Callable[[RunnableConfig, str], Message]): Callback function to build
the request message from the input text.
input (str, optional): A key pointing to a string in the state. Defaults to "question".
The value at this key is used as input to send to the target agent.
output (str, optional): A key pointing to a string in the state. Defaults to "answer".
The value at this key is updated with responses from the target agent.
global_status (str, optional): A key pointing to a string in the state. Defaults to "status".
The value at this key is updated with the overall status of the communication.
agent_input_required (str, optional): A key pointing to a bool in the state. Defaults to "agent_input".
The value at this key is set to True when the target agent requests user input.
agent_status (str, optional): A key pointing to a string in the state. Defaults to "agent_status".
The value at this key is updated with the status from the target agent.
agent_content (str, optional): A key pointing to a string in the state. Defaults to "agent_content".
The value at this key is updated with the content from the target agent.
input_required (str, optional): A key pointing to a bool in the state. Defaults to "input_required".
The value at this key is set to True when user input is required.
recursion_limit (int, optional): Maximum recursion depth for nested calls. Defaults to 50.

Example

from a2a.types import Message
from bat.agent import AgentGraph, AgentState
from bat.prebuilt import CallAgentNode
from langchain_core.runnables import RunnableConfig
from typing import List, Optional
from langchain_core.messages import BaseMessage

class OrchestratorAgentState(AgentState):
    agent_input_text: str
    agent_output_text: str
    agent_status: str
    agent_input_required: bool = False
    agent_content: Optional[str] = None
    input_required: bool = False
    ...

def build_agent_message(config: RunnableConfig, text: str) -> Message:
    """Build a message to send to the SMO Agent."""
    return Message(
        role="user",
        parts=[{"type": "text", "text": text}],
    )

class OrchestratorGraph(AgentGraph):
    def setup(self, config: AgentConfig) -> None:
        self.call_agent_node = CallAgentNode(
            config=config,
            StateType=OrchestratorAgentState,
            loop_name="domain_agent_loop",
            agent_name="SMO Agent",
            input="agent_input_text",
            output="agent_output_text",
            global_status="agent_status",
            agent_input_required="agent_input_required",
            agent_status="agent_status",
            agent_content="agent_content",
            input_required="input_required",
            build_message=build_agent_message,
        )
        ...
        self.graph_builder.add_node("call_agent_node", self.call_agent_node.as_runnable())

__init__

def __init__(config: AgentConfig,
StateType: Type[AgentState],
loop_name: str,
agent_name: str,
build_message: Callable[[RunnableConfig, str], Message],
*,
input: str = "question",
output: str = "answer",
global_status: str = "status",
agent_input_required: str = "agent_input",
agent_status: str = "agent_status",
agent_content: str = "agent_content",
input_required: str = "input_required",
recursion_limit: int = 50) -> None

Initialize the CallAgentNode workflow with the given configuration and parameters.

Arguments:

  • config AgentConfig - Configuration for the agent, including checkpointing options.
  • StateType Type[AgentState] - The AgentState schema used in the loop.
  • loop_name str - The name of this workflow loop (e.g., "domain_agent_loop").
  • agent_name str - The name of the target agent to call (e.g., "SMO Agent"). The agent card will be retrieved from the configuration using this name.
  • build_message Callable[[RunnableConfig, str], Message] - Callback function to build the request message from the input text. Should accept a RunnableConfig and a string, and return an A2A Message object.
  • input str, optional - A key pointing to a string in the state. Defaults to "question". The value at this key is used as input to send to the target agent.
  • output str, optional - A key pointing to a string in the state. Defaults to "answer". The value at this key is updated with responses from the target agent.
  • global_status str, optional - A key pointing to a string in the state. Defaults to "status". The value at this key is updated with the overall status of the communication. Useful to display the current operation to the user.
  • agent_input_required str, optional - A key pointing to a bool in the state. Defaults to "agent_input". The value at this key is set to True when the target agent requests user input.
  • agent_status str, optional - A key pointing to a string in the state. Defaults to "agent_status". The value at this key is updated with the status from the target agent.
  • agent_content str, optional - A key pointing to a string in the state. Defaults to "agent_content". The value at this key is updated with the content from the target agent.
  • input_required str, optional - A key pointing to a bool in the state. Defaults to "input_required". The value at this key is set to True when user input is required.
  • recursion_limit int, optional - Maximum recursion depth for nested calls. Defaults to 50. This prevents infinite loops in agent-to-agent communication.

consume_agent_stream

async def consume_agent_stream(
agent_card: AgentCard,
message: Message) -> AsyncIterable[ClientEvent | Message]

Consume the agent stream from another A2A agent. The following operations are performed:

  1. Creates an A2A client configured for streaming (120s timeout)
  2. Sends the request message to the target agent
  3. Yields each stream item (events or messages)
  4. Tracks usage metadata from the stream for metrics collection
  5. Handles errors and ensures proper cleanup

The method automatically extracts and stores usage metadata from each stream item, which can later be retrieved using get_usage_metadata().

Arguments:

  • agent_card AgentCard - The agent card of the target agent, containing connection details and capabilities.
  • message Message - The A2A message to send to the agent.

Yields:

ClientEvent | Message: Stream items from the agent, including status updates, artifacts, and final messages.

Raises:

  • Exception - If the streaming connection fails or encounters an error.

get_usage_metadata

def get_usage_metadata(
from_timestamp: Optional[float] = None) -> UsageMetadata

Get the aggregated usage metadata collected from the agent communication stream.

Arguments:

  • from_timestamp Optional[float] - If provided, only usage metadata after this timestamp will be considered. If None, all usage metadata will be considered.

Returns:

  • UsageMetadata - The aggregated usage metadata from all stream events.

bat.prebuilt.react_loop

ReActLoop Objects

class ReActLoop(PrebuiltWorkflow)

ReActLoop implements a ReAct-style loop using a ChatModelClient with associated tools.

The available tools must be registered in the ChatModelClient used to create the ReActLoop. The loop continues until the chat model produces a final answer without any tool calls.

Args

config (AgentConfig): Configuration for the agent, including checkpointing options.
StateType (Type[AgentState]): The AgentState schema used in the loop.
loop_name (str): The name of the loop.
chat_model_client (ChatModelClient): The chat model client to use.
input_key (str, optional): A key pointing to a string or HumanMessage in the state. Defaults to "input".
The value at this key is used as input to the ChatModelClient on the first call of the loop.
output_key (str, optional): A key pointing to a string in the state. Defaults to "output".
The value at this key is set to the final answer from the ChatModelClient when the loop completes.
messages_key (str, optional): A key pointing to a List[BaseMessage] in the state. Defaults to None.
If provided, the value at this key is used as the conversation history for the ChatModelClient.
The conversation history is updated in-place with each response from the ChatModelClient.
status_key (str, optional): A key pointing to a string in the state. Defaults to None.
If provided, the value at this key is updated with the current status of the loop.
Useful to beautify the streamed output of the loop.

Example

from bat.agent import AgentGraph, AgentState
from bat.chat_model_client import ChatModelClient
from bat.prebuilt import ReActLoop
from langchain.tools import tool
from typing import List, Optional
from langchain_core.messages import BaseMessage

class RNGClient(ChatModelClient):
    ...

class RNGAgentState(AgentState):
    user_input: str
    model_output: str
    conversation_history: List[BaseMessage] = []
    agent_status: Optional[str] = None
    ...

@tool
def generate_random_numbers(min: int, max: int, n: int) -> str:
    """Generate n random numbers between min and max (inclusive)."""
    ...

class RNGAgentGraph(AgentGraph):
    def setup(self, config: AgentConfig) -> None:
        self.rng_client = RNGClient(tools=[generate_random_numbers])
        self.rng_loop = ReActLoop(
            config=config,
            StateType=RNGAgentState,
            loop_name="rng_loop",
            chat_model_client=self.rng_client,
            input_key="user_input",
            output_key="model_output",
            messages_key="conversation_history",
            status_key="agent_status",
        )
        ...
        self.graph_builder.add_node("rng_loop", self.rng_loop.as_runnable())

__init__

def __init__(config: AgentConfig,
StateType: Type[AgentState],
loop_name: str,
chat_model_client: ChatModelClient,
input_key: str = "input",
output_key: str = "output",
messages_key: Optional[str] = None,
status_key: Optional[str] = None) -> None

Initialize the ReActLoop with the given configuration and parameters.

Arguments:

  • config AgentConfig - Configuration for the agent, including checkpointing options.
  • StateType Type[AgentState] - The AgentState schema used in the loop.
  • loop_name str - The name of the loop.
  • chat_model_client ChatModelClient - The chat model client to use.
  • input_key str, optional - A key pointing to a string or HumanMessage in the state. Defaults to "input". The value at this key is used as input to the ChatModelClient on the first call of the loop.
  • output_key str, optional - A key pointing to a string in the state. Defaults to "output". The value at this key is set to the final answer from the ChatModelClient when the loop completes.
  • messages_key str, optional - A key pointing to a List[BaseMessage] in the state. Defaults to None. If provided, the value at this key is used as the conversation history for the ChatModelClient. The conversation history is updated in-place with each response from the ChatModelClient.
  • status_key str, optional - A key pointing to a string in the state. Defaults to None. If provided, the value at this key is updated with the current status of the loop. Useful to beautify the streamed output of the loop.