bat.agent.config
AgentConfig Objects
class AgentConfig(BaseModel)
Agent Configuration, including MCP servers and remote agents.
Attributes
mcp_servers (List[MCPServerConfig]): List of MCP server configurations.
remote_agents (List[RemoteAgentConfig]): List of remote agent configurations.
Methods
is_remote_agent_required(agent_name: str) -> bool:
Check if a remote agent is marked as required in the configuration.
is_mcp_server_required(server_name: str) -> bool:
Check if an MCP server is marked as required in the configuration.
load(path: str = 'config.yaml') -> AgentConfig:
Load the agent configuration from a YAML file.
list_tools(mcp_server_names: List[str]) -> List[BaseTool]:
Retrieve tools from the specified MCP servers.
list_agent_cards(agent_names: List[str]) -> Dict[str, AgentCard]:
Retrieve agent cards from the specified remote agents.
list_mcp_servers_names
def list_mcp_servers_names() -> List[str]
List the names of all configured MCP servers.
Returns:
List[str] - List of MCP server names.
list_remote_agents_names
def list_remote_agents_names() -> List[str]
List the names of all configured remote agents.
Returns:
List[str] - List of remote agent names.
is_remote_agent_required
def is_remote_agent_required(agent_name: str) -> bool
Check if a remote agent is marked as required in the configuration.
Arguments:
agent_name (str) - The name of the remote agent.
Returns:
bool - True if the agent is required, False otherwise (or if the agent is not in the configuration).
is_mcp_server_required
def is_mcp_server_required(server_name: str) -> bool
Check if an MCP server is marked as required in the configuration.
Arguments:
server_name (str) - The name of the MCP server.
Returns:
bool - True if the server is required, False otherwise (or if the server is not in the configuration).
load
@classmethod
def load(cls, path: str = 'config.yaml') -> Self
Load the agent configuration from a YAML file. If the YAML file is not found, an empty configuration is used.
Arguments:
path (str) - The path to the configuration YAML file.
Returns:
AgentConfig - The loaded agent configuration or an empty configuration if the file is not found.
Raises:
ValueError - If the configuration file cannot be loaded or validated.
get_mcp_server_connection
def get_mcp_server_connection(server_name: str) -> MCPConnection | None
Get the MCP connection for a given MCP server.
Arguments:
server_name (str) - The name of the MCP server.
Returns:
MCPConnection | None: The MCP connection object if the MCP server is found, otherwise None
get_mcp_agent_connection
def get_mcp_agent_connection(agent_name: str) -> MCPConnection | None
Get the MCP connection for a given remote agent, if it uses MCP protocol.
Arguments:
agent_name (str) - The name of the remote agent.
Returns:
MCPConnection | None: The MCP connection object if the agent is found and uses MCP protocol, otherwise None.
get_a2a_agent_connection
def get_a2a_agent_connection(agent_name: str) -> A2AConnection | None
Get the A2A connection for a given remote agent name, if it uses A2A protocol.
Arguments:
agent_name (str) - The name of the remote agent.
Returns:
A2AConnection | None: The A2A connection object if the agent is found and uses A2A protocol, otherwise None.
list_tools
async def list_tools(mcp_server_names: List[str]) -> List[BaseTool]
Retrieve tools from the specified MCP servers.
Arguments:
mcp_server_names (List[str]) - List of MCP server names to retrieve tools from.
Returns:
List[BaseTool] - List of tools available from the specified MCP servers.
Raises:
ConnectionError - If a required MCP server cannot be connected to.
list_agent_cards
async def list_agent_cards(agent_names: List[str]) -> Dict[str, AgentCard]
Retrieve agent cards from the specified remote agents. If a non-required remote agent cannot be reached, it is not included in the result.
Arguments:
agent_names (List[str]) - List of remote agent names to retrieve agent cards from.
Returns:
Dict[str, AgentCard]: Dictionary of Agent Cards available from the specified remote agents.
Raises:
ConnectionError - If a required remote agent cannot be connected to.
bat.agent.application
AgentApplication Objects
class AgentApplication()
__init__
def __init__(AgentGraphType: Type[AgentGraph],
AgentStateType: Type[AgentState],
agent_card_path: str = './agent.json')
Initialize the AgentApplication with the given agent card path and agent graph.
Arguments:
agent_graph (AgentGraph) - The agent graph implementing the agent's logic.
agent_card_path (str) - The path to the agent card JSON file. Defaults to "./agent.json".
load_agent_card
def load_agent_card(agent_card_path: str) -> AgentCard
Load the Agent Card from a JSON file.
Arguments:
agent_card_path (str) - The path to the Agent Card JSON file.
Returns:
AgentCard - The loaded Agent Card.
Raises:
Exception - For general errors during loading.
EnvironmentError - If the URL environment variable is not set.
FileNotFoundError - If the agent card file does not exist.
ValidationError - If the agent card JSON is invalid.
agent_graph
@property
def agent_graph() -> AgentGraph
Get the agent graph.
agent_card
@property
def agent_card() -> AgentCard
Get the agent card.
run
def run(expose_mcp: bool = False) -> None
Run the agent application.
Arguments:
expose_mcp (bool, optional) - Whether to expose the MCP protocol. Defaults to False. This parameter isn't fully supported yet and may lead to unexpected behavior when set to True.
bat.agent.graph
AgentGraph Objects
class AgentGraph(ABC)
Abstract base class for agent graphs.
Extend this class to implement the specific behavior of an agent.
Example
from bat.agent import AgentGraph, AgentState
from langgraph.runnables import RunnableConfig
from langgraph.graph import StateGraph
class MyAgentState(BaseModel):
    # Your state here
    # ...
    pass

