railtracks

The Railtracks Framework for building resilient agentic systems in simple python

 1#   -------------------------------------------------------------
 2#   Copyright (c) Railtown AI. All rights reserved.
 3#   Licensed under the MIT License. See LICENSE in project root for information.
 4#   -------------------------------------------------------------
 5"""The Railtracks Framework for building resilient agentic systems in simple python"""
 6
 7from __future__ import annotations
 8
 9import logging
10
11from dotenv import load_dotenv
12
# Public API of the railtracks package. Kept explicit so `from railtracks import *`
# and documentation tooling expose exactly these names.
# FIX: "context" was listed twice; duplicates in __all__ are harmless at runtime
# but confuse tooling and readers.
__all__ = [
    "Session",
    "session",
    "call",
    "broadcast",
    "call_batch",
    "interactive",
    "ExecutionInfo",
    "ExecutorConfig",
    "llm",
    "context",
    "set_config",
    "function_node",
    "agent_node",
    "integrations",
    "prebuilt",
    "MCPStdioParams",
    "MCPHttpParams",
    "connect_mcp",
    "create_mcp_server",
    "ToolManifest",
    "session_id",
    "evaluations",
    "vector_stores",
    "rag",
    "RagConfig",
    "Flow",
    "enable_logging",
]
43
44from railtracks.built_nodes.concrete.rag import RagConfig
45from railtracks.built_nodes.easy_usage_wrappers import (
46    agent_node,
47    function_node,
48)
49
50from . import context, evaluations, integrations, llm, prebuilt, rag, vector_stores
51from ._session import ExecutionInfo, Session, session
52from .context.central import session_id, set_config
53from .interaction import broadcast, call, call_batch, interactive
54from .nodes.manifest import ToolManifest
55from .orchestration.flow import Flow
56from .rt_mcp import MCPHttpParams, MCPStdioParams, connect_mcp, create_mcp_server
57from .utils.config import ExecutorConfig
58from .utils.logging.config import enable_logging
59
60load_dotenv()
61
62# Library does not configure logging by default. Add NullHandler so the RT logger
63# never emits "No handlers could be found". Call enable_logging() to opt in.
64logging.getLogger("RT").addHandler(logging.NullHandler())
65
 66# Do not worry about changing this version number manually. It will be updated on release.
