railtracks
The Railtracks Framework for building resilient agentic systems in simple Python.
# -------------------------------------------------------------
# Copyright (c) Railtown AI. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.
# -------------------------------------------------------------
"""The Railtracks Framework for building resilient agentic systems in simple python"""

from __future__ import annotations

from dotenv import load_dotenv

# Public API of the package. Note: "context" appears exactly once (the
# duplicate entry that used to be here was removed).
__all__ = [
    "Session",
    "session",
    "call",
    "broadcast",
    "call_batch",
    "interactive",
    "ExecutionInfo",
    "ExecutorConfig",
    "llm",
    "context",
    "set_config",
    "function_node",
    "agent_node",
    "integrations",
    "prebuilt",
    "MCPStdioParams",
    "MCPHttpParams",
    "connect_mcp",
    "create_mcp_server",
    "ToolManifest",
    "session_id",
]


from railtracks.built_nodes.easy_usage_wrappers import (
    agent_node,
    function_node,
)

from . import context, integrations, llm, prebuilt
from ._session import ExecutionInfo, Session, session
from .context.central import session_id, set_config
from .interaction import broadcast, call, call_batch, interactive
from .nodes.manifest import ToolManifest
from .rt_mcp import MCPHttpParams, MCPStdioParams, connect_mcp, create_mcp_server
from .utils.config import ExecutorConfig

# Load environment variables (e.g. provider API keys) at import time so users
# don't have to call load_dotenv() themselves.
load_dotenv()
# Do not worry about changing this version number manually. It will be updated on release.
__version__ = "1.0.0"
class Session:
    """
    The main class for managing an execution session.

    This class is responsible for setting up all the necessary components for running a Railtracks execution, including the coordinator, publisher, and state management.

    Configuration parameters are resolved with the following precedence:
    1. The parameters in the `Session` constructor.
    2. The parameters in global context variables.
    3. The default values.

    Default Values:
    - `name`: None
    - `timeout`: 150.0 seconds
    - `end_on_error`: False
    - `logging_setting`: "REGULAR"
    - `log_file`: None (logs will not be written to a file)
    - `broadcast_callback`: None (no callback for broadcast messages)
    - `prompt_injection`: True (the prompt will be automatically injected from context variables)
    - `save_state`: True (the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory)


    Args:
        name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
        logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have.
        log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory.
    """

    def __init__(
        self,
        context: Dict[str, Any] | None = None,
        *,
        name: str | None = None,
        timeout: float | None = None,
        end_on_error: bool | None = None,
        logging_setting: AllowableLogLevels | None = None,
        log_file: str | os.PathLike | None = None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool | None = None,
        save_state: bool | None = None,
    ):
        # First resolve the effective config: explicit arguments beat global
        # context values, which beat the built-in defaults.
        self.executor_config = self.global_config_precedence(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            broadcast_callback=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )

        if context is None:
            context = {}

        self.name = name

        # Configure logging before anything else runs so setup messages are captured.
        prepare_logger(
            setting=self.executor_config.logging_setting,
            path=self.executor_config.log_file,
        )
        self.publisher: RTPublisher = RTPublisher()

        # Unique id for this session; also used in the saved-state file name.
        self._identifier = str(uuid.uuid4())

        executor_info = ExecutionInfo.create_new()
        self.coordinator = Coordinator(
            execution_modes={"async": AsyncioExecutionStrategy()}
        )
        self.rt_state = RTState(
            executor_info, self.executor_config, self.coordinator, self.publisher
        )

        self.coordinator.start(self.publisher)
        self._setup_subscriber()
        # Register this session's identity, publisher, and config into the global
        # context so framework calls made during the session can find them.
        register_globals(
            session_id=self._identifier,
            rt_publisher=self.publisher,
            parent_id=None,
            executor_config=self.executor_config,
            global_context_vars=context,
        )

        self._start_time = time.time()

        logger.debug("Session %s is initialized" % self._identifier)

    @classmethod
    def global_config_precedence(
        cls,
        timeout: float | None,
        end_on_error: bool | None,
        logging_setting: AllowableLogLevels | None,
        log_file: str | os.PathLike | None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ),
        prompt_injection: bool | None,
        save_state: bool | None,
    ) -> ExecutorConfig:
        """
        Uses the following precedence order to determine the configuration parameters:
        1. The parameters in the method parameters.
        2. The parameters in global context variables.
        3. The default values.
        """
        global_executor_config = get_global_config()

        # Note: the config field is named `subscriber`; it is fed by `broadcast_callback`.
        return global_executor_config.precedence_overwritten(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            subscriber=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )

    def __enter__(self):
        # All setup happens in __init__; entering the context just hands back the session.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Persist the execution state (best effort) before tearing everything down.
        if self.executor_config.save_state:
            try:
                railtracks_dir = Path(".railtracks")
                railtracks_dir.mkdir(
                    exist_ok=True
                )  # Creates if doesn't exist, skips otherwise.

                # Try to create file path with name, fallback to identifier only if there's an issue
                try:
                    file_path = (
                        railtracks_dir / f"{self.name}_{self._identifier}.json"
                        if self.name
                        else railtracks_dir / f"{self._identifier}.json"
                    )
                    file_path.touch()
                except FileNotFoundError:
                    # The user-supplied name produced an invalid path; warn and
                    # fall back to the identifier-only file name.
                    logger.warning(
                        get_message(
                            ExceptionMessageKey.INVALID_SESSION_FILE_NAME_WARN
                        ).format(name=self.name, identifier=self._identifier)
                    )
                    file_path = railtracks_dir / f"{self._identifier}.json"

                logger.info("Saving execution info to %s" % file_path)

                file_path.write_text(json.dumps(self.payload()))
            except Exception as e:
                # Saving state is best-effort; never let it mask an in-flight exception.
                logger.error(
                    "Error while saving to execution info to file",
                    exc_info=e,
                )

        self._close()

    def _setup_subscriber(self):
        """
        Prepares and attaches the saved broadcast_callback to the publisher attached to this runner.
        """

        if self.executor_config.subscriber is not None:
            self.publisher.subscribe(
                stream_subscriber(self.executor_config.subscriber),
                name="Streaming Subscriber",
            )

    def _close(self):
        """
        Closes the runner and cleans up all resources.

        - Shuts down the state object
        - Detaches logging handlers so they aren't duplicated
        - Deletes all the global variables that were registered in the context
        """
        # the publisher should have already been closed in `_run_base`
        self.rt_state.shutdown()
        detach_logging_handlers()
        delete_globals()
        # by deleting all of the state variables we are ensuring that the next time we create a runner it is fresh

    @property
    def info(self) -> ExecutionInfo:
        """
        Returns the current state of the runner.

        This is useful for debugging and viewing the current state of the run.
        """
        return self.rt_state.info

    def payload(self) -> Dict[str, Any]:
        """
        Gets the complete json payload tied to this session.

        The outputted JSON schema is maintained externally (TODO: add link).
        """
        info = self.info

        run_list = info.graph_serialization()

        full_dict = {
            "session_id": self._identifier,
            "session_name": self.name,
            "start_time": self._start_time,
            "end_time": time.time(),
            "runs": run_list,
        }

        # Round-trip through json to guarantee the payload is plain JSON-compatible data.
        return json.loads(json.dumps(full_dict))