class MyAgentGraph(AgentGraph):
    def __init__(self):
        # Define the agent graph using langgraph.graph.StateGraph class
        graph_builder = StateGraph(MyAgentState)
        # Add nodes and edges to the graph as needed ...
        super().__init__(
            graph_builder=graph_builder,
            use_checkpoint=True,
            logger_name="my_agent"
        )
        self._log("Graph initialized", "info")

    # Your nodes logic here
    # ...
__init__
def __init__(config: AgentConfig, StateType: Type[AgentState])
Initialize the AgentGraph with a state graph and optional checkpointing and logger. Compile the state graph and set up the logger if the logger_name is provided.
Arguments:
graph_builder (StateGraph) - The state graph builder.
use_checkpoint (bool) - Whether to use checkpointing. Defaults to False.
logger_name (Optional[str]) - The name of the logger to use. Defaults to None.
graph_builder
@property
def graph_builder() -> StateGraph
Get the state graph builder.
Returns:
StateGraph - The state graph builder.
setup
@abstractmethod
def setup(config: AgentConfig) -> None
Set up the agent graph with the provided configuration. Subclasses must implement this method.
Arguments:
config (AgentConfig) - The agent configuration.
astream
async def astream(query: str,
config: RunnableConfig) -> AsyncIterable[AgentTaskResult]
Asynchronously stream results from the agent graph based on the query and configuration. This method performs the following steps:
- Looks for a checkpoint associated with the provided configuration.
- If no checkpoint is found, creates a new agent state from the query, using the `from_query` method of the `StateType`.
- If a checkpoint is found, restores the state from the checkpoint and updates it with the query using the `update_after_checkpoint_restore` method.
- Prepares the input for the graph execution, wrapping the state in a `Command` if the `is_waiting_for_human_input` method of the state returns `True`.
- Executes the graph with the `astream` method, passing the input and configuration.
- For each item in the stream:
  - If it is an interrupt, yields an `AgentTaskResult` with the status `input-required`. This enables human-in-the-loop interactions.
  - Otherwise, validates the item as a `StateType` and converts it to an `AgentTaskResult` using the `to_task_result` method of the state. Then it yields the result.
