railtracks

The Railtracks Framework for building resilient agentic systems in simple python

 1#   -------------------------------------------------------------
 2#   Copyright (c) Railtown AI. All rights reserved.
 3#   Licensed under the MIT License. See LICENSE in project root for information.
 4#   -------------------------------------------------------------
 5"""The Railtracks Framework for building resilient agentic systems in simple python"""
 6
 7from __future__ import annotations
 8
 9from dotenv import load_dotenv
10
11__all__ = [
12    "Session",
13    "session",
14    "call",
15    "broadcast",
16    "call_batch",
17    "interactive",
18    "ExecutionInfo",
19    "ExecutorConfig",
20    "llm",
21    "context",
22    "set_config",
23    "context",
24    "function_node",
25    "agent_node",
26    "integrations",
27    "prebuilt",
28    "MCPStdioParams",
29    "MCPHttpParams",
30    "connect_mcp",
31    "create_mcp_server",
32    "ToolManifest",
33    "session_id",
34    "vector_stores",
35    "rag",
36    "RagConfig",
37]
38
39
40from railtracks.built_nodes.concrete.rag import RagConfig
41from railtracks.built_nodes.easy_usage_wrappers import (
42    agent_node,
43    function_node,
44)
45
46from . import context, integrations, llm, prebuilt, rag, vector_stores
47from ._session import ExecutionInfo, Session, session
48from .context.central import session_id, set_config
49from .interaction import broadcast, call, call_batch, interactive
50from .nodes.manifest import ToolManifest
51from .rt_mcp import MCPHttpParams, MCPStdioParams, connect_mcp, create_mcp_server
52from .utils.config import ExecutorConfig
53from .utils.logging.config import initialize_module_logging
54
55load_dotenv()
56initialize_module_logging()
57
58# Do not worry about changing this version number manually. It will updated on release.
59__version__ = "1.0.0"
class Session:
    """
    The main class for managing an execution session.

    This class is responsible for setting up all the necessary components for running a Railtracks execution, including the coordinator, publisher, and state management.

    The configuration parameters are resolved using the following precedence:
    1. The parameters in the `Session` constructor.
    2. The parameters in global context variables.
    3. The default values.

    Default Values:
    - `name`: None
    - `timeout`: 150.0 seconds
    - `end_on_error`: False
    - `logging_setting`: "INFO"
    - `log_file`: None (logs will not be written to a file)
    - `broadcast_callback`: None (no callback for broadcast messages)
    - `prompt_injection`: True (the prompt will be automatically injected from context variables)
    - `save_state`: True (the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory)


    Args:
        name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
        logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have. This will override the module-level logging settings for the duration of this session.
        log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory.
    """

    def __init__(
        self,
        context: Dict[str, Any] | None = None,
        *,
        name: str | None = None,
        timeout: float | None = None,
        end_on_error: bool | None = None,
        logging_setting: AllowableLogLevels | None = None,
        log_file: str | os.PathLike | None = None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool | None = None,
        save_state: bool | None = None,
    ):
        # Resolve the effective configuration first, falling back to globals
        # and then defaults where the constructor arguments are None.
        self.executor_config = self.global_config_precedence(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            broadcast_callback=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )

        if context is None:
            context = {}

        self.name = name

        # A session-level logging override is installed only when the caller
        # explicitly provided logging settings for this session.
        self._has_custom_logging = logging_setting is not None or log_file is not None

        if self._has_custom_logging:
            mark_session_logging_override(
                session_level=self.executor_config.logging_setting,
                session_log_file=self.executor_config.log_file,
            )

        self.publisher: RTPublisher = RTPublisher()

        self._identifier = str(uuid.uuid4())

        executor_info = ExecutionInfo.create_new()
        self.coordinator = Coordinator(
            execution_modes={"async": AsyncioExecutionStrategy()}
        )
        self.rt_state = RTState(
            executor_info, self.executor_config, self.coordinator, self.publisher
        )

        self.coordinator.start(self.publisher)
        self._setup_subscriber()
        register_globals(
            session_id=self._identifier,
            rt_publisher=self.publisher,
            parent_id=None,
            executor_config=self.executor_config,
            global_context_vars=context,
        )

        self._start_time = time.time()

        # Lazy %-args so the message is only formatted when DEBUG is enabled.
        logger.debug("Session %s is initialized", self._identifier)

    @classmethod
    def global_config_precedence(
        cls,
        timeout: float | None,
        end_on_error: bool | None,
        logging_setting: AllowableLogLevels | None,
        log_file: str | os.PathLike | None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ),
        prompt_injection: bool | None,
        save_state: bool | None,
    ) -> ExecutorConfig:
        """
        Uses the following precedence order to determine the configuration parameters:
        1. The parameters in the method parameters.
        2. The parameters in global context variables.
        3. The default values.
        """
        global_executor_config = get_global_config()

        return global_executor_config.precedence_overwritten(
            timeout=timeout,
            end_on_error=end_on_error,
            logging_setting=logging_setting,
            log_file=log_file,
            subscriber=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Persist the session state (if configured) and then tear down.
        if self.executor_config.save_state:
            try:
                railtracks_dir = Path(".railtracks")
                sessions_dir = railtracks_dir / "data" / "sessions"
                sessions_dir.mkdir(
                    parents=True, exist_ok=True
                )  # Creates directory structure if doesn't exist, skips otherwise.

                # Try to create file path with name, fallback to identifier only if there's an issue
                try:
                    file_path = (
                        sessions_dir / f"{self.name}_{self._identifier}.json"
                        if self.name
                        else sessions_dir / f"{self._identifier}.json"
                    )
                    file_path.touch()
                except OSError:
                    # An invalid `name` can make the path unusable. This raises
                    # FileNotFoundError on POSIX but other OSError subclasses on
                    # e.g. Windows, so catch the broader class.
                    logger.warning(
                        get_message(
                            ExceptionMessageKey.INVALID_SESSION_FILE_NAME_WARN
                        ).format(name=self.name, identifier=self._identifier)
                    )
                    file_path = sessions_dir / f"{self._identifier}.json"

                logger.info("Saving execution info to %s", file_path)

                file_path.write_text(json.dumps(self.payload()))
            except Exception as e:
                # Saving is best-effort: never let persistence failures mask
                # the user's own exception (if any) leaving the context.
                logger.error(
                    "Error while saving execution info to file",
                    exc_info=e,
                )

        self._close()

    def _setup_subscriber(self):
        """
        Prepares and attaches the saved broadcast_callback to the publisher attached to this runner.
        """

        if self.executor_config.subscriber is not None:
            self.publisher.subscribe(
                stream_subscriber(self.executor_config.subscriber),
                name="Streaming Subscriber",
            )

    def _close(self):
        """
        Closes the runner and cleans up all resources.

        - Shuts down the state object
        - Detaches logging handlers so they aren't duplicated
        - Deletes all the global variables that were registered in the context
        """
        # the publisher should have already been closed in `_run_base`
        self.rt_state.shutdown()

        if self._has_custom_logging:
            restore_module_logging()

        delete_globals()
        # by deleting all of the state variables we are ensuring that the next time we create a runner it is fresh

    @property
    def info(self) -> ExecutionInfo:
        """
        Returns the current state of the runner.

        This is useful for debugging and viewing the current state of the run.
        """
        return self.rt_state.info

    def payload(self) -> Dict[str, Any]:
        """
        Gets the complete json payload tied to this session.

        The outputted json schema is maintained in (link here)
        """
        info = self.info

        run_list = info.graph_serialization()

        full_dict = {
            "session_id": self._identifier,
            "session_name": self.name,
            "start_time": self._start_time,
            "end_time": time.time(),
            "runs": run_list,
        }

        # Round-trip through JSON to guarantee the result is JSON-serializable.
        return json.loads(json.dumps(full_dict))

The main class for managing an execution session.

This class is responsible for setting up all the necessary components for running a Railtracks execution, including the coordinator, publisher, and state management.

The configuration parameters are resolved using the following precedence:

  1. The parameters in the Session constructor.
  2. The parameters in global context variables.
  3. The default values.

Default Values:

  • name: None
  • timeout: 150.0 seconds
  • end_on_error: False
  • logging_setting: "INFO"
  • log_file: None (logs will not be written to a file)
  • broadcast_callback: None (no callback for broadcast messages)
  • prompt_injection: True (the prompt will be automatically injected from context variables)
  • save_state: True (the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory)
Arguments:
  • name (str | None, optional): Optional name for the session. This name will be included in the saved state file if save_state is True.
  • context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
  • timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
  • end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
  • logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have. This will override the module-level logging settings for the duration of this session.
  • log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
  • broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
  • prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
  • save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory.
Session( context: Optional[Dict[str, Any]] = None, *, name: str | None = None, timeout: float | None = None, end_on_error: bool | None = None, logging_setting: Optional[Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']] = None, log_file: str | os.PathLike | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None)
 76    def __init__(
 77        self,
 78        context: Dict[str, Any] | None = None,
 79        *,
 80        name: str | None = None,
 81        timeout: float | None = None,
 82        end_on_error: bool | None = None,
 83        logging_setting: AllowableLogLevels | None = None,
 84        log_file: str | os.PathLike | None = None,
 85        broadcast_callback: (
 86            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
 87        ) = None,
 88        prompt_injection: bool | None = None,
 89        save_state: bool | None = None,
 90    ):
 91        # first lets read from defaults if nessecary for the provided input config
 92
 93        self.executor_config = self.global_config_precedence(
 94            timeout=timeout,
 95            end_on_error=end_on_error,
 96            logging_setting=logging_setting,
 97            log_file=log_file,
 98            broadcast_callback=broadcast_callback,
 99            prompt_injection=prompt_injection,
100            save_state=save_state,
101        )
102
103        if context is None:
104            context = {}
105
106        self.name = name
107
108        self._has_custom_logging = logging_setting is not None or log_file is not None
109
110        if self._has_custom_logging:
111            mark_session_logging_override(
112                session_level=self.executor_config.logging_setting,
113                session_log_file=self.executor_config.log_file,
114            )
115
116        self.publisher: RTPublisher = RTPublisher()
117
118        self._identifier = str(uuid.uuid4())
119
120        executor_info = ExecutionInfo.create_new()
121        self.coordinator = Coordinator(
122            execution_modes={"async": AsyncioExecutionStrategy()}
123        )
124        self.rt_state = RTState(
125            executor_info, self.executor_config, self.coordinator, self.publisher
126        )
127
128        self.coordinator.start(self.publisher)
129        self._setup_subscriber()
130        register_globals(
131            session_id=self._identifier,
132            rt_publisher=self.publisher,
133            parent_id=None,
134            executor_config=self.executor_config,
135            global_context_vars=context,
136        )
137
138        self._start_time = time.time()
139
140        logger.debug("Session %s is initialized" % self._identifier)
executor_config
name
publisher: railtracks.pubsub.publisher.RTPublisher
coordinator
rt_state
@classmethod
def global_config_precedence( cls, timeout: float | None, end_on_error: bool | None, logging_setting: Optional[Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']], log_file: str | os.PathLike | None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType], prompt_injection: bool | None, save_state: bool | None) -> ExecutorConfig:
142    @classmethod
143    def global_config_precedence(
144        cls,
145        timeout: float | None,
146        end_on_error: bool | None,
147        logging_setting: AllowableLogLevels | None,
148        log_file: str | os.PathLike | None,
149        broadcast_callback: (
150            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
151        ),
152        prompt_injection: bool | None,
153        save_state: bool | None,
154    ) -> ExecutorConfig:
155        """
156        Uses the following precedence order to determine the configuration parameters:
157        1. The parameters in the method parameters.
158        2. The parameters in global context variables.
159        3. The default values.
160        """
161        global_executor_config = get_global_config()
162
163        return global_executor_config.precedence_overwritten(
164            timeout=timeout,
165            end_on_error=end_on_error,
166            logging_setting=logging_setting,
167            log_file=log_file,
168            subscriber=broadcast_callback,
169            prompt_injection=prompt_injection,
170            save_state=save_state,
171        )

Uses the following precedence order to determine the configuration parameters:

  1. The parameters in the method parameters.
  2. The parameters in global context variables.
  3. The default values.
info: ExecutionInfo
240    @property
241    def info(self) -> ExecutionInfo:
242        """
243        Returns the current state of the runner.
244
245        This is useful for debugging and viewing the current state of the run.
246        """
247        return self.rt_state.info

Returns the current state of the runner.

This is useful for debugging and viewing the current state of the run.

def payload(self) -> Dict[str, Any]:
249    def payload(self) -> Dict[str, Any]:
250        """
251        Gets the complete json payload tied to this session.
252
253        The outputted json schema is maintained in (link here)
254        """
255        info = self.info
256
257        run_list = info.graph_serialization()
258
259        full_dict = {
260            "session_id": self._identifier,
261            "session_name": self.name,
262            "start_time": self._start_time,
263            "end_time": time.time(),
264            "runs": run_list,
265        }
266
267        return json.loads(json.dumps(full_dict))

Gets the complete json payload tied to this session.

The output JSON schema is maintained in (link here)

def session( func: Optional[Callable[~_P, Coroutine[Any, Any, ~_TOutput]]] = None, *, name: str | None = None, context: Optional[Dict[str, Any]] = None, timeout: float | None = None, end_on_error: bool | None = None, logging_setting: Optional[Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']] = None, log_file: str | os.PathLike | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None) -> Union[Callable[~_P, Coroutine[Any, Any, Tuple[~_TOutput, Session]]], Callable[[Callable[~_P, Coroutine[Any, Any, ~_TOutput]]], Callable[~_P, Coroutine[Any, Any, Tuple[~_TOutput, Session]]]]]:
def session(
    func: Callable[_P, Coroutine[Any, Any, _TOutput]] | None = None,
    *,
    name: str | None = None,
    context: Dict[str, Any] | None = None,
    timeout: float | None = None,
    end_on_error: bool | None = None,
    logging_setting: AllowableLogLevels | None = None,
    log_file: str | os.PathLike | None = None,
    broadcast_callback: (
        Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
    ) = None,
    prompt_injection: bool | None = None,
    save_state: bool | None = None,
) -> (
    Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]
    | Callable[
        [Callable[_P, Coroutine[Any, Any, _TOutput]]],
        Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]],
    ]
):
    """
    Decorator that runs the wrapped async function inside a managed `Session`.

    All three usage forms are supported:
    - ``@session`` (bare, default settings)
    - ``@session()`` (empty parentheses, default settings)
    - ``@session(name="my_task", timeout=30)`` (configured)

    The wrapped function returns a 2-tuple of (original return value, Session
    object used for the run), so callers can inspect execution state, logs,
    etc. after the function completes.

    Args:
        name (str | None, optional): Optional name for the session. Included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): Global context variables to be used during the execution.
        timeout (float, optional): Maximum number of seconds to wait for a response to the top-level request.
        end_on_error (bool, optional): If True, the execution stops when an exception is encountered.
        logging_setting (AllowableLogLevels, optional): Logging level for this session; overrides the module-level logging settings for its duration.
        log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): Callback invoked with broadcast messages.
        prompt_injection (bool, optional): If True, the prompt is automatically injected from context variables.
        save_state (bool, optional): If True, the execution state is saved to `.railtracks/data/sessions/` at the end of the run.

    Raises:
        TypeError: If the decorated function is not an async function.

    Returns:
        Bare form (@session): the wrapped function returning (result, session).
        Configured form (@session(...)): a decorator producing such a wrapped
        function.
    """

    def decorator(
        target_func: Callable[_P, Coroutine[Any, Any, _TOutput]],
    ) -> Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]:
        # Only coroutine functions can run inside the session context.
        if not inspect.iscoroutinefunction(target_func):
            raise TypeError(
                f"@session decorator can only be applied to async functions. "
                f"Function '{target_func.__name__}' is not async. "
                f"Add 'async' keyword to your function definition."
            )

        @wraps(target_func)
        async def wrapper(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> Tuple[_TOutput, Session]:
            # A fresh Session per invocation; it is torn down by the `with`.
            managed_session = Session(
                context=context,
                timeout=timeout,
                end_on_error=end_on_error,
                logging_setting=logging_setting,
                log_file=log_file,
                broadcast_callback=broadcast_callback,
                name=name,
                prompt_injection=prompt_injection,
                save_state=save_state,
            )

            with managed_session:
                outcome = await target_func(*args, **kwargs)
                return outcome, managed_session

        return wrapper

    # Bare `@session` gets the function directly; `@session(...)` gets None.
    return decorator if func is None else decorator(func)

This decorator automatically creates and manages a Session context for the decorated function, allowing async functions to use Railtracks operations without manually managing the session lifecycle.

Can be used as:

  • @session (without parentheses) - uses default settings
  • @session() (with empty parentheses) - uses default settings
  • @session(name="my_task", timeout=30) (with configuration parameters)

When using this decorator, the function returns a tuple containing:

  1. The original function's return value
  2. The Session object used during execution

This allows access to session information (like execution state, logs, etc.) after the function completes, while maintaining the simplicity of decorator usage.

Arguments:
  • name (str | None, optional): Optional name for the session. This name will be included in the saved state file if save_state is True.
  • context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
  • timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
  • end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
  • logging_setting (AllowableLogLevels, optional): The setting for the level of logging you would like to have. This will override the module-level logging settings for the duration of this session.
  • log_file (str | os.PathLike | None, optional): The file to which the logs will be written.
  • broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
  • prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
  • save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory.
Returns:

When used as @session (without parentheses): Returns the decorated function that returns (result, session). When used as @session(...) (with parameters): Returns a decorator function that takes an async function and returns a new async function that returns (result, session).

async def call( node_: Union[Callable[~_P, railtracks.nodes.nodes.Node[~_TOutput]], railtracks.built_nodes.concrete.function_base.RTFunction[~_P, ~_TOutput]], *args: _P.args, **kwargs: _P.kwargs) -> ~_TOutput:
 60async def call(
 61    node_: Callable[_P, Node[_TOutput]] | RTFunction[_P, _TOutput],
 62    *args: _P.args,
 63    **kwargs: _P.kwargs,
 64) -> _TOutput:
 65    """
 66    Call a node from within a node inside the framework. This will return a coroutine that you can interact with
 67    in whatever way using async/await logic.
 68
 69    Usage:
 70    ```python
 71    # for sequential operation
 72    result = await call(NodeA, "hello world", 42)
 73
 74    # for parallel operation
 75    tasks = [call(NodeA, "hello world", i) for i in range(10)]
 76    results = await asyncio.gather(*tasks)
 77    ```
 78
 79    Args:
 80        node: The node type you would like to create. This could be a function decorated with `@function_node`, a function, or a Node instance.
 81        *args: The arguments to pass to the node
 82        **kwargs: The keyword arguments to pass to the node
 83    """
 84    node: Callable[_P, Node[_TOutput]]
 85    # this entire section is a bit of a typing nightmare becuase all overloads we provide.
 86    if isinstance(node_, FunctionType):
 87        node = extract_node_from_function(node_)
 88    else:
 89        node = node_
 90
 91    # if the context is none then we will need to create a wrapper for the state object to work with.
 92    if not is_context_present():
 93        # we have to use lazy import here to prevent a circular import issue. This is a must have unfortunately.
 94        from railtracks import Session
 95
 96        with Session():
 97            result = await _start(node, args=args, kwargs=kwargs)
 98            return result
 99
100    # if the context is not active then we know this is the top level request
101    if not is_context_active():
102        result = await _start(node, args=args, kwargs=kwargs)
103        return result
104
105    # if the context is active then we can just run the node
106    result = await _run(node, args=args, kwargs=kwargs)
107    return result

Call a node from within a node inside the framework. This will return a coroutine that you can interact with in whatever way using async/await logic.

Usage:

# for sequential operation
result = await call(NodeA, "hello world", 42)

# for parallel operation
tasks = [call(NodeA, "hello world", i) for i in range(10)]
results = await asyncio.gather(*tasks)
Arguments:
  • node: The node type you would like to create. This could be a function decorated with @function_node, a function, or a Node instance.
  • *args: The arguments to pass to the node
  • **kwargs: The keyword arguments to pass to the node
async def broadcast(item: str):
 6async def broadcast(item: str):
 7    """
 8    Streams the given message
 9
10    This will trigger the broadcast_callback callback you have already provided.
11
12    Args:
13        item (str): The item you want to stream.
14    """
15    publisher = get_publisher()
16
17    await publisher.publish(Streaming(node_id=get_parent_id(), streamed_object=item))

Streams the given message

This will trigger the broadcast_callback callback you have already provided.

Arguments:
  • item (str): The item you want to stream.
async def call_batch( node: 'Callable[..., Node[_TOutput]] | Callable[..., _TOutput] | _AsyncNodeAttachedFunc[_P, _TOutput] | _SyncNodeAttachedFunc[_P, _TOutput]', *iterables: Iterable[Any], return_exceptions: bool = True):
async def call_batch(
    node: Callable[..., Node[_TOutput]]
    | Callable[..., _TOutput]
    | _AsyncNodeAttachedFunc[_P, _TOutput]
    | _SyncNodeAttachedFunc[_P, _TOutput],
    *iterables: Iterable[Any],
    return_exceptions: bool = True,
):
    """
    Run a node over multiple iterables, allowing for parallel execution.

    Results are returned in the order of the input iterables, not the order of
    completion. If a node raises and `return_exceptions` is True, the thrown
    exception is included in the results in its place.

    Args:
        node: The node type to create.
        *iterables: The iterables to map the node over.
        return_exceptions: If True, exceptions are returned as part of the
            results. If False, exceptions are raised immediately and you lose
            access to the results. Defaults to True.

    Returns:
        An iterable of results from the node.

    Usage:
        ```python
        results = await call_batch(NodeA, ["hello world"] * 10)
        for result in results:
            handle(result)
        ```
    """
    # Fan out one `call` per tuple of zipped arguments, then gather them all.
    # (The typing here is unavoidably loose given the overloads above.)
    pending = [call(node, *bundle) for bundle in zip(*iterables)]
    return await asyncio.gather(*pending, return_exceptions=return_exceptions)

Complete a node over multiple iterables, allowing for parallel execution.

Note the results will be returned in the order of the iterables, not the order of completion.

If one of the nodes returns an exception, the thrown exception will be included as a response.

Arguments:
  • node: The node type to create.
  • *iterables: The iterables to map the node over.
  • return_exceptions: If True, exceptions will be returned as part of the results. If False, exceptions will be raised immediately, and you will lose access to the results. Defaults to true.
Returns:

An iterable of results from the node.

Usage:
results = await batch(NodeA, ["hello world"] * 10)
for result in results:
    handle(result)
class ExecutionInfo:
 19class ExecutionInfo:
 20    """
 21    A class that contains the full details of the state of a run at any given point in time.
 22
 23    The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to
 24    create a graphical representation of the system.
 25    """
 26
 27    def __init__(
 28        self,
 29        request_forest: RequestForest,
 30        node_forest: NodeForest,
 31        stamper: StampManager,
 32    ):
 33        self.request_forest = request_forest
 34        self.node_forest = node_forest
 35        self.stamper = stamper
 36
 37    @classmethod
 38    def default(cls) -> ExecutionInfo:
 39        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
 40        return cls.create_new()
 41
 42    @classmethod
 43    def create_new(
 44        cls,
 45    ) -> ExecutionInfo:
 46        """
 47        Creates a new empty instance of state variables with the provided executor configuration.
 48
 49        """
 50        request_heap = RequestForest()
 51        node_heap = NodeForest()
 52        stamper = StampManager()
 53
 54        return ExecutionInfo(
 55            request_forest=request_heap,
 56            node_forest=node_heap,
 57            stamper=stamper,
 58        )
 59
 60    @property
 61    def answer(self):
 62        """Convenience method to access the answer of the run."""
 63        return self.request_forest.answer
 64
 65    @property
 66    def all_stamps(self) -> List[Stamp]:
 67        """Convenience method to access all the stamps of the run."""
 68        return self.stamper.all_stamps
 69
 70    @property
 71    def name(self):
 72        """
 73        Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion
 74        request is not present or there are multiple insertion requests.
 75        """
 76        insertion_requests = self.insertion_requests
 77
 78        if len(insertion_requests) >= 2:
 79            raise ValueError(
 80                "You cannot get the name of a graph with multiple insertion requests"
 81            )
 82
 83        if len(insertion_requests) == 0:
 84            raise ValueError(
 85                "You cannot get the name of a graph with no insertion requests"
 86            )
 87
 88        i_r = insertion_requests[0]
 89
 90        return self.node_forest.get_node_type(i_r.sink_id).name()
 91
 92    @property
 93    def insertion_requests(self):
 94        """A convenience method to access all the insertion requests of the run."""
 95        return self.request_forest.insertion_request
 96
 97    def _get_info(self, ids: List[str] | str | None = None) -> ExecutionInfo:
 98        """
 99        Gets a subset of the current state based on the provided node ids. It will contain all the children of the provided node ids
100
101        Note: If no ids are provided, the full state is returned.
102
103        Args:
104            ids (List[str] | str | None): A list of node ids to filter the state by. If None, the full state is returned.
105
106        Returns:
107            ExecutionInfo: A new instance of ExecutionInfo containing only the children of the provided ids.
108
109        """
110        if ids is None:
111            return self
112        else:
113            # firstly let's normalize a single id into a list
114            if isinstance(ids, str):
115                ids = [ids]
116
117            # we need to quickly check to make sure these ids are valid
118            for identifier in ids:
119                if identifier not in self.request_forest:
120                    raise ValueError(
121                        f"Identifier '{identifier}' not found in the current state."
122                    )
123
124            new_node_forest, new_request_forest = create_sub_state_info(
125                self.node_forest.heap(),
126                self.request_forest.heap(),
127                ids,
128            )
129            return ExecutionInfo(
130                node_forest=new_node_forest,
131                request_forest=new_request_forest,
132                stamper=self.stamper,
133            )
134
135    def _to_graph(self) -> Tuple[List[Vertex], List[Edge]]:
136        """
137        Converts the current state into its graph representation.
138
139        Returns:
140            List[Node]: An iterable of nodes in the graph.
141            List[Edge]: An iterable of edges in the graph.
142        """
143        return self.node_forest.to_vertices(), self.request_forest.to_edges()
144
145    def graph_serialization(self) -> dict[str, Any]:
146        """
147                Creates a string (JSON) representation of this info object designed to be used to construct a graph for this
148                info object.
149
150                Some important notes about its structure are outlined below:
151                - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
152                - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
153                - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.
154
155                - The "nodes" and "requests" keys will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
156                - However, both will carry an additional param called "stamp" which is a timestamp style object.
157                - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.
158
159
160        ```
161        """
162        parent_nodes = [x.identifier for x in self.insertion_requests]
163
164        infos = [self._get_info(parent_node) for parent_node in parent_nodes]
165
166        runs = []
167
168        for info, parent_node_id in zip(infos, parent_nodes):
169            insertion_requests = info.request_forest.insertion_request
170
171            assert len(insertion_requests) == 1
172            parent_request = insertion_requests[0]
173
174            all_parents = parent_request.get_all_parents()
175
176            start_time = all_parents[-1].stamp.time
177
178            assert len([x for x in all_parents if x.status == "Completed"]) <= 1
179            end_time = None
180            for req in all_parents:
181                if req.status in ["Completed", "Failed"]:
182                    end_time = req.stamp.time
183                    break
184
185            entry = {
186                "name": info.name,
187                "run_id": parent_node_id,
188                "nodes": info.node_forest.to_vertices(),
189                "status": parent_request.status,
190                "edges": info.request_forest.to_edges(),
191                "steps": _get_stamps_from_forests(
192                    info.node_forest, info.request_forest
193                ),
194                "start_time": start_time,
195                "end_time": end_time,
196            }
197            runs.append(entry)
198
199        return json.loads(
200            json.dumps(
201                runs,
202                cls=RTJSONEncoder,
203            )
204        )

A class that contains the full details of the state of a run at any given point in time.

The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to create a graphical representation of the system.

ExecutionInfo( request_forest: railtracks.state.request.RequestForest, node_forest: railtracks.state.node.NodeForest, stamper: railtracks.utils.profiling.StampManager)
27    def __init__(
28        self,
29        request_forest: RequestForest,
30        node_forest: NodeForest,
31        stamper: StampManager,
32    ):
33        self.request_forest = request_forest
34        self.node_forest = node_forest
35        self.stamper = stamper
request_forest
node_forest
stamper
@classmethod
def default(cls) -> ExecutionInfo:
37    @classmethod
38    def default(cls) -> ExecutionInfo:
39        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
40        return cls.create_new()

Creates a new "empty" instance of the ExecutionInfo class with the default values.

@classmethod
def create_new(cls) -> ExecutionInfo:
42    @classmethod
43    def create_new(
44        cls,
45    ) -> ExecutionInfo:
46        """
47        Creates a new empty instance of state variables with the provided executor configuration.
48
49        """
50        request_heap = RequestForest()
51        node_heap = NodeForest()
52        stamper = StampManager()
53
54        return ExecutionInfo(
55            request_forest=request_heap,
56            node_forest=node_heap,
57            stamper=stamper,
58        )

Creates a new empty instance of state variables with the provided executor configuration.

answer
60    @property
61    def answer(self):
62        """Convenience method to access the answer of the run."""
63        return self.request_forest.answer

Convenience method to access the answer of the run.

all_stamps: List[railtracks.utils.profiling.Stamp]
65    @property
66    def all_stamps(self) -> List[Stamp]:
67        """Convenience method to access all the stamps of the run."""
68        return self.stamper.all_stamps

Convenience method to access all the stamps of the run.

name
70    @property
71    def name(self):
72        """
73        Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion
74        request is not present or there are multiple insertion requests.
75        """
76        insertion_requests = self.insertion_requests
77
78        if len(insertion_requests) >= 2:
79            raise ValueError(
80                "You cannot get the name of a graph with multiple insertion requests"
81            )
82
83        if len(insertion_requests) == 0:
84            raise ValueError(
85                "You cannot get the name of a graph with no insertion requests"
86            )
87
88        i_r = insertion_requests[0]
89
90        return self.node_forest.get_node_type(i_r.sink_id).name()

Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion request is not present or there are multiple insertion requests.

insertion_requests
92    @property
93    def insertion_requests(self):
94        """A convenience method to access all the insertion requests of the run."""
95        return self.request_forest.insertion_request

A convenience method to access all the insertion requests of the run.

def graph_serialization(self) -> dict[str, typing.Any]:
145    def graph_serialization(self) -> dict[str, Any]:
146        """
147                Creates a string (JSON) representation of this info object designed to be used to construct a graph for this
148                info object.
149
150                Some important notes about its structure are outlined below:
151                - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
152                - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
153                - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.
154
155                - The "nodes" and "requests" keys will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
156                - However, both will carry an additional param called "stamp" which is a timestamp style object.
157                - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.
158
159
160        ```
161        """
162        parent_nodes = [x.identifier for x in self.insertion_requests]
163
164        infos = [self._get_info(parent_node) for parent_node in parent_nodes]
165
166        runs = []
167
168        for info, parent_node_id in zip(infos, parent_nodes):
169            insertion_requests = info.request_forest.insertion_request
170
171            assert len(insertion_requests) == 1
172            parent_request = insertion_requests[0]
173
174            all_parents = parent_request.get_all_parents()
175
176            start_time = all_parents[-1].stamp.time
177
178            assert len([x for x in all_parents if x.status == "Completed"]) <= 1
179            end_time = None
180            for req in all_parents:
181                if req.status in ["Completed", "Failed"]:
182                    end_time = req.stamp.time
183                    break
184
185            entry = {
186                "name": info.name,
187                "run_id": parent_node_id,
188                "nodes": info.node_forest.to_vertices(),
189                "status": parent_request.status,
190                "edges": info.request_forest.to_edges(),
191                "steps": _get_stamps_from_forests(
192                    info.node_forest, info.request_forest
193                ),
194                "start_time": start_time,
195                "end_time": end_time,
196            }
197            runs.append(entry)
198
199        return json.loads(
200            json.dumps(
201                runs,
202                cls=RTJSONEncoder,
203            )
204        )

Creates a string (JSON) representation of this info object designed to be used to construct a graph for this info object.

    Some important notes about its structure are outlined below:
    - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
    - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
    - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.

    - The "nodes" and "requests" keys will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
    - However, both will carry an additional param called "stamp" which is a timestamp style object.
    - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.

class ExecutorConfig:
10class ExecutorConfig:
11    def __init__(
12        self,
13        *,
14        timeout: float = 150.0,
15        end_on_error: bool = False,
16        logging_setting: AllowableLogLevels = "INFO",
17        log_file: str | os.PathLike | None = None,
18        broadcast_callback: (
19            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
20        ) = None,
21        prompt_injection: bool = True,
22        save_state: bool = True,
23    ):
24        """
25        ExecutorConfig is a special configuration object designed to allow customization of the executor in the RT system.
26
27        Args:
28            timeout (float): The maximum number of seconds to wait for a response to your top level request
29            end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
30            logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have.
31            log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created.
32            broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
33            prompt_injection (bool): If true, prompts can be injected with global context
34            save_state (bool): If true, the state of the executor will be saved to disk.
35        """
36        self.timeout = timeout
37        self.end_on_error = end_on_error
38        self.logging_setting = logging_setting
39        self.subscriber = broadcast_callback
40        self.log_file = log_file
41        self.prompt_injection = prompt_injection
42        self.save_state = save_state
43
44    @property
45    def logging_setting(self) -> AllowableLogLevels:
46        return self._logging_setting
47
48    @logging_setting.setter
49    def logging_setting(self, value: AllowableLogLevels):
50        if value not in str_to_log_level:
51            raise ValueError(
52                f"logging_setting must be one of {str_to_log_level}, got {value}"
53            )
54        self._logging_setting: AllowableLogLevels = value
55
56    def precedence_overwritten(
57        self,
58        *,
59        timeout: float | None = None,
60        end_on_error: bool | None = None,
61        logging_setting: AllowableLogLevels | None = None,
62        log_file: str | os.PathLike | None = None,
63        subscriber: (
64            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
65        ) = None,
66        prompt_injection: bool | None = None,
67        save_state: bool | None = None,
68    ):
69        """
70        If any of the parameters are provided (not None), it will create a new ExecutorConfig with the provided values overriding the current ones and return a reference to it.
71        """
72        return ExecutorConfig(
73            timeout=timeout if timeout is not None else self.timeout,
74            end_on_error=end_on_error
75            if end_on_error is not None
76            else self.end_on_error,
77            logging_setting=logging_setting
78            if logging_setting is not None
79            else self.logging_setting,
80            log_file=log_file if log_file is not None else self.log_file,
81            broadcast_callback=subscriber
82            if subscriber is not None
83            else self.subscriber,
84            prompt_injection=prompt_injection
85            if prompt_injection is not None
86            else self.prompt_injection,
87            save_state=save_state if save_state is not None else self.save_state,
88        )
89
90    def __repr__(self):
91        return (
92            f"ExecutorConfig(timeout={self.timeout}, end_on_error={self.end_on_error}, "
93            f"logging_setting={self.logging_setting}, log_file={self.log_file}, "
94            f"prompt_injection={self.prompt_injection}, "
95            f"save_state={self.save_state})"
96        )
ExecutorConfig( *, timeout: float = 150.0, end_on_error: bool = False, logging_setting: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE'] = 'INFO', log_file: str | os.PathLike | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool = True, save_state: bool = True)
11    def __init__(
12        self,
13        *,
14        timeout: float = 150.0,
15        end_on_error: bool = False,
16        logging_setting: AllowableLogLevels = "INFO",
17        log_file: str | os.PathLike | None = None,
18        broadcast_callback: (
19            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
20        ) = None,
21        prompt_injection: bool = True,
22        save_state: bool = True,
23    ):
24        """
25        ExecutorConfig is a special configuration object designed to allow customization of the executor in the RT system.
26
27        Args:
28            timeout (float): The maximum number of seconds to wait for a response to your top level request
29            end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
30            logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have.
31            log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created.
32            broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
33            prompt_injection (bool): If true, prompts can be injected with global context
34            save_state (bool): If true, the state of the executor will be saved to disk.
35        """
36        self.timeout = timeout
37        self.end_on_error = end_on_error
38        self.logging_setting = logging_setting
39        self.subscriber = broadcast_callback
40        self.log_file = log_file
41        self.prompt_injection = prompt_injection
42        self.save_state = save_state

ExecutorConfig is a special configuration object designed to allow customization of the executor in the RT system.

Arguments:
  • timeout (float): The maximum number of seconds to wait for a response to your top level request
  • end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
  • logging_setting (AllowableLogLevels): The setting for the level of logging you would like to have.
  • log_file (str | os.PathLike | None): The file to which the logs will be written. If None, no file will be created.
  • broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
  • prompt_injection (bool): If true, prompts can be injected with global context
  • save_state (bool): If true, the state of the executor will be saved to disk.
timeout
end_on_error
logging_setting: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']
44    @property
45    def logging_setting(self) -> AllowableLogLevels:
46        return self._logging_setting
subscriber
log_file
prompt_injection
save_state
def precedence_overwritten( self, *, timeout: float | None = None, end_on_error: bool | None = None, logging_setting: Optional[Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']] = None, log_file: str | os.PathLike | None = None, subscriber: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None):
56    def precedence_overwritten(
57        self,
58        *,
59        timeout: float | None = None,
60        end_on_error: bool | None = None,
61        logging_setting: AllowableLogLevels | None = None,
62        log_file: str | os.PathLike | None = None,
63        subscriber: (
64            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
65        ) = None,
66        prompt_injection: bool | None = None,
67        save_state: bool | None = None,
68    ):
69        """
70        If any of the parameters are provided (not None), it will create a new ExecutorConfig with the provided values overriding the current ones and return a reference to it.
71        """
72        return ExecutorConfig(
73            timeout=timeout if timeout is not None else self.timeout,
74            end_on_error=end_on_error
75            if end_on_error is not None
76            else self.end_on_error,
77            logging_setting=logging_setting
78            if logging_setting is not None
79            else self.logging_setting,
80            log_file=log_file if log_file is not None else self.log_file,
81            broadcast_callback=subscriber
82            if subscriber is not None
83            else self.subscriber,
84            prompt_injection=prompt_injection
85            if prompt_injection is not None
86            else self.prompt_injection,
87            save_state=save_state if save_state is not None else self.save_state,
88        )

If any of the parameters are provided (not None), it will create a new ExecutorConfig with the provided values overriding the current ones and return a reference to it.

def set_config( *, timeout: float | None = None, end_on_error: bool | None = None, logging_setting: Optional[Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE']] = None, log_file: str | os.PathLike | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None):
360def set_config(
361    *,
362    timeout: float | None = None,
363    end_on_error: bool | None = None,
364    logging_setting: AllowableLogLevels | None = None,
365    log_file: str | os.PathLike | None = None,
366    broadcast_callback: (
367        Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
368    ) = None,
369    prompt_injection: bool | None = None,
370    save_state: bool | None = None,
371):
372    """
373    Sets the global configuration for the executor. This will be propagated to all new runners created after this call.
374
375    - If you call this function after the runner has been created, it will not affect the current runner.
376    - This function will only overwrite the values that are provided, leaving the rest unchanged.
377
378
379    """
380
381    if is_context_active():
382        warnings.warn(
383            "The executor config is being set after the runner has been created, this is not recommended"
384        )
385
386    config = global_executor_config.get()
387
388    if logging_setting or log_file:
389        # default will be set at module import time, this is for overwrites
390        configure_module_logging(level=logging_setting, log_file=log_file)
391
392    new_config = config.precedence_overwritten(
393        timeout=timeout,
394        end_on_error=end_on_error,
395        logging_setting=logging_setting,
396        log_file=log_file,
397        subscriber=broadcast_callback,
398        prompt_injection=prompt_injection,
399        save_state=save_state,
400    )
401
402    global_executor_config.set(new_config)

Sets the global configuration for the executor. This will be propagated to all new runners created after this call.

  • If you call this function after the runner has been created, it will not affect the current runner.
  • This function will only overwrite the values that are provided, leaving the rest unchanged.
def function_node( func: Union[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]], List[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]]]], /, *, name: str | None = None, manifest: ToolManifest | None = None) -> Union[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]], List[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]]], NoneType]:
173def function_node(
174    func: Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
175    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]],
176    /,
177    *,
178    name: str | None = None,
179    manifest: ToolManifest | None = None,
180) -> (
181    Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
182    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]]
183    | None
184):
185    """
186    Creates a new Node type from a function that can be used in `rt.call()`.
187
188    By default, it will parse the function's docstring and turn it into tool details and parameters. However, if
189    you provide a custom ToolManifest it will override that logic.
190
191    WARNING: If you are overriding tool parameters, it is on you to make sure they will work with your function.
192
193    NOTE: If you have already converted this function to a node this function will do nothing
194
195    Args:
196        func (Callable): The function to convert into a Node.
197        name (str, optional): Human-readable name for the node/tool.
198        manifest (ToolManifest, optional): The details you would like to override the tool with.
199    """
200
201    # handle the case where a list of functions is provided
202    if isinstance(func, list):
203        return [function_node(f, name=name, manifest=manifest) for f in func]
204
205    # check if the function has already been converted to a node
206    if hasattr(func, "node_type"):
207        warnings.warn(
208            "The provided function has already been converted to a node.",
209            UserWarning,
210        )
211        return func
212
213    # validate_function_parameters is separated out to allow for easier testing.
214    validate_function_parameters(func, manifest)
215
216    # assign the correct node class based on whether the function is async or sync
217    if asyncio.iscoroutinefunction(func):
218        node_class = AsyncDynamicFunctionNode
219    elif inspect.isfunction(func):
220        node_class = SyncDynamicFunctionNode
221    elif inspect.isbuiltin(func):
222        # builtin functions are written in C and do not have space for the addition of metadata like our node type.
223        # so instead we wrap them in a function that allows for the addition of the node type.
224        # this logic preserved details like the function name, docstring, and signature, but allows us to add the node type.
225        func = _function_preserving_metadata(func)
226        node_class = SyncDynamicFunctionNode
227    else:
228        raise NodeCreationError(
229            message=f"The provided function is not a valid coroutine or sync function it is {type(func)}.",
230            notes=[
231                "You must provide a valid function or coroutine function to make a node.",
232            ],
233        )
234
235    # build the node using the NodeBuilder
236    builder = NodeBuilder(
237        node_class,
238        name=name if name is not None else f"{func.__name__}",
239    )
240
241    builder.setup_function_node(
242        func,
243        tool_details=manifest.description if manifest is not None else None,
244        tool_params=manifest.parameters if manifest is not None else None,
245    )
246
247    completed_node_type = builder.build()
248
249    # there is some pretty scary logic here.
250    if issubclass(completed_node_type, AsyncDynamicFunctionNode):
251        setattr(func, "node_type", completed_node_type)
252        return func
253    elif issubclass(completed_node_type, SyncDynamicFunctionNode):
254        setattr(func, "node_type", completed_node_type)
255        return func
256    else:
257        raise NodeCreationError(
258            message="The provided function did not create a valid node type.",
259            notes=[
260                "Please make a github issue with the details of what went wrong.",
261            ],
262        )