67__version__ = "1.0.0"
class Session:
    """
    The main class for managing an execution session.

    This class is responsible for setting up all the necessary components for running a
    Railtracks execution, including the coordinator, publisher, and state management.

    The configuration parameters follow this precedence:
    1. The parameters in the `Session` constructor.
    2. The parameters in global context variables.
    3. The default values.

    Default Values:
    - `name`: None
    - `timeout`: 150.0 seconds
    - `end_on_error`: False
    - `broadcast_callback`: None (no callback for broadcast messages)
    - `prompt_injection`: True (the prompt will be automatically injected from context variables)
    - `save_state`: True (the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory)

    Args:
        name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
        flow_name (str | None, optional): The name of the flow this session is associated with.
        flow_id (str | None, optional): The unique identifier of the flow this session is associated with.
        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory.
        payload_callback (Callable[[dict[str, Any]], None] | None, optional): A callback invoked with the full session payload when the session exits.
    """

    def __init__(
        self,
        context: Dict[str, Any] | None = None,
        *,
        flow_name: str | None = None,
        flow_id: str | None = None,
        name: str | None = None,
        timeout: float | None = None,
        end_on_error: bool | None = None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ) = None,
        prompt_injection: bool | None = None,
        save_state: bool | None = None,
        payload_callback: Callable[[dict[str, Any]], None] | None = None,
    ):
        # First resolve the effective configuration if necessary for the provided input
        # (constructor args take precedence over globals, which take precedence over defaults).
        if flow_name is None:
            warnings.warn(
                "Sessions should be tied to a flow for better observability and state management. Please use the Flow object to create and manage your sessions (see __ for more details). This warning will become an error in future versions.",
                DeprecationWarning,
            )

        self.executor_config = self.global_config_precedence(
            timeout=timeout,
            end_on_error=end_on_error,
            broadcast_callback=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
            payload_callback=payload_callback,
        )

        if context is None:
            context = {}

        self.name = name
        self.flow_name = flow_name
        self.flow_id = flow_id

        self.publisher: RTPublisher = RTPublisher()

        # Unique identifier for this session; also used in saved-state file names.
        self._identifier = str(uuid.uuid4())

        executor_info = ExecutionInfo.create_new()
        self.coordinator = Coordinator(
            execution_modes={"async": AsyncioExecutionStrategy()}
        )
        self.rt_state = RTState(
            executor_info, self.executor_config, self.coordinator, self.publisher
        )

        # Order matters here: the coordinator must be listening on the publisher
        # before globals are registered and work can start publishing messages.
        self.coordinator.start(self.publisher)
        self._setup_subscriber()
        register_globals(
            session_id=self._identifier,
            rt_publisher=self.publisher,
            parent_id=None,
            executor_config=self.executor_config,
            global_context_vars=context,
        )

        self._start_time = time.time()

        # Lazy %-style args so the format work is skipped when DEBUG is off.
        logger.debug("Session %s is initialized", self._identifier)

    @classmethod
    def global_config_precedence(
        cls,
        timeout: float | None,
        end_on_error: bool | None,
        broadcast_callback: (
            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
        ),
        prompt_injection: bool | None,
        save_state: bool | None,
        payload_callback: Callable[[dict[str, Any]], None] | None,
    ) -> ExecutorConfig:
        """
        Uses the following precedence order to determine the configuration parameters:
        1. The parameters in the method parameters.
        2. The parameters in global context variables.
        3. The default values.
        """
        global_executor_config = get_global_config()

        return global_executor_config.precedence_overwritten(
            timeout=timeout,
            end_on_error=end_on_error,
            subscriber=broadcast_callback,
            prompt_injection=prompt_injection,
            save_state=save_state,
            payload_callback=payload_callback,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Persist state (if configured), fire the payload callback, then clean up."""
        if self.executor_config.save_state:
            try:
                railtracks_home = os.environ.get("RAILTRACKS_HOME", ".railtracks")
                sessions_dir = Path(railtracks_home) / "data" / "sessions"
                # Creates directory structure if it doesn't exist, skips otherwise.
                sessions_dir.mkdir(parents=True, exist_ok=True)

                # Prefer the flow name, then the session name, as the file prefix.
                if self.flow_name is not None:
                    name = self.flow_name
                elif self.name is not None:
                    name = self.name
                else:
                    name = ""

                # Try the named file path first; fall back to identifier only if
                # the name produces an invalid path.
                try:
                    file_path = sessions_dir / f"{name}_{self._identifier}.json"
                    file_path.touch()
                except FileNotFoundError:
                    logger.warning(
                        get_message(
                            ExceptionMessageKey.INVALID_SESSION_FILE_NAME_WARN
                        ).format(name=name, identifier=self._identifier)
                    )
                    file_path = sessions_dir / f"{self._identifier}.json"

                logger.info("Saving execution info to %s", file_path)

                file_path.write_text(json.dumps(self.payload()))

            except Exception as e:
                # Best-effort persistence: never let a save failure mask the
                # user's own exception propagating through __exit__.
                # FIX: message previously read "Error while saving to execution info to file".
                logger.error(
                    "Error while saving execution info to file",
                    exc_info=e,
                )
        try:
            if self.executor_config.payload_callback is not None:
                self.executor_config.payload_callback(self.payload())
        except Exception:
            # FIX: was a silent `pass` with a TODO — log so callback bugs surface
            # without interrupting session teardown.
            logger.error("Error while invoking payload_callback", exc_info=True)

        self._close()

    def _setup_subscriber(self):
        """
        Prepares and attaches the saved broadcast_callback to the publisher attached to this runner.
        """

        if self.executor_config.subscriber is not None:
            self.publisher.subscribe(
                stream_subscriber(self.executor_config.subscriber),
                name="Streaming Subscriber",
            )

    def _close(self):
        """
        Closes the runner and cleans up all resources.

        - Shuts down the publisher background task (if running)
        - Shuts down the state object
        - Deletes all the global variables that were registered in the context
        """
        # Session owns the publisher lifecycle and must release it on exit,
        # otherwise the background task leaks.
        if self.publisher.is_running():
            try:
                # Signal shutdown by setting the flag - the loop will check this and exit
                self.publisher._running = False

                # Try to cancel the background task if it exists and isn't done
                if (
                    self.publisher.pub_loop is not None
                    and not self.publisher.pub_loop.done()
                ):
                    try:
                        # Cancel the task - it will check _running and exit naturally
                        self.publisher.pub_loop.cancel()
                    except Exception:
                        # Task might be done or in a different loop, that's okay
                        pass
            except Exception:
                # If shutdown fails for any reason, log it but don't crash
                logger.warning(
                    "Failed to shutdown publisher during Session cleanup. "
                    "This may indicate a resource leak.",
                    exc_info=True,
                )

        self.rt_state.shutdown()

        # By deleting all of the state variables we ensure that the next time we
        # create a runner it is fresh.
        delete_globals()

    @property
    def info(self) -> ExecutionInfo:
        """
        Returns the current state of the runner.

        This is useful for debugging and viewing the current state of the run.
        """
        return self.rt_state.info

    def payload(self) -> Dict[str, Any]:
        """
        Gets the complete json payload tied to this session.

        The outputted json schema is maintained in (link here)
        """
        info = self.info

        run_list = info.graph_serialization()

        full_dict = {
            "flow_name": self.flow_name,
            "flow_id": self.flow_id,
            "session_id": self._identifier,
            "session_name": self.name,
            "start_time": self._start_time,
            "end_time": time.time(),
            "runs": run_list,
        }

        # Round-trip through json to guarantee the payload is JSON-serializable.
        return json.loads(json.dumps(full_dict))

The main class for managing an execution session.

This class is responsible for setting up all the necessary components for running a Railtracks execution, including the coordinator, publisher, and state management.

The configuration parameters follow this precedence:

  1. The parameters in the Session constructor.
  2. The parameters in global context variables.
  3. The default values.

Default Values:

  • name: None
  • timeout: 150.0 seconds
  • end_on_error: False
  • broadcast_callback: None (no callback for broadcast messages)
  • prompt_injection: True (the prompt will be automatically injected from context variables)
  • save_state: True (the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory)
Arguments:
  • name (str | None, optional): Optional name for the session. This name will be included in the saved state file if save_state is True.
  • context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
  • flow_name (str | None, optional): The name of the flow this session is associated with.
  • flow_id (str | None, optional): The unique identifier of the flow this session is associated with.
  • timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
  • end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
  • broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
  • prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
  • save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory.
Session( context: Optional[Dict[str, Any]] = None, *, flow_name: str | None = None, flow_id: str | None = None, name: str | None = None, timeout: float | None = None, end_on_error: bool | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None, payload_callback: Optional[Callable[[dict[str, Any]], NoneType]] = None)
 70    def __init__(
 71        self,
 72        context: Dict[str, Any] | None = None,
 73        *,
 74        flow_name: str | None = None,
 75        flow_id: str | None = None,
 76        name: str | None = None,
 77        timeout: float | None = None,
 78        end_on_error: bool | None = None,
 79        broadcast_callback: (
 80            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
 81        ) = None,
 82        prompt_injection: bool | None = None,
 83        save_state: bool | None = None,
 84        payload_callback: Callable[[dict[str, Any]], None] | None = None,
 85    ):
 86        # first lets read from defaults if necessary for the provided input config
 87
 88        if flow_name is None:
 89            warnings.warn(
 90                "Sessions should be tied to a flow for better observability and state management. Please use the Flow object to create and manage your sessions (see __ for more details). This warning will become an error in future versions.",
 91                DeprecationWarning,
 92            )
 93
 94        self.executor_config = self.global_config_precedence(
 95            timeout=timeout,
 96            end_on_error=end_on_error,
 97            broadcast_callback=broadcast_callback,
 98            prompt_injection=prompt_injection,
 99            save_state=save_state,
100            payload_callback=payload_callback,
101        )
102
103        if context is None:
104            context = {}
105
106        self.name = name
107        self.flow_name = flow_name
108        self.flow_id = flow_id
109
110        self.publisher: RTPublisher = RTPublisher()
111
112        self._identifier = str(uuid.uuid4())
113
114        executor_info = ExecutionInfo.create_new()
115        self.coordinator = Coordinator(
116            execution_modes={"async": AsyncioExecutionStrategy()}
117        )
118        self.rt_state = RTState(
119            executor_info, self.executor_config, self.coordinator, self.publisher
120        )
121
122        self.coordinator.start(self.publisher)
123        self._setup_subscriber()
124        register_globals(
125            session_id=self._identifier,
126            rt_publisher=self.publisher,
127            parent_id=None,
128            executor_config=self.executor_config,
129            global_context_vars=context,
130        )
131
132        self._start_time = time.time()
133
134        logger.debug("Session %s is initialized" % self._identifier)
executor_config
name
flow_name
flow_id
publisher: railtracks.pubsub.publisher.RTPublisher
coordinator
rt_state
@classmethod
def global_config_precedence( cls, timeout: float | None, end_on_error: bool | None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType], prompt_injection: bool | None, save_state: bool | None, payload_callback: Optional[Callable[[dict[str, Any]], NoneType]]) -> ExecutorConfig:
136    @classmethod
137    def global_config_precedence(
138        cls,
139        timeout: float | None,
140        end_on_error: bool | None,
141        broadcast_callback: (
142            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
143        ),
144        prompt_injection: bool | None,
145        save_state: bool | None,
146        payload_callback: Callable[[dict[str, Any]], None] | None,
147    ) -> ExecutorConfig:
148        """
149        Uses the following precedence order to determine the configuration parameters:
150        1. The parameters in the method parameters.
151        2. The parameters in global context variables.
152        3. The default values.
153        """
154        global_executor_config = get_global_config()
155
156        return global_executor_config.precedence_overwritten(
157            timeout=timeout,
158            end_on_error=end_on_error,
159            subscriber=broadcast_callback,
160            prompt_injection=prompt_injection,
161            save_state=save_state,
162            payload_callback=payload_callback,
163        )

Uses the following precedence order to determine the configuration parameters:

  1. The parameters in the method parameters.
  2. The parameters in global context variables.
  3. The default values.
info: ExecutionInfo
264    @property
265    def info(self) -> ExecutionInfo:
266        """
267        Returns the current state of the runner.
268
269        This is useful for debugging and viewing the current state of the run.
270        """
271        return self.rt_state.info

Returns the current state of the runner.

This is useful for debugging and viewing the current state of the run.

def payload(self) -> Dict[str, Any]:
273    def payload(self) -> Dict[str, Any]:
274        """
275        Gets the complete json payload tied to this session.
276
277        The outputted json schema is maintained in (link here)
278        """
279        info = self.info
280
281        run_list = info.graph_serialization()
282
283        full_dict = {
284            "flow_name": self.flow_name,
285            "flow_id": self.flow_id,
286            "session_id": self._identifier,
287            "session_name": self.name,
288            "start_time": self._start_time,
289            "end_time": time.time(),
290            "runs": run_list,
291        }
292
293        return json.loads(json.dumps(full_dict))

Gets the complete json payload tied to this session.

The outputted json schema is maintained in (link here)

def session( func: Optional[Callable[~_P, Coroutine[Any, Any, ~_TOutput]]] = None, *, name: str | None = None, context: Optional[Dict[str, Any]] = None, timeout: float | None = None, end_on_error: bool | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None) -> Union[Callable[~_P, Coroutine[Any, Any, Tuple[~_TOutput, Session]]], Callable[[Callable[~_P, Coroutine[Any, Any, ~_TOutput]]], Callable[~_P, Coroutine[Any, Any, Tuple[~_TOutput, Session]]]]]:
def session(
    func: Callable[_P, Coroutine[Any, Any, _TOutput]] | None = None,
    *,
    name: str | None = None,
    context: Dict[str, Any] | None = None,
    flow_name: str | None = None,
    flow_id: str | None = None,
    timeout: float | None = None,
    end_on_error: bool | None = None,
    broadcast_callback: (
        Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
    ) = None,
    prompt_injection: bool | None = None,
    save_state: bool | None = None,
    payload_callback: Callable[[dict[str, Any]], None] | None = None,
) -> (
    Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]
    | Callable[
        [Callable[_P, Coroutine[Any, Any, _TOutput]]],
        Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]],
    ]
):
    """
    This decorator automatically creates and manages a Session context for the decorated function,
    allowing async functions to use Railtracks operations without manually managing the session lifecycle.

    Can be used as:
    - @session (without parentheses) - uses default settings
    - @session() (with empty parentheses) - uses default settings
    - @session(name="my_task", timeout=30) (with configuration parameters)

    When using this decorator, the function returns a tuple containing:
    1. The original function's return value
    2. The Session object used during execution

    This allows access to session information (like execution state, logs, etc.) after the function completes,
    while maintaining the simplicity of decorator usage.

    Args:
        name (str | None, optional): Optional name for the session. This name will be included in the saved state file if `save_state` is True.
        context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
        flow_name (str | None, optional): The name of the flow the session is associated with. Supplying one avoids the Session deprecation warning.
        flow_id (str | None, optional): The unique identifier of the flow the session is associated with.
        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory.
        payload_callback (Callable[[dict[str, Any]], None] | None, optional): A callback invoked with the session payload when the session exits.

    Returns:
        When used as @session (without parentheses): Returns the decorated function that returns (result, session).
        When used as @session(...) (with parameters): Returns a decorator function that takes an async function
        and returns a new async function that returns (result, session).
    """

    def decorator(
        target_func: Callable[_P, Coroutine[Any, Any, _TOutput]],
    ) -> Callable[_P, Coroutine[Any, Any, Tuple[_TOutput, Session]]]:
        # Validate that the decorated function is async; fail fast at decoration time.
        if not inspect.iscoroutinefunction(target_func):
            raise TypeError(
                f"@session decorator can only be applied to async functions. "
                f"Function '{target_func.__name__}' is not async. "
                f"Add 'async' keyword to your function definition."
            )

        @wraps(target_func)
        async def wrapper(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> Tuple[_TOutput, Session]:
            # GENERALIZATION: flow_name/flow_id/payload_callback are now forwarded,
            # matching the Session constructor's full parameter set.
            session_obj = Session(
                context=context,
                flow_name=flow_name,
                flow_id=flow_id,
                timeout=timeout,
                end_on_error=end_on_error,
                broadcast_callback=broadcast_callback,
                name=name,
                prompt_injection=prompt_injection,
                save_state=save_state,
                payload_callback=payload_callback,
            )

            with session_obj:
                result = await target_func(*args, **kwargs)
                return result, session_obj

        return wrapper

    # If used as @session without parentheses
    if func is not None:
        return decorator(func)

    # If used as @session(...)
    return decorator

This decorator automatically creates and manages a Session context for the decorated function, allowing async functions to use Railtracks operations without manually managing the session lifecycle.

Can be used as:

  • @session (without parentheses) - uses default settings
  • @session() (with empty parentheses) - uses default settings
  • @session(name="my_task", timeout=30) (with configuration parameters)

When using this decorator, the function returns a tuple containing:

  1. The original function's return value
  2. The Session object used during execution

This allows access to session information (like execution state, logs, etc.) after the function completes, while maintaining the simplicity of decorator usage.

Arguments:
  • name (str | None, optional): Optional name for the session. This name will be included in the saved state file if save_state is True.
  • context (Dict[str, Any], optional): A dictionary of global context variables to be used during the execution.
  • timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
  • end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
  • broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
  • prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
  • save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory.
Returns:

When used as @session (without parentheses): Returns the decorated function that returns (result, session). When used as @session(...) (with parameters): Returns a decorator function that takes an async function and returns a new async function that returns (result, session).

async def call( node_: Union[Callable[~_P, railtracks.nodes.nodes.Node[~_TOutput]], railtracks.built_nodes.concrete.function_base.RTFunction[~_P, ~_TOutput]], *args: _P.args, **kwargs: _P.kwargs) -> ~_TOutput:
 59async def call(
 60    node_: Callable[_P, Node[_TOutput]] | RTFunction[_P, _TOutput],
 61    *args: _P.args,
 62    **kwargs: _P.kwargs,
 63) -> _TOutput:
 64    """
 65    Call a node from within a node inside the framework. This will return a coroutine that you can interact with
 66    in whatever way using async/await logic.
 67
 68    Usage:
 69    ```python
 70    # for sequential operation
 71    result = await call(NodeA, "hello world", 42)
 72
 73    # for parallel operation
 74    tasks = [call(NodeA, "hello world", i) for i in range(10)]
 75    results = await asyncio.gather(*tasks)
 76    ```
 77
 78    Args:
 79        node: The node type you would like to create. This could be a function decorated with `@function_node`, a function, or a Node instance.
 80        *args: The arguments to pass to the node
 81        **kwargs: The keyword arguments to pass to the node
 82    """
 83    node: Callable[_P, Node[_TOutput]]
 84    # this entire section is a bit of a typing nightmare becuase all overloads we provide.
 85    if isinstance(node_, FunctionType):
 86        node = extract_node_from_function(node_)
 87    else:
 88        node = node_
 89    # if the context is none then we will need to create a wrapper for the state object to work with.
 90    if not is_context_present():
 91        # we have to use lazy import here to prevent a circular import issue. This is a must have unfortunately.
 92        from railtracks import Session
 93
 94        with Session():
 95            result = await _start(node, args=args, kwargs=kwargs)
 96            return result
 97
 98    # if the context is not active then we know this is the top level request
 99    if not is_context_active():
100        result = await _start(node, args=args, kwargs=kwargs)
101        return result
102
103    # if the context is active then we can just run the node
104    result = await _run(node, args=args, kwargs=kwargs)
105    return result

Call a node from within a node inside the framework. This will return a coroutine that you can interact with in whatever way using async/await logic.

Usage:

# for sequential operation
result = await call(NodeA, "hello world", 42)

# for parallel operation
tasks = [call(NodeA, "hello world", i) for i in range(10)]
results = await asyncio.gather(*tasks)
Arguments:
  • node: The node type you would like to create. This could be a function decorated with @function_node, a function, or a Node instance.
  • *args: The arguments to pass to the node
  • **kwargs: The keyword arguments to pass to the node
async def broadcast(item: str):
 6async def broadcast(item: str):
 7    """
 8    Streams the given message
 9
10    This will trigger the broadcast_callback callback you have already provided.
11
12    Args:
13        item (str): The item you want to stream.
14    """
15    publisher = get_publisher()
16
17    await publisher.publish(Streaming(node_id=get_parent_id(), streamed_object=item))

Streams the given message

This will trigger the broadcast_callback callback you have already provided.

Arguments:
  • item (str): The item you want to stream.
async def call_batch( node: 'Callable[..., Node[_TOutput]] | Callable[..., _TOutput] | _AsyncNodeAttachedFunc[_P, _TOutput] | _SyncNodeAttachedFunc[_P, _TOutput]', *iterables: Iterable[Any], return_exceptions: bool = True):
27async def call_batch(
28    node: Callable[..., Node[_TOutput]]
29    | Callable[..., _TOutput]
30    | _AsyncNodeAttachedFunc[_P, _TOutput]
31    | _SyncNodeAttachedFunc[_P, _TOutput],
32    *iterables: Iterable[Any],
33    return_exceptions: bool = True,
34):
35    """
36    Complete a node over multiple iterables, allowing for parallel execution.
37
38    Note the results will be returned in the order of the iterables, not the order of completion.
39
40    If one of the nodes returns an exception, the thrown exception will be included as a response.
41
42    Args:
43        node: The node type to create.
44        *iterables: The iterables to map the node over.
45        return_exceptions: If True, exceptions will be returned as part of the results.
46            If False, exceptions will be raised immediately, and you will lose access to the results.
47            Defaults to true.
48
49    Returns:
50        An iterable of results from the node.
51
52    Usage:
53        ```python
54        results = await batch(NodeA, ["hello world"] * 10)
55        for result in results:
56            handle(result)
57        ```
58    """
59    # this is big typing disaster but there is no way around it. Try if if you want to.
60    contracts = [call(node, *args) for args in zip(*iterables)]
61
62    results = await asyncio.gather(*contracts, return_exceptions=return_exceptions)
63    return results

Complete a node over multiple iterables, allowing for parallel execution.

Note the results will be returned in the order of the iterables, not the order of completion.

If one of the nodes raises an exception, the thrown exception will be included as a response (when return_exceptions is True).

Arguments:
  • node: The node type to create.
  • *iterables: The iterables to map the node over.
  • return_exceptions: If True, exceptions will be returned as part of the results. If False, exceptions will be raised immediately, and you will lose access to the results. Defaults to true.
Returns:

An iterable of results from the node.

Usage:
results = await call_batch(NodeA, ["hello world"] * 10)
for result in results:
    handle(result)
class ExecutionInfo:
 19class ExecutionInfo:
 20    """
 21    A class that contains the full details of the state of a run at any given point in time.
 22
 23    The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to
 24    create a graphical representation of the system.
 25    """
 26
 27    def __init__(
 28        self,
 29        request_forest: RequestForest,
 30        node_forest: NodeForest,
 31        stamper: StampManager,
 32    ):
 33        self.request_forest = request_forest
 34        self.node_forest = node_forest
 35        self.stamper = stamper
 36
 37    @classmethod
 38    def default(cls) -> ExecutionInfo:
 39        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
 40        return cls.create_new()
 41
 42    @classmethod
 43    def create_new(
 44        cls,
 45    ) -> ExecutionInfo:
 46        """
 47        Creates a new empty instance of state variables with the provided executor configuration.
 48
 49        """
 50        request_heap = RequestForest()
 51        node_heap = NodeForest()
 52        stamper = StampManager()
 53
 54        return ExecutionInfo(
 55            request_forest=request_heap,
 56            node_forest=node_heap,
 57            stamper=stamper,
 58        )
 59
 60    @property
 61    def answer(self):
 62        """Convenience method to access the answer of the run."""
 63        return self.request_forest.answer
 64
 65    @property
 66    def all_stamps(self) -> List[Stamp]:
 67        """Convenience method to access all the stamps of the run."""
 68        return self.stamper.all_stamps
 69
 70    @property
 71    def name(self):
 72        """
 73        Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion
 74        request is not present or there are multiple insertion requests.
 75        """
 76        insertion_requests = self.insertion_requests
 77
 78        # The name is only defined for the length of 1.
 79        # NOTE: Maybe we should send a warning once to user in other cases.
 80        if len(insertion_requests) != 1:
 81            return None
 82
 83        i_r = insertion_requests[0]
 84
 85        return self.node_forest.get_node_type(i_r.sink_id).name()
 86
 87    @property
 88    def insertion_requests(self):
 89        """A convenience method to access all the insertion requests of the run."""
 90        return self.request_forest.insertion_request
 91
 92    def _get_info(self, ids: List[str] | str | None = None) -> ExecutionInfo:
 93        """
 94        Gets a subset of the current state based on the provided node ids. It will contain all the children of the provided node ids
 95
 96        Note: If no ids are provided, the full state is returned.
 97
 98        Args:
 99            ids (List[str] | str | None): A list of node ids to filter the state by. If None, the full state is returned.
100
101        Returns:
102            ExecutionInfo: A new instance of ExecutionInfo containing only the children of the provided ids.
103
104        """
105        if ids is None:
106            return self
107        else:
108            # firstly lets
109            if isinstance(ids, str):
110                ids = [ids]
111
112            # we need to quickly check to make sure these ids are valid
113            for identifier in ids:
114                if identifier not in self.request_forest:
115                    raise ValueError(
116                        f"Identifier '{identifier}' not found in the current state."
117                    )
118
119            new_node_forest, new_request_forest = create_sub_state_info(
120                self.node_forest.heap(),
121                self.request_forest.heap(),
122                ids,
123            )
124            return ExecutionInfo(
125                node_forest=new_node_forest,
126                request_forest=new_request_forest,
127                stamper=self.stamper,
128            )
129
130    def _to_graph(self) -> Tuple[List[Vertex], List[Edge]]:
131        """
132        Converts the current state into its graph representation.
133
134        Returns:
135            List[Node]: An iterable of nodes in the graph.
136            List[Edge]: An iterable of edges in the graph.
137        """
138        return self.node_forest.to_vertices(), self.request_forest.to_edges()
139
140    def graph_serialization(self) -> dict[str, Any]:
141        """
142                Creates a string (JSON) representation of this info object designed to be used to construct a graph for this
143                info object.
144
145                Some important notes about its structure are outlined below:
146                - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
147                - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
148                - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.
149
150                - The "nodes" and "requests" key will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
151                - However, both will carry an addition param called "stamp" which is a timestamp style object.
152                - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.
153
154
155        ```
156        """
157        parent_nodes = [x.identifier for x in self.insertion_requests]
158
159        infos = [self._get_info(parent_node) for parent_node in parent_nodes]
160
161        runs = []
162
163        for info, parent_node_id in zip(infos, parent_nodes):
164            insertion_requests = info.request_forest.insertion_request
165
166            assert len(insertion_requests) == 1
167            parent_request = insertion_requests[0]
168
169            all_parents = parent_request.get_all_parents()
170
171            start_time = all_parents[-1].stamp.time
172
173            assert len([x for x in all_parents if x.status == "Completed"]) <= 1
174            end_time = None
175            for req in all_parents:
176                if req.status in ["Completed", "Failed"]:
177                    end_time = req.stamp.time
178                    break
179
180            entry = {
181                "name": info.name,
182                "run_id": parent_node_id,
183                "nodes": info.node_forest.to_vertices(),
184                "status": parent_request.status,
185                "edges": info.request_forest.to_edges(),
186                "steps": _get_stamps_from_forests(
187                    info.node_forest, info.request_forest
188                ),
189                "start_time": start_time,
190                "end_time": end_time,
191            }
192            runs.append(entry)
193
194        return json.loads(
195            json.dumps(
196                runs,
197                cls=RTJSONEncoder,
198            )
199        )

A class that contains the full details of the state of a run at any given point in time.

The class is designed to be used as a snapshot of state that can be used to display the state of the run, or to create a graphical representation of the system.

ExecutionInfo( request_forest: railtracks.state.request.RequestForest, node_forest: railtracks.state.node.NodeForest, stamper: railtracks.utils.profiling.StampManager)
27    def __init__(
28        self,
29        request_forest: RequestForest,
30        node_forest: NodeForest,
31        stamper: StampManager,
32    ):
33        self.request_forest = request_forest
34        self.node_forest = node_forest
35        self.stamper = stamper
request_forest
node_forest
stamper
@classmethod
def default(cls) -> ExecutionInfo:
37    @classmethod
38    def default(cls) -> ExecutionInfo:
39        """Creates a new "empty" instance of the ExecutionInfo class with the default values."""
40        return cls.create_new()

Creates a new "empty" instance of the ExecutionInfo class with the default values.

@classmethod
def create_new(cls) -> ExecutionInfo:
42    @classmethod
43    def create_new(
44        cls,
45    ) -> ExecutionInfo:
46        """
47        Creates a new empty instance of state variables with the provided executor configuration.
48
49        """
50        request_heap = RequestForest()
51        node_heap = NodeForest()
52        stamper = StampManager()
53
54        return ExecutionInfo(
55            request_forest=request_heap,
56            node_forest=node_heap,
57            stamper=stamper,
58        )

Creates a new, empty instance of the state variables.

answer
60    @property
61    def answer(self):
62        """Convenience method to access the answer of the run."""
63        return self.request_forest.answer

Convenience method to access the answer of the run.

all_stamps: List[railtracks.utils.profiling.Stamp]
65    @property
66    def all_stamps(self) -> List[Stamp]:
67        """Convenience method to access all the stamps of the run."""
68        return self.stamper.all_stamps

Convenience method to access all the stamps of the run.

name
70    @property
71    def name(self):
72        """
73        Gets the name of the graph by pulling the name of the insertion request. It will raise a ValueError if the insertion
74        request is not present or there are multiple insertion requests.
75        """
76        insertion_requests = self.insertion_requests
77
78        # The name is only defined for the length of 1.
79        # NOTE: Maybe we should send a warning once to user in other cases.
80        if len(insertion_requests) != 1:
81            return None
82
83        i_r = insertion_requests[0]
84
85        return self.node_forest.get_node_type(i_r.sink_id).name()

Gets the name of the graph by pulling the name of the insertion request. Returns None if the insertion request is not present or if there are multiple insertion requests.

insertion_requests
87    @property
88    def insertion_requests(self):
89        """A convenience method to access all the insertion requests of the run."""
90        return self.request_forest.insertion_request

A convenience method to access all the insertion requests of the run.

def graph_serialization(self) -> dict[str, typing.Any]:
140    def graph_serialization(self) -> dict[str, Any]:
141        """
142                Creates a string (JSON) representation of this info object designed to be used to construct a graph for this
143                info object.
144
145                Some important notes about its structure are outlined below:
146                - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
147                - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
148                - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.
149
150                - The "nodes" and "requests" key will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
151                - However, both will carry an addition param called "stamp" which is a timestamp style object.
152                - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.
153
154
155        ```
156        """
157        parent_nodes = [x.identifier for x in self.insertion_requests]
158
159        infos = [self._get_info(parent_node) for parent_node in parent_nodes]
160
161        runs = []
162
163        for info, parent_node_id in zip(infos, parent_nodes):
164            insertion_requests = info.request_forest.insertion_request
165
166            assert len(insertion_requests) == 1
167            parent_request = insertion_requests[0]
168
169            all_parents = parent_request.get_all_parents()
170
171            start_time = all_parents[-1].stamp.time
172
173            assert len([x for x in all_parents if x.status == "Completed"]) <= 1
174            end_time = None
175            for req in all_parents:
176                if req.status in ["Completed", "Failed"]:
177                    end_time = req.stamp.time
178                    break
179
180            entry = {
181                "name": info.name,
182                "run_id": parent_node_id,
183                "nodes": info.node_forest.to_vertices(),
184                "status": parent_request.status,
185                "edges": info.request_forest.to_edges(),
186                "steps": _get_stamps_from_forests(
187                    info.node_forest, info.request_forest
188                ),
189                "start_time": start_time,
190                "end_time": end_time,
191            }
192            runs.append(entry)
193
194        return json.loads(
195            json.dumps(
196                runs,
197                cls=RTJSONEncoder,
198            )
199        )

Creates a string (JSON) representation of this info object designed to be used to construct a graph for this info object.

    Some important notes about its structure are outlined below:
    - The `nodes` key contains a list of all the nodes in the graph, represented as `Vertex` objects.
    - The `edges` key contains a list of all the edges in the graph, represented as `Edge` objects.
    - The `stamps` key contains an ease of use list of all the stamps associated with the run, represented as `Stamp` objects.

    - The "nodes" and "requests" key will be outlined with normal graph details like connections and identifiers in addition to a loose details object.
    - However, both will carry an addition param called "stamp" which is a timestamp style object.
    - They also will carry a "parent" param which is a recursive structure that allows you to traverse the graph in time.


class ExecutorConfig:
 8class ExecutorConfig:
 9    def __init__(
10        self,
11        *,
12        timeout: float | None = None,
13        end_on_error: bool = False,
14        broadcast_callback: (
15            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
16        ) = None,
17        prompt_injection: bool = True,
18        save_state: bool = True,
19        payload_callback: Callable[[dict[str, Any]], None] | None = None,
20    ):
21        """
22        ExecutorConfig is special configuration object designed to allow customization of the executor in the RT system.
23
24        Args:
25            timeout (float | None): The maximum number of seconds to wait for a response to your top level request. Pass None (or omit) to disable the timeout entirely.
26            end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
27            broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
28            prompt_injection (bool): If true, prompts can be injected with global context
29            save_state (bool): If true, the state of the executor will be saved to disk.
30        """
31        self.timeout = timeout
32        self.end_on_error = end_on_error
33        self.subscriber = broadcast_callback
34        self.prompt_injection = prompt_injection
35        # During test runs, disable save_state by default unless RAILTRACKS_ALLOW_PERSISTENCE is set
36        self._user_save_state = save_state
37
38        self.payload_callback = payload_callback
39
40    # this is done because if we try to lock the save_state in init
41    # later when we want to allow a few tests to actually run persistance, they wont be able to do so
42    @property
43    def save_state(self) -> bool:
44        if os.getenv("RAILTRACKS_TEST_MODE") and not os.getenv(
45            "RAILTRACKS_ALLOW_PERSISTENCE"
46        ):
47            return False
48        return self._user_save_state
49
50    def precedence_overwritten(
51        self,
52        *,
53        timeout: float | None = None,
54        end_on_error: bool | None = None,
55        subscriber: (
56            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
57        ) = None,
58        prompt_injection: bool | None = None,
59        save_state: bool | None = None,
60        payload_callback: Callable[[dict[str, Any]], None] | None = None,
61    ):
62        """
63        If any of the parameters are provided (not None), it will create a new update the current instance with the new values and return a deep copied reference to it.
64        """
65        return ExecutorConfig(
66            timeout=timeout,
67            end_on_error=end_on_error
68            if end_on_error is not None
69            else self.end_on_error,
70            broadcast_callback=subscriber
71            if subscriber is not None
72            else self.subscriber,
73            prompt_injection=prompt_injection
74            if prompt_injection is not None
75            else self.prompt_injection,
76            save_state=save_state if save_state is not None else self.save_state,
77            payload_callback=payload_callback
78            if payload_callback is not None
79            else self.payload_callback,
80        )
81
82    def __repr__(self):
83        return (
84            f"ExecutorConfig(timeout={self.timeout}, end_on_error={self.end_on_error}, "
85            f"prompt_injection={self.prompt_injection}, "
86            f"save_state={self.save_state}, payload_callback={self.payload_callback})"
87        )
ExecutorConfig( *, timeout: float | None = None, end_on_error: bool = False, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool = True, save_state: bool = True, payload_callback: Optional[Callable[[dict[str, Any]], NoneType]] = None)
 9    def __init__(
10        self,
11        *,
12        timeout: float | None = None,
13        end_on_error: bool = False,
14        broadcast_callback: (
15            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
16        ) = None,
17        prompt_injection: bool = True,
18        save_state: bool = True,
19        payload_callback: Callable[[dict[str, Any]], None] | None = None,
20    ):
21        """
22        ExecutorConfig is special configuration object designed to allow customization of the executor in the RT system.
23
24        Args:
25            timeout (float | None): The maximum number of seconds to wait for a response to your top level request. Pass None (or omit) to disable the timeout entirely.
26            end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
27            broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
28            prompt_injection (bool): If true, prompts can be injected with global context
29            save_state (bool): If true, the state of the executor will be saved to disk.
30        """
31        self.timeout = timeout
32        self.end_on_error = end_on_error
33        self.subscriber = broadcast_callback
34        self.prompt_injection = prompt_injection
35        # During test runs, disable save_state by default unless RAILTRACKS_ALLOW_PERSISTENCE is set
36        self._user_save_state = save_state
37
38        self.payload_callback = payload_callback

ExecutorConfig is a special configuration object designed to allow customization of the executor in the RT system.

Arguments:
  • timeout (float | None): The maximum number of seconds to wait for a response to your top level request. Pass None (or omit) to disable the timeout entirely.
  • end_on_error (bool): If true, the executor will stop execution when an exception is encountered.
  • broadcast_callback (Callable or Coroutine): A function or coroutine that will handle streaming messages.
  • prompt_injection (bool): If true, prompts can be injected with global context
  • save_state (bool): If true, the state of the executor will be saved to disk.
timeout
end_on_error
subscriber
prompt_injection
payload_callback
save_state: bool
42    @property
43    def save_state(self) -> bool:
44        if os.getenv("RAILTRACKS_TEST_MODE") and not os.getenv(
45            "RAILTRACKS_ALLOW_PERSISTENCE"
46        ):
47            return False
48        return self._user_save_state
def precedence_overwritten( self, *, timeout: float | None = None, end_on_error: bool | None = None, subscriber: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None, payload_callback: Optional[Callable[[dict[str, Any]], NoneType]] = None):
50    def precedence_overwritten(
51        self,
52        *,
53        timeout: float | None = None,
54        end_on_error: bool | None = None,
55        subscriber: (
56            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
57        ) = None,
58        prompt_injection: bool | None = None,
59        save_state: bool | None = None,
60        payload_callback: Callable[[dict[str, Any]], None] | None = None,
61    ):
62        """
63        If any of the parameters are provided (not None), it will create a new update the current instance with the new values and return a deep copied reference to it.
64        """
65        return ExecutorConfig(
66            timeout=timeout,
67            end_on_error=end_on_error
68            if end_on_error is not None
69            else self.end_on_error,
70            broadcast_callback=subscriber
71            if subscriber is not None
72            else self.subscriber,
73            prompt_injection=prompt_injection
74            if prompt_injection is not None
75            else self.prompt_injection,
76            save_state=save_state if save_state is not None else self.save_state,
77            payload_callback=payload_callback
78            if payload_callback is not None
79            else self.payload_callback,
80        )

If any of the parameters are provided (not None), it creates a new ExecutorConfig updated with those values and returns it; any parameters left as None are carried over from the current instance.

def set_config( *, timeout: float | None = None, end_on_error: bool | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None):
357def set_config(
358    *,
359    timeout: float | None = None,
360    end_on_error: bool | None = None,
361    broadcast_callback: (
362        Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
363    ) = None,
364    prompt_injection: bool | None = None,
365    save_state: bool | None = None,
366):
367    """
368    Sets the global configuration for the executor. This will be propagated to all new runners created after this call.
369
370    - If you call this function after the runner has been created, it will not affect the current runner.
371    - This function will only overwrite the values that are provided, leaving the rest unchanged.
372
373
374    """
375
376    if is_context_active():
377        warnings.warn(
378            "The executor config is being set after the runner has been created, this is not recommended"
379        )
380
381    config = global_executor_config.get()
382
383    new_config = config.precedence_overwritten(
384        timeout=timeout,
385        end_on_error=end_on_error,
386        subscriber=broadcast_callback,
387        prompt_injection=prompt_injection,
388        save_state=save_state,
389    )
390
391    global_executor_config.set(new_config)

Sets the global configuration for the executor. This will be propagated to all new runners created after this call.

  • If you call this function after the runner has been created, it will not affect the current runner.
  • This function will only overwrite the values that are provided, leaving the rest unchanged.
def function_node( func: Union[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]], List[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]]]], /, *, name: str | None = None, manifest: ToolManifest | None = None) -> Union[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]], List[Callable[~_P, Union[Coroutine[NoneType, NoneType, ~_TOutput], ~_TOutput]]], NoneType]:
173def function_node(
174    func: Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
175    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]],
176    /,
177    *,
178    name: str | None = None,
179    manifest: ToolManifest | None = None,
180) -> (
181    Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]
182    | List[Callable[_P, Coroutine[None, None, _TOutput] | _TOutput]]
183    | None
184):
185    """
186    Creates a new Node type from a function that can be used in `rt.call()`.
187
188    By default, it will parse the function's docstring and turn them into tool details and parameters. However, if
189    you provide custom ToolManifest it will override that logic.
190
191    WARNING: If you overriding tool parameters. It is on you to make sure they will work with your function.
192
193    NOTE: If you have already converted this function to a node this function will do nothing
194
195    Args:
196        func (Callable): The function to convert into a Node.
197        name (str, optional): Human-readable name for the node/tool.
198        manifest (ToolManifest, optional): The details you would like to override the tool with.
199    """
200
201    # handle the case where a list of functions is provided
202    if isinstance(func, list):
203        return [function_node(f, name=name, manifest=manifest) for f in func]
204
205    # check if the function has already been converted to a node
206    if hasattr(func, "node_type"):
207        warnings.warn(
208            "The provided function has already been converted to a node.",
209            UserWarning,
210        )
211        return func
212
213    # validate_function_parameters is separated out to allow for easier testing.
214    validate_function_parameters(func, manifest)
215
216    # assign the correct node class based on whether the function is async or sync
217    if asyncio.iscoroutinefunction(func):
218        node_class = AsyncDynamicFunctionNode
219    elif inspect.isfunction(func):
220        node_class = SyncDynamicFunctionNode
221    elif inspect.isbuiltin(func):
222        # builtin functions are written in C and do not have space for the addition of metadata like our node type.
223        # so instead we wrap them in a function that allows for the addition of the node type.
224        # this logic preserved details like the function name, docstring, and signature, but allows us to add the node type.
225        func = _function_preserving_metadata(func)
226        node_class = SyncDynamicFunctionNode
227    else:
228        raise NodeCreationError(
229            message=f"The provided function is not a valid coroutine or sync function it is {type(func)}.",
230            notes=[
231                "You must provide a valid function or coroutine function to make a node.",
232            ],
233        )
234
235    # build the node using the NodeBuilder
236    builder = NodeBuilder(
237        node_class,
238        name=name if name is not None else f"{func.__name__}",
239    )
240
241    builder.setup_function_node(
242        func,
243        tool_details=manifest.description if manifest is not None else None,
244        tool_params=manifest.parameters if manifest is not None else None,
245    )
246
247    completed_node_type = builder.build()
248
249    # there is some pretty scary logic here.
250    if issubclass(completed_node_type, AsyncDynamicFunctionNode):
251        setattr(func, "node_type", completed_node_type)
252        return func
253    elif issubclass(completed_node_type, SyncDynamicFunctionNode):
254        setattr(func, "node_type", completed_node_type)
255        return func
256    else:
257        raise NodeCreationError(
258            message="The provided function did not create a valid node type.",
259            notes=[
260                "Please make a github issue with the details of what went wrong.",
261            ],
262        )

Creates a new Node type from a function that can be used in rt.call().

By default, it will parse the function's docstring and turn them into tool details and parameters. However, if you provide custom ToolManifest it will override that logic.

WARNING: If you are overriding tool parameters, it is on you to make sure they will work with your function.

NOTE: If you have already converted this function to a node this function will do nothing

Arguments:
  • func (Callable): The function to convert into a Node.
  • name (str, optional): Human-readable name for the node/tool.
  • manifest (ToolManifest, optional): The details you would like to override the tool with.
def agent_node( name: str | None = None, *, rag: RagConfig | None = None, tool_nodes: Optional[Iterable[Union[Type[railtracks.nodes.nodes.Node], Callable, railtracks.built_nodes.concrete.function_base.RTFunction]]] = None, output_schema: Optional[Type[~_TBaseModel]] = None, llm: Optional[railtracks.llm.ModelBase[~_TStream]] = None, max_tool_calls: int | None = None, system_message: railtracks.llm.SystemMessage | str | None = None, manifest: ToolManifest | None = None):
def agent_node(
    name: str | None = None,
    *,
    rag: RagConfig | None = None,
    tool_nodes: Iterable[Type[Node] | Callable | RTFunction] | None = None,
    output_schema: Type[_TBaseModel] | None = None,
    llm: ModelBase[_TStream] | None = None,
    max_tool_calls: int | None = None,
    system_message: SystemMessage | str | None = None,
    manifest: ToolManifest | None = None,
):
    """
    Dynamically build an agent node from the supplied configuration.

    The concrete agent flavor is chosen from the arguments: tools present vs.
    absent crossed with structured vs. free-form output.

    Args:
        name (str | None): The name of the agent. If None, the default will be used.
        rag (RagConfig | None): If your agent is a RAG agent, pass in the vector store it is connected to.
        tool_nodes (set[Type[Node] | Callable | RTFunction] | None): If your agent is an LLM with access to tools, what does it have access to?
        output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
        llm (ModelBase): The LLM model to use. If None it will need to be passed in at instance time.
        max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
        system_message (SystemMessage | str | None): System message for the agent.
        manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.
    """
    # Normalize plain functions into node classes; anything else must already
    # be a Node subclass.
    resolved_tools: set[Type[Node]] | None = None
    if tool_nodes is not None:
        resolved_tools = set()
        for node in tool_nodes:
            if isinstance(node, FunctionType):
                resolved_tools.add(extract_node_from_function(node))
            else:
                assert issubclass(node, Node), (
                    f"Expected {node} to be a subclass of Node"
                )
                resolved_tools.add(node)

    # See issue (___) this logic should be migrated soon.
    tool_details = manifest.description if manifest is not None else None
    tool_params = manifest.parameters if manifest is not None else None

    # Keyword arguments common to every agent factory below.
    shared_kwargs = {
        "name": name,
        "llm": llm,
        "system_message": system_message,
        "tool_details": tool_details,
        "tool_params": tool_params,
    }

    if resolved_tools:
        if output_schema is not None:
            agent = structured_tool_call_llm(
                tool_nodes=resolved_tools,
                output_schema=output_schema,
                max_tool_calls=max_tool_calls,
                **shared_kwargs,
            )
        else:
            agent = tool_call_llm(
                tool_nodes=resolved_tools,
                max_tool_calls=max_tool_calls,
                **shared_kwargs,
            )
    elif output_schema is not None:
        agent = structured_llm(
            output_schema=output_schema,
            **shared_kwargs,
        )
    else:
        agent = terminal_llm(**shared_kwargs)

    if rag is not None:
        # Inject retrieved context into the message history right before each
        # invocation of the agent.
        def _inject_rag_context(node: LLMBase):
            node.message_hist = update_context(
                node.message_hist, vs=rag.vector_store, top_k=rag.top_k
            )

        agent.add_pre_invoke(_inject_rag_context)

    return agent

Dynamically creates an agent based on the provided parameters.

Arguments:
  • name (str | None): The name of the agent. If None, the default will be used.
  • rag (RagConfig | None): If your agent is a rag agent put in the vector store it is connected to.
  • tool_nodes (set[Type[Node] | Callable | RTFunction] | None): If your agent is a LLM with access to tools, what does it have access to?
  • output_schema (Type[_TBaseModel] | None): If your agent should return a structured output, what is the output_schema?
  • llm (ModelBase): The LLM model to use. If None it will need to be passed in at instance time.
  • max_tool_calls (int | None): Maximum number of tool calls allowed (if it is a ToolCall Agent).
  • system_message (SystemMessage | str | None): System message for the agent.
  • manifest (ToolManifest | None): If you want to use this as a tool in other agents you can pass in a ToolManifest.
class MCPStdioParams(mcp.client.stdio.StdioServerParameters):
class MCPStdioParams(StdioServerParameters):
    """
    Connection settings for STDIO-based MCP servers.

    Identical to the standard ``StdioServerParameters`` except for one extra
    field, ``timeout``, which callers can use to bound operations.

    Attributes:
        timeout: Maximum time to wait for operations (default: 30 seconds).
    """

    timeout: timedelta = timedelta(seconds=30)

    def as_stdio_params(self) -> StdioServerParameters:
        """
        Build a plain ``StdioServerParameters`` copy of this configuration.

        The extra ``timeout`` field is stripped, since the base class does not
        know about it.

        Returns:
            StdioServerParameters without the timeout attribute.
        """
        # NOTE(review): pydantic v1-style `.dict()`; v2 prefers `model_dump()`
        # — confirm which pydantic major version the project pins.
        return StdioServerParameters(**self.dict(exclude={"timeout"}))

Configuration parameters for STDIO-based MCP server connections.

Extends the standard StdioServerParameters with a timeout field.

Attributes:
  • timeout: Maximum time to wait for operations (default: 30 seconds)
timeout: datetime.timedelta
def as_stdio_params(self) -> mcp.client.stdio.StdioServerParameters:
31    def as_stdio_params(self) -> StdioServerParameters:
32        """
33        Convert to standard StdioServerParameters, excluding the timeout field.
34
35        Returns:
36            StdioServerParameters without the timeout attribute
37        """
38        stdio_kwargs = self.dict(exclude={"timeout"})
39        return StdioServerParameters(**stdio_kwargs)

Convert to standard StdioServerParameters, excluding the timeout field.

Returns:

StdioServerParameters without the timeout attribute

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class MCPHttpParams(pydantic.main.BaseModel):
class MCPHttpParams(BaseModel):
    """
    Configuration parameters for HTTP-based MCP server connections.

    Supports both SSE (Server-Sent Events) and streamable HTTP transports.
    The transport type is automatically determined based on the URL.

    Attributes:
        url: The MCP server URL (use /sse suffix for SSE transport)
        headers: Optional HTTP headers for authentication
        timeout: Connection timeout (default: 30 seconds)
        sse_read_timeout: SSE read timeout (default: 5 minutes)
        terminate_on_close: Whether to terminate connection on close (default: True)
    """

    # Server endpoint; per the class docstring a "/sse" suffix selects SSE transport.
    url: str
    # Extra HTTP headers, e.g. {"Authorization": "Bearer <token>"}; None sends none.
    headers: dict[str, Any] | None = None
    # How long to wait when establishing the connection.
    timeout: timedelta = timedelta(seconds=30)
    # How long an SSE stream may stay idle before a read times out.
    sse_read_timeout: timedelta = timedelta(seconds=60 * 5)
    # When True, the underlying connection is terminated when the client closes.
    terminate_on_close: bool = True

Configuration parameters for HTTP-based MCP server connections.

Supports both SSE (Server-Sent Events) and streamable HTTP transports. The transport type is automatically determined based on the URL.

Attributes:
  • url: The MCP server URL (use /sse suffix for SSE transport)
  • headers: Optional HTTP headers for authentication
  • timeout: Connection timeout (default: 30 seconds)
  • sse_read_timeout: SSE read timeout (default: 5 minutes)
  • terminate_on_close: Whether to terminate connection on close (default: True)
url: str
headers: dict[str, typing.Any] | None
timeout: datetime.timedelta
sse_read_timeout: datetime.timedelta
terminate_on_close: bool
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def connect_mcp( config: MCPStdioParams | MCPHttpParams, client_session: mcp.client.session.ClientSession | None = None, setup_timeout: float = 30) -> railtracks.rt_mcp.main.MCPServer:
 8def connect_mcp(
 9    config: MCPStdioParams | MCPHttpParams,
10    client_session: ClientSession | None = None,
11    setup_timeout: float = 30,
12) -> MCPServer:
13    """
14    Connect to an MCP server and return a server instance with available tools.
15
16    This is the primary entry point for using MCP servers in Railtracks.
17    The server will connect in the background, discover available tools,
18    and convert them to Railtracks Node classes.
19
20    The connection remains active until explicitly closed or the context exits.
21
22    Usage Examples:
23        # STDIO connection (local MCP server)
24        config = rt.MCPStdioParams(
25            command="uvx",
26            args=["mcp-server-time"]
27        )
28        server = rt.connect_mcp(config)
29
30        # HTTP connection (remote MCP server)
31        config = rt.MCPHttpParams(
32            url="https://mcp.example.com/sse",
33            headers={"Authorization": "Bearer token"}
34        )
35        server = rt.connect_mcp(config)
36
37        # Context manager (recommended)
38        with rt.connect_mcp(config) as server:
39            tools = server.tools
40            # Use tools...
41        # Automatically closed
42
43        # Access tools
44        for tool in server.tools:
45            print(f"Tool: {tool.name()}")
46            print(f"Description: {tool.tool_info().description}")
47
48    Args:
49        config: Server configuration:
50            - MCPStdioParams: For local servers via stdin/stdout
51            - MCPHttpParams: For remote servers via HTTP/SSE
52        client_session: Optional pre-configured ClientSession for advanced use cases.
53                       If not provided, a new session will be created automatically.
54        setup_timeout: Maximum seconds to wait for connection (default: 30).
55                      Increase for slow servers or complex authentication flows.
56
57    Returns:
58        MCPServer: Connected server instance with:
59            - tools: List of Node classes representing MCP tools
60            - close(): Method to explicitly close the connection
61            - Context manager support for automatic cleanup
62
63    Raises:
64        FileNotFoundError: If STDIO command not found. Verify the command is:
65                          - Installed and in your PATH
66                          - Spelled correctly (check for typos)
67                          - Executable (check permissions on Unix)
68        ConnectionError: If connection to server fails. Check:
69                        - Server URL is correct and accessible
70                        - Network connectivity and firewall settings
71                        - Authentication credentials are valid
72                        - Server is running and accepting connections
73        TimeoutError: If connection exceeds setup_timeout. Try:
74                     - Increasing setup_timeout parameter
75                     - Checking server performance/load
76                     - Verifying server is responding
77        RuntimeError: For other setup failures (e.g., protocol errors, config issues)
78
79    Note:
80        - The connection runs in a background thread for sync/async bridging
81        - Tools are cached after first retrieval for performance
82        - Always close() the server when done or use context manager
83        - Jupyter compatibility patches are applied automatically
84    """
85    # Apply Jupyter compatibility patches if needed
86    apply_patches()
87
88    return MCPServer(
89        config=config, client_session=client_session, setup_timeout=setup_timeout
90    )

Connect to an MCP server and return a server instance with available tools.

This is the primary entry point for using MCP servers in Railtracks. The server will connect in the background, discover available tools, and convert them to Railtracks Node classes.

The connection remains active until explicitly closed or the context exits.

Usage Examples:

STDIO connection (local MCP server)

config = rt.MCPStdioParams( command="uvx", args=["mcp-server-time"] ) server = rt.connect_mcp(config)

HTTP connection (remote MCP server)

config = rt.MCPHttpParams( url="https://mcp.example.com/sse", headers={"Authorization": "Bearer token"} ) server = rt.connect_mcp(config)

Context manager (recommended)

with rt.connect_mcp(config) as server: tools = server.tools # Use tools...

Automatically closed

Access tools

for tool in server.tools: print(f"Tool: {tool.name()}") print(f"Description: {tool.tool_info().description}")

Arguments:
  • config: Server configuration:
    • MCPStdioParams: For local servers via stdin/stdout
    • MCPHttpParams: For remote servers via HTTP/SSE
  • client_session: Optional pre-configured ClientSession for advanced use cases. If not provided, a new session will be created automatically.
  • setup_timeout: Maximum seconds to wait for connection (default: 30). Increase for slow servers or complex authentication flows.
Returns:

MCPServer: Connected server instance with: - tools: List of Node classes representing MCP tools - close(): Method to explicitly close the connection - Context manager support for automatic cleanup

Raises:
  • FileNotFoundError: If STDIO command not found. Verify the command is:
    • Installed and in your PATH
    • Spelled correctly (check for typos)
    • Executable (check permissions on Unix)
  • ConnectionError: If connection to server fails. Check:
    • Server URL is correct and accessible
    • Network connectivity and firewall settings
    • Authentication credentials are valid
    • Server is running and accepting connections
  • TimeoutError: If connection exceeds setup_timeout. Try:
    • Increasing setup_timeout parameter
    • Checking server performance/load
    • Verifying server is responding
  • RuntimeError: For other setup failures (e.g., protocol errors, config issues)
Note:
  • The connection runs in a background thread for sync/async bridging
  • Tools are cached after first retrieval for performance
  • Always close() the server when done or use context manager
  • Jupyter compatibility patches are applied automatically
def create_mcp_server( nodes: List[railtracks.nodes.nodes.Node | railtracks.built_nodes.concrete.function_base.RTFunction], server_name: str = 'MCP Server', fastmcp: mcp.server.fastmcp.server.FastMCP | None = None):
 86def create_mcp_server(
 87    nodes: List[Node | RTFunction],
 88    server_name: str = "MCP Server",
 89    fastmcp: FastMCP | None = None,
 90):
 91    """
 92    Create a FastMCP server that can be used to run nodes as MCP tools.
 93
 94    Args:
 95        nodes: List of Node classes to be registered as tools with the MCP server.
 96        server_name: Name of the MCP server instance.
 97        fastmcp: Optional FastMCP instance to use instead of creating a new one.
 98
 99    Returns:
100        A FastMCP server instance.
101    """
102    if fastmcp is not None:
103        if not isinstance(fastmcp, FastMCP):
104            raise ValueError("Provided fastmcp must be an instance of FastMCP.")
105        mcp = fastmcp
106    else:
107        mcp = FastMCP(server_name)
108
109    for node in [n if not hasattr(n, "node_type") else n.node_type for n in nodes]:
110        node_info = node.tool_info()
111        func = _create_tool_function(node, node_info)
112
113        mcp._tool_manager._tools[node_info.name] = MCPTool(
114            fn=func,
115            name=node_info.name,
116            description=node_info.detail,
117            parameters=(
118                _parameters_to_json_schema(node_info.parameters)
119                if node_info.parameters is not None
120                else {}
121            ),
122            fn_metadata=func_metadata(func, []),
123            is_async=True,
124            context_kwarg=None,
125            annotations=None,
126        )  # Register the node as a tool
127
128    return mcp

Create a FastMCP server that can be used to run nodes as MCP tools.

Arguments:
  • nodes: List of Node classes to be registered as tools with the MCP server.
  • server_name: Name of the MCP server instance.
  • fastmcp: Optional FastMCP instance to use instead of creating a new one.
Returns:

A FastMCP server instance.

class ToolManifest:
 7class ToolManifest:
 8    """
 9    Creates a manifest for a tool, which includes its description and parameters.
10
11    Args:
12        description (str): A description of the tool.
13        parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no paramerters.
14    """
15
16    def __init__(
17        self,
18        description: str,
19        parameters: Iterable[Parameter] | None = None,
20    ):
21        self.description = description
22        self.parameters: List[Parameter] = (
23            list(parameters) if parameters is not None else []
24        )

Creates a manifest for a tool, which includes its description and parameters.

Arguments:
  • description (str): A description of the tool.
  • parameters (Iterable[Parameter] | None): An iterable of parameters for the tool. If None, there are no parameters.
ToolManifest( description: str, parameters: Optional[Iterable[railtracks.llm.Parameter]] = None)
16    def __init__(
17        self,
18        description: str,
19        parameters: Iterable[Parameter] | None = None,
20    ):
21        self.description = description
22        self.parameters: List[Parameter] = (
23            list(parameters) if parameters is not None else []
24        )
description
parameters: List[railtracks.llm.Parameter]
def session_id():
def session_id():
    """
    Return the current session ID, or None when no session context exists.
    """
    try:
        current = get_session_id()
    except ContextError:
        # No active session context registered for this call site.
        return None
    return current

Gets the current session ID if it exists, otherwise returns None.

class RagConfig:
 8class RagConfig:
 9    """
10    Configuration object for Retrieval-Augmented Generation (RAG).
11    """
12
13    def __init__(self, vector_store: VectorStore, top_k: int = 3) -> None:
14        self.vector_store = vector_store
15        self.top_k = top_k

Configuration object for Retrieval-Augmented Generation (RAG).

RagConfig( vector_store: railtracks.vector_stores.vector_store_base.VectorStore, top_k: int = 3)
13    def __init__(self, vector_store: VectorStore, top_k: int = 3) -> None:
14        self.vector_store = vector_store
15        self.top_k = top_k
vector_store
top_k
class Flow(typing.Generic[~_P, ~_TOutput]):
 23class Flow(Generic[_P, _TOutput]):
 24    """
 25    Initializes a Flow object with a provided entry point and a unique name.
 26
 27    A flow object is the configuration where you can run your agent with different input arguments.
 28
 29    Args:
 30        name (str): A unique name for the flow. This is used for logging and state management.
 31        entry_point (Callable | RTSyncFunction | RTAsyncFunction): The starting point of your flow.
 32        context (dict[str, Any], optional): Context to be passed to all instantiations (or runs) of this flow. Note that the context can be overridden at invocation time.
 33        timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
 34        end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
 35        broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
 36        prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
 37        save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the `.railtracks/data/sessions/` directory.
 38        payload_callback (Callable[[dict[str, Any]], None], optional): A callback function that will run upon completion of the flow with the final payload as an argument.
 39    """
 40
 41    def __init__(
 42        self,
 43        name: str,
 44        entry_point: (
 45            Callable[_P, Node[_TOutput]]
 46            | RTSyncFunction[_P, _TOutput]
 47            | RTAsyncFunction[_P, _TOutput]
 48        ),
 49        *,
 50        context: dict[str, Any] | None = None,
 51        timeout: float | None = None,
 52        end_on_error: bool | None = None,
 53        broadcast_callback: (
 54            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
 55        ) = None,
 56        prompt_injection: bool | None = None,
 57        save_state: bool | None = None,
 58        payload_callback: Callable[[dict[str, Any]], None] | None = None,
 59    ) -> None:
 60        self.entry_point: Callable[_P, Node[_TOutput]]
 61
 62        if hasattr(entry_point, "node_type"):
 63            self.entry_point = entry_point.node_type
 64        else:
 65            self.entry_point = entry_point
 66
 67        self.name = name
 68        self._context: dict[str, Any] = context or {}
 69        self._timeout = timeout
 70        self._end_on_error = end_on_error
 71        self._broadcast_callback = broadcast_callback
 72        self._prompt_injection = prompt_injection
 73        self._save_state = save_state
 74        self._payload_callback = payload_callback
 75
 76    def update_context(self, context: dict[str, Any]) -> Flow[_P, _TOutput]:
 77        """
 78        Creates a new Flow with the updated context. Note this will include the previous context values.
 79        """
 80        new_obj = deepcopy(self)
 81        new_obj._context.update(context)
 82        return new_obj
 83
 84    async def ainvoke(self, *args: _P.args, **kwargs: _P.kwargs) -> _TOutput:
 85        with Session(
 86            context=deepcopy(self._context),
 87            flow_name=self.name,
 88            flow_id=self.equality_hash(),
 89            name=None,
 90            timeout=self._timeout,
 91            end_on_error=self._end_on_error,
 92            broadcast_callback=self._broadcast_callback,
 93            prompt_injection=self._prompt_injection,
 94            save_state=self._save_state,
 95            payload_callback=self._payload_callback,
 96        ):
 97            result = await call(self.entry_point, *args, **kwargs)
 98
 99        return result
100
101    def invoke(self, *args: _P.args, **kwargs: _P.kwargs) -> _TOutput:
102        try:
103            return asyncio.run(self.ainvoke(*args, **kwargs))
104
105        except RuntimeError:
106            raise RuntimeError(
107                "Cannot invoke flow synchronously within an active event loop. Use 'ainvoke' instead."
108            )
109
110    def equality_hash(self) -> str:
111        """
112        Generates a hash based on the flow's configuration for equality checks.
113        """
114        config_string = json.dumps(self._get_hash_content(), sort_keys=True)
115        return hashlib.sha256(config_string.encode()).hexdigest()
116
117    def _get_hash_content(self) -> dict:
118        return {
119            "name": self.name,
120        }

Initializes a Flow object with a provided entry point and a unique name.

A flow object is the configuration where you can run your agent with different input arguments.

Arguments:
  • name (str): A unique name for the flow. This is used for logging and state management.
  • entry_point (Callable | RTSyncFunction | RTAsyncFunction): The starting point of your flow.
  • context (dict[str, Any], optional): Context to be passed to all instantiations (or runs) of this flow. Note that the context can be overridden at invocation time.
  • timeout (float, optional): The maximum number of seconds to wait for a response to your top-level request.
  • end_on_error (bool, optional): If True, the execution will stop when an exception is encountered.
  • broadcast_callback (Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None, optional): A callback function that will be called with the broadcast messages.
  • prompt_injection (bool, optional): If True, the prompt will be automatically injected from context variables.
  • save_state (bool, optional): If True, the state of the execution will be saved to a file at the end of the run in the .railtracks/data/sessions/ directory.
  • payload_callback (Callable[[dict[str, Any]], None], optional): A callback function that will run upon completion of the flow with the final payload as an argument.
Flow( name: str, entry_point: Union[Callable[~_P, railtracks.nodes.nodes.Node[~_TOutput]], railtracks.built_nodes.concrete.function_base.RTSyncFunction[~_P, ~_TOutput], railtracks.built_nodes.concrete.function_base.RTAsyncFunction[~_P, ~_TOutput]], *, context: dict[str, typing.Any] | None = None, timeout: float | None = None, end_on_error: bool | None = None, broadcast_callback: Union[Callable[[str], NoneType], Callable[[str], Coroutine[NoneType, NoneType, NoneType]], NoneType] = None, prompt_injection: bool | None = None, save_state: bool | None = None, payload_callback: Optional[Callable[[dict[str, Any]], NoneType]] = None)
41    def __init__(
42        self,
43        name: str,
44        entry_point: (
45            Callable[_P, Node[_TOutput]]
46            | RTSyncFunction[_P, _TOutput]
47            | RTAsyncFunction[_P, _TOutput]
48        ),
49        *,
50        context: dict[str, Any] | None = None,
51        timeout: float | None = None,
52        end_on_error: bool | None = None,
53        broadcast_callback: (
54            Callable[[str], None] | Callable[[str], Coroutine[None, None, None]] | None
55        ) = None,
56        prompt_injection: bool | None = None,
57        save_state: bool | None = None,
58        payload_callback: Callable[[dict[str, Any]], None] | None = None,
59    ) -> None:
60        self.entry_point: Callable[_P, Node[_TOutput]]
61
62        if hasattr(entry_point, "node_type"):
63            self.entry_point = entry_point.node_type
64        else:
65            self.entry_point = entry_point
66
67        self.name = name
68        self._context: dict[str, Any] = context or {}
69        self._timeout = timeout
70        self._end_on_error = end_on_error
71        self._broadcast_callback = broadcast_callback
72        self._prompt_injection = prompt_injection
73        self._save_state = save_state
74        self._payload_callback = payload_callback
entry_point: Callable[~_P, railtracks.nodes.nodes.Node[~_TOutput]]
name
def update_context( self, context: dict[str, typing.Any]) -> Flow[~_P, ~_TOutput]:
76    def update_context(self, context: dict[str, Any]) -> Flow[_P, _TOutput]:
77        """
78        Creates a new Flow with the updated context. Note this will include the previous context values.
79        """
80        new_obj = deepcopy(self)
81        new_obj._context.update(context)
82        return new_obj

Creates a new Flow with the updated context. Note this will include the previous context values.

async def ainvoke(self, *args: _P.args, **kwargs: _P.kwargs) -> ~_TOutput:
84    async def ainvoke(self, *args: _P.args, **kwargs: _P.kwargs) -> _TOutput:
85        with Session(
86            context=deepcopy(self._context),
87            flow_name=self.name,
88            flow_id=self.equality_hash(),
89            name=None,
90            timeout=self._timeout,
91            end_on_error=self._end_on_error,
92            broadcast_callback=self._broadcast_callback,
93            prompt_injection=self._prompt_injection,
94            save_state=self._save_state,
95            payload_callback=self._payload_callback,
96        ):
97            result = await call(self.entry_point, *args, **kwargs)
98
99        return result
def invoke(self, *args: _P.args, **kwargs: _P.kwargs) -> ~_TOutput:
101    def invoke(self, *args: _P.args, **kwargs: _P.kwargs) -> _TOutput:
102        try:
103            return asyncio.run(self.ainvoke(*args, **kwargs))
104
105        except RuntimeError:
106            raise RuntimeError(
107                "Cannot invoke flow synchronously within an active event loop. Use 'ainvoke' instead."
108            )
def equality_hash(self) -> str:
110    def equality_hash(self) -> str:
111        """
112        Generates a hash based on the flow's configuration for equality checks.
113        """
114        config_string = json.dumps(self._get_hash_content(), sort_keys=True)
115        return hashlib.sha256(config_string.encode()).hexdigest()

Generates a hash based on the flow's configuration for equality checks.

def enable_logging( level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NONE'] = 'INFO', log_file: str | os.PathLike | None = None) -> None:
def enable_logging(
    level: AllowableLogLevels = "INFO",
    log_file: str | os.PathLike | None = None,
) -> None:
    """
    Explicitly turn on Railtracks logging (the library never does this itself).

    Call this from your application's entry point (CLI, main.py, server
    startup). Console output is configured — plus an optional log file — with
    a ThreadAwareFilter for per-thread level control. When an argument is
    None, the RT_LOG_LEVEL / RT_LOG_FILE environment variables are consulted
    instead.

    Args:
        level: Logging level (default "INFO"). Overridden by RT_LOG_LEVEL when None.
        log_file: Optional path for a log file. Overridden by RT_LOG_FILE when None.
    """
    initialize_module_logging(level=level, log_file=log_file)

Opt-in helper to enable Railtracks logging. Call this explicitly from your application entry point (CLI, main.py, server startup); the library never calls it automatically.

Uses the given level and log_file; when None, reads RT_LOG_LEVEL and RT_LOG_FILE from the environment. Sets up console output (and optional file) with a ThreadAwareFilter for per-thread level control.

Arguments:
  • level: Logging level (default "INFO"). Overridden by RT_LOG_LEVEL when None.
  • log_file: Optional path for a log file. Overridden by RT_LOG_FILE when None.