This method prints debug logs in the format [<thread_id>]: <message>.
Arguments:
query (str) - The query to process.
config (RunnableConfig) - Configuration for the runnable.
Returns:
AsyncIterable[AgentTaskResult] - An asynchronous iterable of agent task results.
consume_agent_stream
async def consume_agent_stream(
agent_card: AgentCard,
message: Message) -> AsyncIterable[ClientEvent | Message]
WARNING: THIS METHOD IS DEPRECATED AND WILL BE REMOVED IN FUTURE RELEASES. USE THE CallAgentNode INSTEAD.
Consume the agent stream from another A2A agent using the provided agent card and request.
Arguments:
agent_card (AgentCard) - The agent card of the target agent.
message (Message) - The message to send to the agent.
Yields:
AsyncIterable[SendStreamingMessageSuccessResponse] - An asynchronous iterable of streaming message responses.
draw_mermaid
def draw_mermaid(file_path: Optional[str] = None) -> None
Draw the agent graph in Mermaid format. If a file path is provided, save the diagram to the file, otherwise print it to the console.
Arguments:
file_path (Optional[str]) - The path to the file where the Mermaid diagram should be saved.
bat.agent.state
AgentTaskStatus
AgentTaskStatus is a type alias for the status of an agent task.
The possible values are:
- working: The agent is currently processing the task.
- input-required: The agent requires additional input from the user to proceed.
- completed: The agent has successfully completed the task.
- error: An error occurred during the task execution.
AgentTaskResult Objects
class AgentTaskResult(BaseModel)
Result of an agent invocation.
Attributes
task_status (AgentTaskStatus): The status of the agent task.
content (str): The content of the agent's response or message.
Attributes meaning
| task_status | content |
|---|---|
| working | Ongoing task description or progress update. |
| input-required | Description of the required user input or context. |
| completed | Final response or result of the agent's processing. |
| error | Error message indicating what went wrong during the task execution. |
AgentState Objects
class AgentState(BaseModel, ABC)
Abstract Pydantic model from which agent's state classes should inherit.
This class combines Pydantic's model validation with abstract state management requirements for agent operations. Subclasses should define concrete state models while implementing the required abstract methods.
Attributes
bat_extra (Dict[str, Any]): A dictionary for storing extra state information.
The user should not modify this directly, as it is used internally by the SDK.
bat_buffer (List): A list used as a buffer for intermediate state data.
The user should not modify this directly, as it is used internally by the SDK.
Methods
from_query (**abstract**): Factory method to create an agent state from an initial query
to_task_result (**abstract**): Convert current state to an `AgentTaskResult` object
update_after_checkpoint_restore: Refresh state after checkpoint restoration
is_waiting_for_human_input: Check if agent requires human input
Example
from bat.agent import AgentState, AgentTaskResult
from typing import List, Optional, Self
from typing_extensions import override
class MyAgentState(AgentState):
    user_inputs: List[str] = []
    assistant_outputs: List[str] = []
    question: str = ""
    answer: Optional[str] = None

    @classmethod
    def from_query(cls, query: str) -> Self:
        return cls(
            user_inputs=[query],
            question=query,
        )

    @override
    def update_after_checkpoint_restore(self, query: str) -> None:
        self.user_inputs.append(query)
        self.question = query

    @override
    def to_task_result(self) -> AgentTaskResult:
        if self.answer is None:
            return AgentTaskResult(
                task_status="working",
                content="Processing your request..."
            )
        return AgentTaskResult(
            task_status="completed",
            content=self.answer
        )
from_query
@classmethod
@abstractmethod
def from_query(cls, query: str) -> Self
Instantiate agent state from initial query.
Factory method called by the execution framework to create a new state instance. Alternative to direct initialization, allowing state-specific construction logic.
Arguments:
query - Initial user query to bootstrap agent state
Returns:
Self - Fully initialized agent state instance
update_after_checkpoint_restore
def update_after_checkpoint_restore(query: str) -> None
Update state with new query after checkpoint restoration.
Called by the SDK when restoring from a saved checkpoint. Allows the state to synchronize with new execution parameters before resuming the graph.
Arguments:
query - New query to execute with the restored state
to_task_result
@abstractmethod
def to_task_result() -> AgentTaskResult
Convert current state to a task result object.
Used to yield execution results during graph processing. This method defines how the agent's internal state translates to external-facing task results.
Returns:
AgentTaskResult - Task result representation of current state
is_waiting_for_human_input
def is_waiting_for_human_input() -> bool
Check if agent is blocked waiting for human input.
Default implementation returns False. Override in subclasses to implement
human-in-the-loop pausing behavior.
Returns:
bool - True if agent requires human input to proceed, False otherwise