Creates a new Node type from a function that can be used in rt.call().

By default, it will parse the function's docstring and turn it into tool details and parameters. However, if you provide a custom ToolManifest it will override that logic.

WARNING: If you are overriding tool parameters, it is on you to make sure they will work with your function.

NOTE: If you have already converted this function to a node this function will do nothing

Arguments:
  • func (Callable): The function to convert into a Node.
  • name (str, optional): Human-readable name for the node/tool.
  • manifest (ToolManifest, optional): The details you would like to override the tool with.
def agent_node( name: str | None = None, *, rag: RagConfig | None = None, tool_nodes: Optional[Iterable[Union[Type[railtracks.nodes.nodes.Node], Callable, railtracks.built_nodes.concrete.function_base.RTFunction]]] = None, output_schema: Optional[Type[~_TBaseModel]] = None, llm: Optional[railtracks.llm.ModelBase[~_TStream]] = None, max_tool_calls: int | None = None, system_message: railtracks.llm.SystemMessage | str | None = None, manifest: ToolManifest | None = None):
def agent_node(
    name: str | None = None,
    *,
    rag: RagConfig | None = None,
    tool_nodes: Iterable[Type[Node] | Callable | RTFunction] | None = None,
    output_schema: Type[_TBaseModel] | None = None,
    llm: ModelBase[_TStream] | None = None,
    max_tool_calls: int | None = None,
    system_message: SystemMessage | str | None = None,
    manifest: ToolManifest | None = None,
):
    """
    Dynamically creates an agent based on the provided parameters.

    The concrete agent type is chosen from the argument combination:
    tool-calling vs. terminal, structured vs. free-form output. If ``rag`` is
    provided, a pre-invoke hook injects retrieved context into the message
    history before each LLM call.

    Args:
        name (str | None): The name of the agent. If None the default will be used.
        rag (RagConfig | None): If your agent is a rag agent put in the vector store it is connected to.
        tool_nodes (Iterable[Type[Node] | Callable | RTFunction] | None): If your agent is a LLM with access to tools, what does it have access to?
        output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
        llm (ModelBase | None): The LLM model to use. If None it will need to be passed in at instance time.
        max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
        system_message (SystemMessage | str | None): System message for the agent.
        manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.

    Raises:
        TypeError: If an entry of ``tool_nodes`` is neither a plain function
            nor a subclass of ``Node``.
    """
    # Normalize tool_nodes: plain functions are converted into Node types;
    # anything else must already be a Node subclass.
    unpacked_tool_nodes: set[Type[Node]] | None = None
    if tool_nodes is not None:
        unpacked_tool_nodes = set()
        for node in tool_nodes:
            if isinstance(node, FunctionType):
                unpacked_tool_nodes.add(extract_node_from_function(node))
            elif isinstance(node, type) and issubclass(node, Node):
                unpacked_tool_nodes.add(node)
            else:
                # Explicit raise instead of `assert`: assertions are stripped
                # under `python -O`, which would let invalid tools slip through
                # (and `issubclass` on a non-class raised a confusing TypeError
                # of its own anyway).
                raise TypeError(
                    f"Expected {node} to be a subclass of Node or a plain function"
                )

    # See issue (___): this manifest-unpacking logic should be migrated soon.
    if manifest is not None:
        tool_details = manifest.description
        tool_params = manifest.parameters
    else:
        tool_details = None
        tool_params = None

    if unpacked_tool_nodes is not None and len(unpacked_tool_nodes) > 0:
        # Agent with tools: pick structured or free-form output.
        if output_schema is not None:
            agent = structured_tool_call_llm(
                tool_nodes=unpacked_tool_nodes,
                output_schema=output_schema,
                name=name,
                llm=llm,
                max_tool_calls=max_tool_calls,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
        else:
            agent = tool_call_llm(
                tool_nodes=unpacked_tool_nodes,
                name=name,
                llm=llm,
                max_tool_calls=max_tool_calls,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
    else:
        # Agent without tools: pick structured or free-form output.
        if output_schema is not None:
            agent = structured_llm(
                output_schema=output_schema,
                name=name,
                llm=llm,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )
        else:
            agent = terminal_llm(
                name=name,
                llm=llm,
                system_message=system_message,
                tool_details=tool_details,
                tool_params=tool_params,
            )

    if rag is not None:
        # Inject retrieved context into the message history right before each
        # LLM invocation.
        def _update_message_history(node: LLMBase):
            node.message_hist = update_context(
                node.message_hist, vs=rag.vector_store, top_k=rag.top_k
            )
            return

        agent.add_pre_invoke(_update_message_history)

    return agent

Dynamically creates an agent based on the provided parameters.

Arguments:
  • name (str | None): The name of the agent. If None, the default will be used.
  • rag (RagConfig | None): If your agent is a rag agent put in the vector store it is connected to.
  • tool_nodes (set[Type[Node] | Callable | RTFunction] | None): If your agent is a LLM with access to tools, what does it have access to?
  • output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
  • llm (ModelBase): The LLM model to use. If None it will need to be passed in at instance time.
  • max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
  • system_message (SystemMessage | str | None): System message for the agent.
  • manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.
class MCPStdioParams(mcp.client.stdio.StdioServerParameters):
class MCPStdioParams(StdioServerParameters):
    """Stdio transport configuration for an MCP server.

    Extends the SDK's ``StdioServerParameters`` with a ``timeout`` field used
    by railtracks; the extra field is stripped off again when handing the
    parameters back to the MCP SDK.
    """

    # How long to wait on the stdio server before giving up.
    timeout: timedelta = timedelta(seconds=30)

    def as_stdio_params(self) -> StdioServerParameters:
        """Return a plain ``StdioServerParameters`` without the ``timeout`` field."""
        # Pydantic v2 renamed `.dict()` to `.model_dump()`; prefer the new API
        # when available to avoid the v2 deprecation warning, but fall back to
        # `.dict()` so Pydantic v1 environments keep working.
        dump = getattr(self, "model_dump", None) or self.dict
        stdio_kwargs = dump(exclude={"timeout"})
        return StdioServerParameters(**stdio_kwargs)

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
timeout: datetime.timedelta
def as_stdio_params(self) -> mcp.client.stdio.StdioServerParameters:
22    def as_stdio_params(self) -> StdioServerParameters:
23        # Collect all attributes except 'timeout'
24        stdio_kwargs = self.dict(exclude={"timeout"})
25        return StdioServerParameters(**stdio_kwargs)
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class MCPHttpParams(pydantic.main.BaseModel):
class MCPHttpParams(BaseModel):
    """Configuration for connecting to an MCP server over HTTP."""

    # Endpoint URL of the MCP server.
    url: str
    # Optional extra HTTP headers sent with requests (e.g. auth tokens) — TODO confirm against MCPServer usage.
    headers: dict[str, Any] | None = None
    # Timeout applied to HTTP operations against the server.
    timeout: timedelta = timedelta(seconds=30)
    # How long to wait on the SSE read stream before giving up (default: 5 minutes).
    sse_read_timeout: timedelta = timedelta(seconds=60 * 5)
    # NOTE(review): presumably controls whether the HTTP session is terminated
    # when the server wrapper is closed — confirm against the MCP client docs.
    terminate_on_close: bool = True

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
url: str
headers: dict[str, typing.Any] | None
timeout: datetime.timedelta
sse_read_timeout: datetime.timedelta
terminate_on_close: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def connect_mcp( config: MCPStdioParams | MCPHttpParams, client_session: mcp.client.session.ClientSession | None = None) -> railtracks.rt_mcp.main.MCPServer:
 8def connect_mcp(
 9    config: MCPStdioParams | MCPHttpParams, client_session: ClientSession | None = None
10) -> MCPServer:
11    """
12    Returns an MCPServer class. On creation, it will connect to the MCP server and fetch the tools.
13    The connection will remain open until the server is closed with `close()`.
14
15    Args:
16        config: Configuration for the MCP server, either as StdioServerParameters or MCPHttpParams.
17        client_session: Optional ClientSession to use for the MCP server connection. If not provided, a new session will be created.
18
19    Returns:
20        MCPServer: An instance of the MCPServer class.
21    """
22    # Apply Jupyter compatibility patches if needed
23    apply_patches()
24
25    return MCPServer(config=config, client_session=client_session)

Returns an MCPServer class. On creation, it will connect to the MCP server and fetch the tools. The connection will remain open until the server is closed with close().

Arguments:
  • config: Configuration for the MCP server, either as StdioServerParameters or MCPHttpParams.
  • client_session: Optional ClientSession to use for the MCP server connection. If not provided, a new session will be created.
Returns:

MCPServer: An instance of the MCPServer class.

def create_mcp_server( nodes: List[railtracks.nodes.nodes.Node | railtracks.built_nodes.concrete.function_base.RTFunction], server_name: str = 'MCP Server', fastmcp: mcp.server.fastmcp.server.FastMCP | None = None):
 86def create_mcp_server(
 87    nodes: List[Node | RTFunction],
 88    server_name: str = "MCP Server",
 89    fastmcp: FastMCP | None = None,
 90):
 91    """
 92    Create a FastMCP server that can be used to run nodes as MCP tools.
 93
 94    Args:
 95        nodes: List of Node classes to be registered as tools with the MCP server.
 96        server_name: Name of the MCP server instance.
 97        fastmcp: Optional FastMCP instance to use instead of creating a new one.
 98
 99    Returns:
100        A FastMCP server instance.
101    """
102    if fastmcp is not None:
103        if not isinstance(fastmcp, FastMCP):
104            raise ValueError("Provided fastmcp must be an instance of FastMCP.")
105        mcp = fastmcp
106    else:
107        mcp = FastMCP(server_name)
108
109    for node in [n if not hasattr(n, "node_type") else n.node_type for n in nodes]:
110        node_info = node.tool_info()
111        func = _create_tool_function(node, node_info)
112
113        mcp._tool_manager._tools[node_info.name] = MCPTool(
114            fn=func,
115            name=node_info.name,
116            description=node_info.detail,
117            parameters=(
118                _parameters_to_json_schema(node_info.parameters)
119                if node_info.parameters is not None
120                else {}
121            ),
122            fn_metadata=func_metadata(func, []),
123            is_async=True,
124            context_kwarg=None,
125            annotations=None,
126        )  # Register the node as a tool
127
128    return mcp

Create a FastMCP server that can be used to run nodes as MCP tools.

Arguments:
  • nodes: List of Node classes to be registered as tools with the MCP server.
  • server_name: Name of the MCP server instance.
  • fastmcp: Optional FastMCP instance to use instead of creating a new one.
Returns:

A FastMCP server instance.

class ToolManifest:
 7class ToolManifest:
 8    """
 9    Creates a manifest for a tool, which includes its description and parameters.
10
11    Args:
12        description (str): A description of the tool.
13        parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no paramerters.
14    """
15
16    def __init__(
17        self,
18        description: str,
19        parameters: Iterable[Parameter] | None = None,
20    ):
21        self.description = description
22        self.parameters: List[Parameter] = (
23            list(parameters) if parameters is not None else []
24        )

Creates a manifest for a tool, which includes its description and parameters.

Arguments:
  • description (str): A description of the tool.
  • parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no parameters.
ToolManifest( description: str, parameters: Optional[Iterable[railtracks.llm.Parameter]] = None)
16    def __init__(
17        self,
18        description: str,
19        parameters: Iterable[Parameter] | None = None,
20    ):
21        self.description = description
22        self.parameters: List[Parameter] = (
23            list(parameters) if parameters is not None else []
24        )
description
parameters: List[railtracks.llm.Parameter]
def session_id():
def session_id():
    """
    Return the current session ID if one exists, otherwise None.
    """
    # No active session context raises ContextError; translate that to None.
    try:
        current = get_session_id()
    except ContextError:
        current = None
    return current

Gets the current session ID if it exists, otherwise returns None.

class RagConfig:
 8class RagConfig:
 9    """
10    Configuration object for Retrieval-Augmented Generation (RAG).
11    """
12
13    def __init__(self, vector_store: VectorStore, top_k: int = 3) -> None:
14        self.vector_store = vector_store
15        self.top_k = top_k

Configuration object for Retrieval-Augmented Generation (RAG).

RagConfig( vector_store: railtracks.vector_stores.vector_store_base.VectorStore, top_k: int = 3)
13    def __init__(self, vector_store: VectorStore, top_k: int = 3) -> None:
14        self.vector_store = vector_store
15        self.top_k = top_k
vector_store
top_k