The main class for managing an execution session.
This class is responsible for setting up all the necessary components for running a Railtracks execution, including the coordinator, publisher, and state management.
For the configuration parameters of the setting. It will follow this precedence:
- The parameters in the `Session` constructor.
- The parameters in global context variables.
- The default values.
Default Values:
- `name`: None
- `timeout`: 150.0 seconds
- `end_on_error`: False
- `logging_setting`: "REGULAR"
- `log_file`: None (logs will not be written to a file)
- `broadcast_callback`: None (no callback for broadcast messages)
- `prompt_injection`: True (the prompt will be automatically injected from context variables)
- `save_state`: True (the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory)
Arguments:
- name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
- context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
- timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
- end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
- logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have.
- log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
- broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
- prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
- save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory.
    def __init__(
        self,
        context: Dict[str, Any] | None = None,
        *,
        name: str | None = None,
        timeout: float | None = None,
        end_on_error: bool | None = None,
        logging_setting: AllowableLogLevels | None = None,
        log_file: str | os.PathLike | None = None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool | None = None,
        save_state: bool | None = None,
    ):
        # First resolve the effective config: explicit arguments beat global
        # context values, which beat the built-in defaults.
        self.executor_config = self.global_config_precedence(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            broadcast_callback=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )

        if context is None:
            context = {}

        self.name = name

        # Configure logging before anything else runs so setup messages are captured.
        prepare_logger(
            setting=self.executor_config.logging_setting,
            path=self.executor_config.log_file,
        )
        self.publisher: RTPublisher = RTPublisher()

        # Unique id for this session; also used in the saved-state file name.
        self._identifier = str(uuid.uuid4())

        executor_info = ExecutionInfo.create_new()
        self.coordinator = Coordinator(
            execution_modes={"async": AsyncioExecutionStrategy()}
        )
        self.rt_state = RTState(
            executor_info, self.executor_config, self.coordinator, self.publisher
        )

        self.coordinator.start(self.publisher)
        self._setup_subscriber()
        # Register this session's identity, publisher, and config into the global
        # context so framework calls made during the session can find them.
        register_globals(
            session_id=self._identifier,
            rt_publisher=self.publisher,
            parent_id=None,
            executor_config=self.executor_config,
            global_context_vars=context,
        )

        self._start_time = time.time()

        logger.debug("Session %s is initialized" % self._identifier)
    @classmethod
    def global_config_precedence(
        cls,
        timeout: float | None,
        end_on_error: bool | None,
        logging_setting: AllowableLogLevels | None,
        log_file: str | os.PathLike | None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ),
        prompt_injection: bool | None,
        save_state: bool | None,
    ) -> ExecutorConfig:
        """
        Uses the following precedence order to determine the configuration parameters:
        1. The parameters in the method parameters.
        2. The parameters in global context variables.
        3. The default values.
        """
        global_executor_config = get_global_config()

        # Note: the config field is named `subscriber`; it is fed by `broadcast_callback`.
        return global_executor_config.precedence_overwritten(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            subscriber=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )
Uses the following precedence order to determine the configuration parameters:
- The parameters in the method parameters.
- The parameters in global context variables.
- The default values.
    @property
    def info(self) -> ExecutionInfo:
        """
        Returns the current state of the runner.

        This is useful for debugging and viewing the current state of the run.
        """
        return self.rt_state.info
Returns the current state of the runner.
This is useful for debugging and viewing the current state of the run.
    def payload(self) -> Dict[str, Any]:
        """
        Gets the complete json payload tied to this session.

        The outputted JSON schema is maintained externally (TODO: add link).
        """
        info = self.info

        run_list = info.graph_serialization()

        full_dict = {
            "session_id": self._identifier,
            "session_name": self.name,
            "start_time": self._start_time,
            "end_time": time.time(),
            "runs": run_list,
        }

        # Round-trip through json to guarantee the payload is plain JSON-compatible data.
        return json.loads(json.dumps(full_dict))
Gets the complete json payload tied to this session.
The outputted JSON schema is maintained externally (TODO: add link).
def session(
    func: Callable[_P, Coroutine[Any, Any, _TOutput]] | None = None,
    *,
    name: str | None = None,
    context: Dict[str, Any] | None = None,
    timeout: float | None = None,
    end_on_error: bool | None = None,
    logging_setting: AllowableLogLevels | None = None,
    log_file: str | os.PathLike | None = None,
    broadcast_callback: (
        Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
    ) = None,
    prompt_injection: bool | None = None,
    save_state: bool | None = None,
) -> (
    Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]
    | Callable[
        [Callable[_P, Coroutine[Any, Any, _TOutput]]],
        Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]],
    ]
):
    """
    This decorator automatically creates and manages a Session context for the decorated function,
    allowing async functions to use Railtracks operations without manually managing the session lifecycle.

    Can be used as:
    - @session (without parentheses) - uses default settings
    - @session() (with empty parentheses) - uses default settings
    - @session(name="my_task", timeout=30) (with configuration parameters)

    When using this decorator, the function returns a tuple containing:
    1. The original function's return value
    2. The Session object used during execution

    This allows access to session information (like execution state, logs, etc.) after the function completes,
    while maintaining the simplicity of decorator usage.

    Args:
        name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
        logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have.
        log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory.

    Returns:
        When used as @session (without parentheses): Returns the decorated function that returns (result, session).
        When used as @session(...) (with parameters): Returns a decorator function that takes an async function
        and returns a new async function that returns (result, session).
    """

    def decorator(
        target_func: Callable[_P, Coroutine[Any, Any, _TOutput]],
    ) -> Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]:
        # Validate that the decorated function is async
        if not inspect.iscoroutinefunction(target_func):
            raise TypeError(
                f"@session decorator can only be applied to async functions. "
                f"Function '{target_func.__name__}' is not async. "
                f"Add 'async' keyword to your function definition."
            )

        @wraps(target_func)
        async def wrapper(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> Tuple[_TOutput, Session]:
            # A fresh Session is created per call, configured from the closed-over
            # decorator parameters.
            session_obj = Session(
                context=context,
                timeout=timeout,
                end_on_error=end_on_error,
                logging_setting=logging_setting,
                log_file=log_file,
                broadcast_callback=broadcast_callback,
                name=name,
                prompt_injection=prompt_injection,
                save_state=save_state,
            )

            # Return the session too so callers can inspect state/logs afterwards.
            with session_obj:
                result = await target_func(*args, **kwargs)
                return result, session_obj

        return wrapper

    # If used as @session without parentheses
    if func is not None:
        return decorator(func)

    # If used as @session(...)
    return decorator
This decorator automatically creates and manages a Session context for the decorated function, allowing async functions to use Railtracks operations without manually managing the session lifecycle.
Can be used as:
- @session (without parentheses) - uses default settings
- @session() (with empty parentheses) - uses default settings
- @session(name="my_task", timeout=30) (with configuration parameters)
When using this decorator, the function returns a tuple containing:
- The original function's return value
- The Session object used during execution
This allows access to session information (like execution state, logs, etc.) after the function completes, while maintaining the simplicity of decorator usage.
Arguments:
- name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
- context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
- timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
- end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
- logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have.
- log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
- broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
- prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
- save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks` directory.
Returns:
When used as @session (without parentheses): Returns the decorated function that returns (result, session). When used as @session(...) (with parameters): Returns a decorator function that takes an async function and returns a new async function that returns (result, session).
async def call(
    node_: Callable[_P, Node[_TOutput]] | RTFunction[_P, _TOutput],
    *args: _P.args,
    **kwargs: _P.kwargs,
) -> _TOutput:
    """
    Call a node from within a node inside the framework. This will return a coroutine that you can interact with
    in whatever way using async/await logic.

    Usage:
    ```python
    # for sequential operation
    result = await call(NodeA, "hello world", 42)

    # for parallel operation
    tasks = [call(NodeA, "hello world", i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    ```

    Args:
        node_: The node type you would like to create. This could be a function decorated with `@function_node`, a function, or a Node instance.
        *args: The arguments to pass to the node
        **kwargs: The keyword arguments to pass to the node
    """
    node: Callable[_P, Node[_TOutput]]
    # this entire section is a bit of a typing nightmare because of all the overloads we provide.
    if isinstance(node_, FunctionType):
        # Plain functions get wrapped into a node type on the fly.
        node = extract_node_from_function(node_)
    else:
        node = node_

    # if the context is none then we will need to create a wrapper for the state object to work with.
    if not is_context_present():
        # we have to use lazy import here to prevent a circular import issue. This is a must have unfortunately.
        from railtracks import Session

        # Run inside a throwaway Session so the state machinery exists for this call.
        with Session():
            result = await _start(node, args=args, kwargs=kwargs)
            return result

    # if the context is not active then we know this is the top level request
    if not is_context_active():
        result = await _start(node, args=args, kwargs=kwargs)
        return result

    # if the context is active then we can just run the node
    result = await _run(node, args=args, kwargs=kwargs)
    return result
Call a node from within a node inside the framework. This will return a coroutine that you can interact with in whatever way using async/await logic.
Usage:
# for sequential operation
result = await call(NodeA, "hello world", 42)
# for parallel operation
tasks = [call(NodeA, "hello world", i) for i in range(10)]
results = await asyncio.gather(*tasks)
Arguments:
- node: The node type you would like to create. This could be a function decorated with `@function_node`, a function, or a Node instance.
- *args: The arguments to pass to the node
- **kwargs: The keyword arguments to pass to the node
async def broadcast(item: str):
    """
    Streams the given message.

    This triggers the broadcast_callback you have already provided.

    Args:
        item (str): The item you want to stream.
    """
    # Wrap the item in a Streaming event tagged with the calling node's id,
    # then hand it to the session's publisher.
    event = Streaming(node_id=get_parent_id(), streamed_object=item)
    await get_publisher().publish(event)
Streams the given message
This will trigger the broadcast_callback callback you have already provided.
Arguments:
- item (str): The item you want to stream.
async def call_batch(
    node: Callable[..., Node[_TOutput]]
    | Callable[..., _TOutput]
    | _AsyncNodeAttachedFunc[_P, _TOutput]
    | _SyncNodeAttachedFunc[_P, _TOutput],
    *iterables: Iterable[Any],
    return_exceptions: bool = True,
):
    """
    Complete a node over multiple iterables, allowing for parallel execution.

    Note the results will be returned in the order of the iterables, not the order of completion.

    If one of the nodes returns an exception, the thrown exception will be included as a response.

    Args:
        node: The node type to create.
        *iterables: The iterables to map the node over.
        return_exceptions: If True, exceptions will be returned as part of the results.
            If False, exceptions will be raised immediately, and you will lose access to the results.
            Defaults to true.

    Returns:
        An iterable of results from the node.

    Usage:
    ```python
    results = await call_batch(NodeA, ["hello world"] * 10)
    for result in results:
        handle(result)
    ```
    """
    # this is a big typing disaster but there is no way around it. Try it if you want to.
    # zip(*iterables) pairs up the i-th element of each iterable as one call's arguments.
    contracts = [call(node, *args) for args in zip(*iterables)]

    results = await asyncio.gather(*contracts, return_exceptions=return_exceptions)
    return results
Complete a node over multiple iterables, allowing for parallel execution.
Note the results will be returned in the order of the iterables, not the order of completion.
If one of the nodes returns an exception, the thrown exception will be included as a response.
Arguments:
- node: The node type to create.
- *iterables: The iterables to map the node over.
- return_exceptions: If True, exceptions will be returned as part of the results. If False, exceptions will be raised immediately, and you will lose access to the results. Defaults to true.
Returns:
An iterable of results from the node.
Usage:
results = await call_batch(NodeA, ["hello world"] * 10)
for result in results:
    handle(result)
class ExecutionInfo:
    """
    A class that contains the full details of the state of a run at any given point in time.

    The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to
    create a graphical representation of the system.
    """

    def __init__(
        self,
        request_forest: RequestForest,
        node_forest: NodeForest,
        stamper: StampManager,
    ):
        self.request_forest = request_forest
        self.node_forest = node_forest
        self.stamper = stamper

    @classmethod
    def default(cls) -> ExecutionInfo:
        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
        return cls.create_new()

    @classmethod
    def create_new(
        cls,
    ) -> ExecutionInfo:
        """Creates a new empty instance of the state variables."""
        request_heap = RequestForest()
        node_heap = NodeForest()
        stamper = StampManager()

        return ExecutionInfo(
            request_forest=request_heap,
            node_forest=node_heap,
            stamper=stamper,
        )

    @property
    def answer(self):
        """Convenience method to access the answer of the run."""
        return self.request_forest.answer

    @property
    def all_stamps(self) -> List[Stamp]:
        """Convenience method to access all the stamps of the run."""
        return self.stamper.all_stamps

    @property
    def name(self):
        """
        Gets the name of the graph by pulling the name of the insertion request.

        Raises:
            ValueError: If there are no insertion requests, or more than one.
        """
        insertion_requests = self.insertion_requests

        if len(insertion_requests) >= 2:
            raise ValueError(
                "You cannot get the name of a graph with multiple insertion requests"
            )

        if len(insertion_requests) == 0:
            raise ValueError(
                "You cannot get the name of a graph with no insertion requests"
            )

        i_r = insertion_requests[0]

        return self.node_forest.get_node_type(i_r.sink_id).name()

    @property
    def insertion_requests(self):
        """A convenience method to access all the insertion requests of the run."""
        return self.request_forest.insertion_request

    def _get_info(self, ids: List[str] | str | None = None) -> ExecutionInfo:
        """
        Gets a subset of the current state based on the provided node ids. It will contain all the children of the provided node ids.

        Note: If no ids are provided, the full state is returned.

        Args:
            ids (List[str] | str | None): A list of node ids to filter the state by. If None, the full state is returned.

        Returns:
            ExecutionInfo: A new instance of ExecutionInfo containing only the children of the provided ids.

        Raises:
            ValueError: If any provided identifier is not present in the current state.
        """
        if ids is None:
            return self

        # Normalize a single id to a list so the rest of the logic is uniform.
        if isinstance(ids, str):
            ids = [ids]

        # Validate the ids before building the sub-state.
        for identifier in ids:
            if identifier not in self.request_forest:
                raise ValueError(
                    f"Identifier '{identifier}' not found in the current state."
                )

        new_node_forest, new_request_forest = create_sub_state_info(
            self.node_forest.heap(),
            self.request_forest.heap(),
            ids,
        )
        return ExecutionInfo(
            node_forest=new_node_forest,
            request_forest=new_request_forest,
            stamper=self.stamper,
        )

    def _to_graph(self) -> Tuple[List[Vertex], List[Edge]]:
        """
        Converts the current state into its graph representation.

        Returns:
            Tuple[List[Vertex], List[Edge]]: The vertices and the edges of the graph.
        """
        return self.node_forest.to_vertices(), self.request_forest.to_edges()

    def graph_serialization(self) -> list[dict[str, Any]]:
        """
        Creates a JSON-compatible representation of this info object designed to be used to construct a graph for this
        info object.

        Returns one entry per run (per insertion request); each entry contains:
        - "name": the name of the run's entry node.
        - "run_id": the identifier of the run's insertion request.
        - "nodes": the nodes in the run's graph, represented as `Vertex` objects.
        - "edges": the edges in the run's graph, represented as `Edge` objects.
        - "steps": an ease-of-use list of the stamps associated with the run.
        - "status": the status of the run's insertion request.
        - "start_time"/"end_time": run timestamps; `end_time` is None if the run
          never reached a "Completed" or "Failed" state.
        """
        parent_nodes = [x.identifier for x in self.insertion_requests]

        infos = [self._get_info(parent_node) for parent_node in parent_nodes]

        runs = []

        for info, parent_node_id in zip(infos, parent_nodes):
            insertion_requests = info.request_forest.insertion_request

            # A sub-state built from one insertion request must contain exactly one.
            assert len(insertion_requests) == 1
            parent_request = insertion_requests[0]

            all_parents = parent_request.get_all_parents()

            # The oldest ancestor carries the stamp of when the run started.
            start_time = all_parents[-1].stamp.time

            assert len([x for x in all_parents if x.status == "Completed"]) <= 1
            end_time = None
            for req in all_parents:
                if req.status in ["Completed", "Failed"]:
                    end_time = req.stamp.time
                    break

            entry = {
                "name": info.name,
                "run_id": parent_node_id,
                "nodes": info.node_forest.to_vertices(),
                "status": parent_request.status,
                "edges": info.request_forest.to_edges(),
                "steps": _get_stamps_from_forests(
                    info.node_forest, info.request_forest
                ),
                "start_time": start_time,
                "end_time": end_time,
            }
            runs.append(entry)

        # Round-trip through the custom encoder so the result is plain,
        # JSON-serializable data (a list of run dicts).
        return json.loads(
            json.dumps(
                runs,
                cls=RTJSONEncoder,
            )
        )
A class that contains the full details of the state of a run at any given point in time.
The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to create a graphical representation of the system.
    @classmethod
    def default(cls) -> ExecutionInfo:
        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
        # Simply delegates to the canonical factory.
        return cls.create_new()
Creates a new "empty" instance of the ExecutionInfo class with the default values.
42 @classmethod 43 def create_new( 44 cls, 45 ) -> ExecutionInfo: 46 """ 47 Creates a new empty instance of state variables with the provided executor configuration. 48 49 """ 50 request_heap = RequestForest() 51 node_heap = NodeForest() 52 stamper = StampManager() 53 54 return ExecutionInfo( 55 request_forest=request_heap, 56 node_forest=node_heap, 57 stamper=stamper, 58 )
Creates a new empty instance of state variables with the provided executor configuration.
60 @property 61 def answer(self): 62 """Convenience method to access the answer of the run.""" 63 return self.request_forest.answer
Convenience method to access the answer of the run.
65 @property 66 def all_stamps(self) -> List[Stamp]: 67 """Convenience method to access all the stamps of the run.""" 68 return self.stamper.all_stamps
Convenience method to access all the stamps of the run.
70 @property 71 def name(self): 72 """ 73 Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion 74 request is not present or there are multiple insertion requests. 75 """ 76 insertion_requests = self.insertion_requests 77 78 if len(insertion_requests) >= 2: 79 raise ValueError( 80 "You cannot get the name of a graph with multiple insertion requests" 81 ) 82 83 if len(insertion_requests) == 0: 84 raise ValueError( 85 "You cannot get the name of a graph with no insertion requests" 86 ) 87 88 i_r = insertion_requests[0] 89 90 return self.node_forest.get_node_type(i_r.sink_id).name()
Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion request is not present or there are multiple insertion requests.
92 @property 93 def insertion_requests(self): 94 """A convenience method to access all the insertion requests of the run.""" 95 return self.request_forest.insertion_request
A convenience method to access all the insertion requests of the run.
def graph_serialization(self) -> list[dict[str, Any]]:
    """
    Creates a JSON-safe representation of this info object designed to be used
    to construct a graph for this info object.

    The result is one entry per insertion request (i.e. per run). Each entry
    carries "name", "run_id", "nodes" (`Vertex` objects), "edges" (`Edge`
    objects), "steps" (the stamps associated with the run), "status", and
    "start_time"/"end_time" timestamps ("end_time" is None when no ancestor
    request reached "Completed" or "Failed").

    The structure is serialized through `RTJSONEncoder` and re-parsed, so the
    returned value contains only JSON-compatible types.
    """
    parent_nodes = [x.identifier for x in self.insertion_requests]

    infos = [self._get_info(parent_node) for parent_node in parent_nodes]

    runs = []

    for info, parent_node_id in zip(infos, parent_nodes):
        insertion_requests = info.request_forest.insertion_request

        # Each per-run sub-info is expected to hold exactly one insertion request.
        assert len(insertion_requests) == 1
        parent_request = insertion_requests[0]

        all_parents = parent_request.get_all_parents()

        # The oldest ancestor (last in the list) marks the start of the run.
        start_time = all_parents[-1].stamp.time

        # A run can reach "Completed" at most once.
        assert len([x for x in all_parents if x.status == "Completed"]) <= 1
        end_time = None
        for req in all_parents:
            if req.status in ["Completed", "Failed"]:
                end_time = req.stamp.time
                break

        entry = {
            "name": info.name,
            "run_id": parent_node_id,
            "nodes": info.node_forest.to_vertices(),
            "status": parent_request.status,
            "edges": info.request_forest.to_edges(),
            "steps": _get_stamps_from_forests(
                info.node_forest, info.request_forest
            ),
            "start_time": start_time,
            "end_time": end_time,
        }
        runs.append(entry)

    # Round-trip through the custom encoder to coerce project objects into
    # plain JSON-compatible values.
    return json.loads(
        json.dumps(
            runs,
            cls=RTJSONEncoder,
        )
    )
Creates a string (JSON) representation of this info object designed to be used to construct a graph for this info object.
Some important notes about its structure are outlined below:
- The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
- The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
- The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.
- The "nodes" and "requests" key will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
- However, both will carry an addition param called "stamp" which is a timestamp style object.
- They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.
class ExecutorConfig:
    """A configuration object used to customize executor behavior in the RT system.

    Attributes mirror the constructor arguments; `logging_setting` is validated
    on every assignment through its property setter.
    """

    def __init__(
        self,
        *,
        timeout: float = 150.0,
        end_on_error: bool = False,
        logging_setting: AllowableLogLevels = "REGULAR",
        log_file: str | os.PathLike | None = None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool = True,
        save_state: bool = True,
    ):
        """
        Args:
            timeout (float): The maximum number of seconds to wait for a response to your top-level request.
            end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
            logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have.
            log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created.
            broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
            prompt_injection (bool): If true, prompts can be injected with global context.
            save_state (bool): If true, the state of the executor will be saved to disk.
        """
        self.timeout = timeout
        self.end_on_error = end_on_error
        # Routed through the property so the value is validated immediately.
        self.logging_setting = logging_setting
        # The broadcast callback is stored under the internal name `subscriber`.
        self.subscriber = broadcast_callback
        self.log_file = log_file
        self.prompt_injection = prompt_injection
        self.save_state = save_state

    @property
    def logging_setting(self) -> AllowableLogLevels:
        """The currently configured logging level."""
        return self._logging_setting

    @logging_setting.setter
    def logging_setting(self, value: AllowableLogLevels):
        # Reject anything outside the allowed log-level vocabulary.
        if value not in allowable_log_levels_set:
            raise ValueError(
                f"logging_setting must be one of {allowable_log_levels_set}, got {value}"
            )
        self._logging_setting: AllowableLogLevels = value

    def precedence_overwritten(
        self,
        *,
        timeout: float | None = None,
        end_on_error: bool | None = None,
        logging_setting: AllowableLogLevels | None = None,
        log_file: str | os.PathLike | None = None,
        subscriber: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool | None = None,
        save_state: bool | None = None,
    ):
        """
        Return a new `ExecutorConfig` in which every provided (non-None)
        argument overrides the corresponding current value; unset arguments
        keep the current values. The current instance is not modified.
        """

        def pick(candidate, fallback):
            # A candidate of None means "not provided": keep the fallback.
            return fallback if candidate is None else candidate

        return ExecutorConfig(
            timeout=pick(timeout, self.timeout),
            end_on_error=pick(end_on_error, self.end_on_error),
            logging_setting=pick(logging_setting, self.logging_setting),
            log_file=pick(log_file, self.log_file),
            broadcast_callback=pick(subscriber, self.subscriber),
            prompt_injection=pick(prompt_injection, self.prompt_injection),
            save_state=pick(save_state, self.save_state),
        )

    def __repr__(self):
        return (
            f"ExecutorConfig(timeout={self.timeout}, end_on_error={self.end_on_error}, "
            f"logging_setting={self.logging_setting}, log_file={self.log_file}, "
            f"prompt_injection={self.prompt_injection}, "
            f"save_state={self.save_state})"
        )
11 def __init__( 12 self, 13 *, 14 timeout: float = 150.0, 15 end_on_error: bool = False, 16 logging_setting: AllowableLogLevels = "REGULAR", 17 log_file: str | os.PathLike | None = None, 18 broadcast_callback: ( 19 Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None 20 ) = None, 21 prompt_injection: bool = True, 22 save_state: bool = True, 23 ): 24 """ 25 ExecutorConfig is special configuration object designed to allow customization of the executor in the RT system. 26 27 Args: 28 timeout (float): The maximum number of seconds to wait for a response to your top level request 29 end_on_error (bool): If true, the executor will stop execution when an exception is encountered. 30 logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have. 31 log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created. 32 broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages. 33 prompt_injection (bool): If true, prompts can be injected with global context 34 save_state (bool): If true, the state of the executor will be saved to disk. 35 """ 36 self.timeout = timeout 37 self.end_on_error = end_on_error 38 self.logging_setting = logging_setting 39 self.subscriber = broadcast_callback 40 self.log_file = log_file 41 self.prompt_injection = prompt_injection 42 self.save_state = save_state
ExecutorConfig is special configuration object designed to allow customization of the executor in the RT system.
Arguments:
- timeout (float): The maximum number of seconds to wait for a response to your top level request
- end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
- logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have.
- log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created.
- broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
- prompt_injection (bool): If true, prompts can be injected with global context
- save_state (bool): If true, the state of the executor will be saved to disk.
56 def precedence_overwritten( 57 self, 58 *, 59 timeout: float | None = None, 60 end_on_error: bool | None = None, 61 logging_setting: AllowableLogLevels | None = None, 62 log_file: str | os.PathLike | None = None, 63 subscriber: ( 64 Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None 65 ) = None, 66 prompt_injection: bool | None = None, 67 save_state: bool | None = None, 68 ): 69 """ 70 If any of the parameters are provided (not None), it will create a new update the current instance with the new values and return a deep copied reference to it. 71 """ 72 return ExecutorConfig( 73 timeout=timeout if timeout is not None else self.timeout, 74 end_on_error=end_on_error 75 if end_on_error is not None 76 else self.end_on_error, 77 logging_setting=logging_setting 78 if logging_setting is not None 79 else self.logging_setting, 80 log_file=log_file if log_file is not None else self.log_file, 81 broadcast_callback=subscriber 82 if subscriber is not None 83 else self.subscriber, 84 prompt_injection=prompt_injection 85 if prompt_injection is not None 86 else self.prompt_injection, 87 save_state=save_state if save_state is not None else self.save_state, 88 )
If any of the parameters are provided (not None), it creates and returns a new `ExecutorConfig` in which those values override the current ones; unset parameters keep their current values, and the original instance is left unchanged.
359def set_config( 360 *, 361 timeout: float | None = None, 362 end_on_error: bool | None = None, 363 logging_setting: AllowableLogLevels | None = None, 364 log_file: str | os.PathLike | None = None, 365 broadcast_callback: ( 366 Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None 367 ) = None, 368 prompt_injection: bool | None = None, 369 save_state: bool | None = None, 370): 371 """ 372 Sets the global configuration for the executor. This will be propagated to all new runners created after this call. 373 374 - If you call this function after the runner has been created, it will not affect the current runner. 375 - This function will only overwrite the values that are provided, leaving the rest unchanged. 376 377 378 """ 379 380 if is_context_active(): 381 warnings.warn( 382 "The executor config is being set after the runner has been created, this is not recommended" 383 ) 384 385 config = global_executor_config.get() 386 new_config = config.precedence_overwritten( 387 timeout=timeout, 388 end_on_error=end_on_error, 389 logging_setting=logging_setting, 390 log_file=log_file, 391 subscriber=broadcast_callback, 392 prompt_injection=prompt_injection, 393 save_state=save_state, 394 ) 395 396 global_executor_config.set(new_config)
Sets the global configuration for the executor. This will be propagated to all new runners created after this call.
- If you call this function after the runner has been created, it will not affect the current runner.
- This function will only overwrite the values that are provided, leaving the rest unchanged.
def function_node(
    func: Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]],
    /,
    *,
    name: str | None = None,
    manifest: ToolManifest | None = None,
) -> (
    Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]]
):
    """
    Creates a new Node type from a function (or list of functions) that can be used in `rt.call()`.

    By default, it will parse the function's docstring and turn it into tool details and parameters. However, if
    you provide a custom ToolManifest it will override that logic.

    WARNING: If you are overriding tool parameters, it is on you to make sure they will work with your function.

    NOTE: If you have already converted this function to a node, this function will do nothing.

    Args:
        func (Callable): The function to convert into a Node.
        name (str, optional): Human-readable name for the node/tool.
        manifest (ToolManifest, optional): The details you would like to override the tool with.

    Returns:
        The input function(s), each with a `node_type` attribute attached.

    Raises:
        NodeCreationError: If `func` is neither a coroutine, plain function, nor
            builtin, or if node construction produces an unexpected type.
    """

    # handle the case where a list of functions is provided
    if isinstance(func, list):
        return [function_node(f, name=name, manifest=manifest) for f in func]

    # check if the function has already been converted to a node
    if hasattr(func, "node_type"):
        warnings.warn(
            "The provided function has already been converted to a node.",
            UserWarning,
        )
        return func

    # validate_function_parameters is separated out to allow for easier testing.
    validate_function_parameters(func, manifest)

    # assign the correct node class based on whether the function is async or sync
    if asyncio.iscoroutinefunction(func):
        node_class = AsyncDynamicFunctionNode
    elif inspect.isfunction(func):
        node_class = SyncDynamicFunctionNode
    elif inspect.isbuiltin(func):
        # builtin functions are written in C and do not have space for the addition of metadata like our node type.
        # so instead we wrap them in a function that allows for the addition of the node type.
        # this logic preserves details like the function name, docstring, and signature, but allows us to add the node type.
        func = _function_preserving_metadata(func)
        node_class = SyncDynamicFunctionNode
    else:
        # NOTE(review): bound methods and arbitrary callables (e.g. functools.partial)
        # fall through to this error path — confirm that is intended.
        raise NodeCreationError(
            message=f"The provided function is not a valid coroutine or sync function it is {type(func)}.",
            notes=[
                "You must provide a valid function or coroutine function to make a node.",
            ],
        )

    # build the node using the NodeBuilder
    builder = NodeBuilder(
        node_class,
        name=name if name is not None else f"{func.__name__}",
    )

    builder.setup_function_node(
        func,
        tool_details=manifest.description if manifest is not None else None,
        tool_params=manifest.parameters if manifest is not None else None,
    )

    completed_node_type = builder.build()

    # Sanity-check the built type and tag the original function with it so
    # later calls can short-circuit via the `node_type` attribute above.
    if issubclass(completed_node_type, AsyncDynamicFunctionNode):
        setattr(func, "node_type", completed_node_type)
        return func
    elif issubclass(completed_node_type, SyncDynamicFunctionNode):
        setattr(func, "node_type", completed_node_type)
        return func
    else:
        raise NodeCreationError(
            message="The provided function did not create a valid node type.",
            notes=[
                "Please make a github issue with the details of what went wrong.",
            ],
        )
Creates a new Node type from a function that can be used in rt.call().
By default, it will parse the function's docstring and turn them into tool details and parameters. However, if you provide custom ToolManifest it will override that logic.
WARNING: If you are overriding tool parameters, it is on you to make sure they will work with your function.
NOTE: If you have already converted this function to a node this function will do nothing
Arguments:
- func (Callable): The function to convert into a Node.
- name (str, optional): Human-readable name for the node/tool.
- manifest (ToolManifest, optional): The details you would like to override the tool with.
def agent_node(
    name: str | None = None,
    *,
    tool_nodes: Iterable[Type[Node] | Callable | RTFunction] | None = None,
    output_schema: Type[_TBaseModel] | None = None,
    llm: ModelBase[_TStream] | None = None,
    max_tool_calls: int | None = None,
    system_message: SystemMessage | str | None = None,
    manifest: ToolManifest | None = None,
):
    """
    Dynamically creates an agent based on the provided parameters.

    The concrete agent flavour is chosen by which arguments are supplied:
    tool nodes and/or an output schema select among tool-calling, structured,
    and terminal LLM agents.

    Args:
        name (str | None): The name of the agent. If None the default will be used.
        tool_nodes (Iterable[Type[Node] | Callable | RTFunction] | None): If your agent is an LLM with access to tools, what does it have access to?
        output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
        llm (ModelBase): The LLM model to use. If None it will need to be passed in at instance time.
        max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
        system_message (SystemMessage | str | None): System message for the agent.
        manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.
    """
    unpacked_tool_nodes: set[Type[Node]] | None = None
    if tool_nodes is not None:
        unpacked_tool_nodes = set()
        for node in tool_nodes:
            # Plain functions are converted to Node subclasses on the fly.
            # NOTE(review): `isinstance(node, FunctionType)` excludes builtins
            # and functools.partial objects — confirm those are unsupported here.
            if isinstance(node, FunctionType):
                unpacked_tool_nodes.add(extract_node_from_function(node))
            else:
                assert issubclass(node, Node), (
                    f"Expected {node} to be a subclass of Node"
                )
                unpacked_tool_nodes.add(node)

    # See issue (___) this logic should be migrated soon.
    if manifest is not None:
        tool_details = manifest.description
        tool_params = manifest.parameters
    else:
        tool_details = None
        tool_params = None

    # Dispatch: with tools -> tool-calling agent (structured or not);
    # without tools -> structured or terminal LLM agent.
    if unpacked_tool_nodes is not None and len(unpacked_tool_nodes) > 0:
        if output_schema is not None:
            return structured_tool_call_llm(
                tool_nodes=unpacked_tool_nodes,
                output_schema=output_schema,
                name=name,
                llm=llm,
                max_tool_calls=max_tool_calls,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
        else:
            return tool_call_llm(
                tool_nodes=unpacked_tool_nodes,
                name=name,
                llm=llm,
                max_tool_calls=max_tool_calls,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
    else:
        if output_schema is not None:
            return structured_llm(
                output_schema=output_schema,
                name=name,
                llm=llm,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
        else:
            return terminal_llm(
                name=name,
                llm=llm,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
Dynamically creates an agent based on the provided parameters.
Arguments:
- name (str | None): The name of the agent. If none the default will be used.
- tool_nodes (set[Type[Node] | Callable | RTFunction] | None): If your agent is a LLM with access to tools, what does it have access to?
- output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
- llm (ModelBase): The LLM model to use. If None it will need to be passed in at instance time.
- max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
- system_message (SystemMessage | str | None): System message for the agent.
- manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.
19class MCPStdioParams(StdioServerParameters): 20 timeout: timedelta = timedelta(seconds=30) 21 22 def as_stdio_params(self) -> StdioServerParameters: 23 # Collect all attributes except 'timeout' 24 stdio_kwargs = self.dict(exclude={"timeout"}) 25 return StdioServerParameters(**stdio_kwargs)
Usage documentation: see the Pydantic "Models" guide. (Inherited Pydantic `BaseModel` documentation follows.)
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `inspect.Signature` of the model's `__init__`.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__andModel.__root_validators__from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-coreSchemaSerializerused to dump instances of the model. - __pydantic_validator__: The
pydantic-coreSchemaValidatorused to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra][pydantic.config.ConfigDict.extra] is set to'allow'. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
28class MCPHttpParams(BaseModel): 29 url: str 30 headers: dict[str, Any] | None = None 31 timeout: timedelta = timedelta(seconds=30) 32 sse_read_timeout: timedelta = timedelta(seconds=60 * 5) 33 terminate_on_close: bool = True
Usage documentation: see the Pydantic "Models" guide. (Inherited Pydantic `BaseModel` documentation follows.)
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized
__init__[Signature][inspect.Signature] of the model. - __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom
__init__function. - __pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces
Model.__validators__andModel.__root_validators__from Pydantic V1. - __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [
RootModel][pydantic.root_model.RootModel]. - __pydantic_serializer__: The
pydantic-coreSchemaSerializerused to dump instances of the model. - __pydantic_validator__: The
pydantic-coreSchemaValidatorused to validate instances of the model. - __pydantic_fields__: A dictionary of field names and their corresponding [
FieldInfo][pydantic.fields.FieldInfo] objects. - __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [
ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects. - __pydantic_extra__: A dictionary containing extra values, if [
extra][pydantic.config.ConfigDict.extra] is set to'allow'. - __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
8def connect_mcp( 9 config: MCPStdioParams | MCPHttpParams, client_session: ClientSession | None = None 10) -> MCPServer: 11 """ 12 Returns an MCPServer class. On creation, it will connect to the MCP server and fetch the tools. 13 The connection will remain open until the server is closed with `close()`. 14 15 Args: 16 config: Configuration for the MCP server, either as StdioServerParameters or MCPHttpParams. 17 client_session: Optional ClientSession to use for the MCP server connection. If not provided, a new session will be created. 18 19 Returns: 20 MCPServer: An instance of the MCPServer class. 21 """ 22 # Apply Jupyter compatibility patches if needed 23 apply_patches() 24 25 return MCPServer(config=config, client_session=client_session)
Returns an MCPServer class. On creation, it will connect to the MCP server and fetch the tools.
The connection will remain open until the server is closed with close().
Arguments:
- config: Configuration for the MCP server, either as StdioServerParameters or MCPHttpParams.
- client_session: Optional ClientSession to use for the MCP server connection. If not provided, a new session will be created.
Returns:
MCPServer: An instance of the MCPServer class.
86def create_mcp_server( 87 nodes: List[Node | RTFunction], 88 server_name: str = "MCP Server", 89 fastmcp: FastMCP | None = None, 90): 91 """ 92 Create a FastMCP server that can be used to run nodes as MCP tools. 93 94 Args: 95 nodes: List of Node classes to be registered as tools with the MCP server. 96 server_name: Name of the MCP server instance. 97 fastmcp: Optional FastMCP instance to use instead of creating a new one. 98 99 Returns: 100 A FastMCP server instance. 101 """ 102 if fastmcp is not None: 103 if not isinstance(fastmcp, FastMCP): 104 raise ValueError("Provided fastmcp must be an instance of FastMCP.") 105 mcp = fastmcp 106 else: 107 mcp = FastMCP(server_name) 108 109 for node in [n if not hasattr(n, "node_type") else n.node_type for n in nodes]: 110 node_info = node.tool_info() 111 func = _create_tool_function(node, node_info) 112 113 mcp._tool_manager._tools[node_info.name] = MCPTool( 114 fn=func, 115 name=node_info.name, 116 description=node_info.detail, 117 parameters=( 118 _parameters_to_json_schema(node_info.parameters) 119 if node_info.parameters is not None 120 else {} 121 ), 122 fn_metadata=func_metadata(func, []), 123 is_async=True, 124 context_kwarg=None, 125 annotations=None, 126 ) # Register the node as a tool 127 128 return mcp
Create a FastMCP server that can be used to run nodes as MCP tools.
Arguments:
- nodes: List of Node classes to be registered as tools with the MCP server.
- server_name: Name of the MCP server instance.
- fastmcp: Optional FastMCP instance to use instead of creating a new one.
Returns:
A FastMCP server instance.
7class ToolManifest: 8 """ 9 Creates a manifest for a tool, which includes its description and parameters. 10 11 Args: 12 description (str): A description of the tool. 13 parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no paramerters. 14 """ 15 16 def __init__( 17 self, 18 description: str, 19 parameters: Iterable[Parameter] | None = None, 20 ): 21 self.description = description 22 self.parameters: List[Parameter] = ( 23 list(parameters) if parameters is not None else [] 24 )
Creates a manifest for a tool, which includes its description and parameters.
Arguments:
- description (str): A description of the tool.
- parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no parameters.
422def session_id(): 423 """ 424 Gets the current session ID if it exists, otherwise returns None. 425 """ 426 try: 427 return get_session_id() 428 except ContextError: 429 return None
Gets the current session ID if it exists, otherwise returns None.