Skip to content

Session Management

Warning

This state management approach was deprecated in a recent release. Please see Flows

Sessions in Railtracks manage the execution environment for your flows. The recommended approach is using the @rt.session decorator for clean, automatic session management.

The @rt.session Decorator

The decorator automatically wraps your top level async functions with a Railtracks session:

@rt.function_node
async def greet(name: str) -> str:
    return f"Hello, {name}!"


flow = rt.Flow("greet-flow", entry_point=greet)
result = flow.invoke(name="Alice")
print(result)  # "Hello, Alice!"

Configuring Your Session

The decorator supports all session configuration options:

@rt.function_node
async def greet_multiple(names: list[str]):
    results = []
    for name in names:
        result = await rt.call(greet, name=name)
        results.append(result)
    return results

greeting_multiple_flow = rt.Flow(
    "greet-multiple-flow",
    entry_point=greet_multiple,
    timeout=30,  # 30 second timeout
    context={"user_id": "123"},  # Global context variables
    save_state=True,  # Save execution state to file
)

multiple_greeting_response = greeting_multiple_flow.invoke(names=["Bob", "Charlie"])
print(multiple_greeting_response)  # ['Hello, Bob!', 'Hello, Charlie!']

Multiple Workflows

Each decorated function gets its own isolated session:

@rt.function_node
async def farewell(name: str) -> str:
    return f"Bye, {name}!"

@rt.function_node
async def conditional_greet():
    if rt.context.get("action") == "greet":
        return await rt.call(greet, rt.context.get("name"))

@rt.function_node
async def conditional_farewell():
    if rt.context.get("action") == "farewell":
        return await rt.call(farewell, rt.context.get("name"))

# Create independent flows
first_flow = rt.Flow(
    "greet-flow",
    entry_point=conditional_greet,
    context={"action": "greet", "name": "Diana"},
)

second_flow = rt.Flow(
    "farewell-flow",
    entry_point=conditional_farewell,
    context={"action": "farewell", "name": "Robert"},
)

# Run independently
result1 = first_flow.invoke()
result2 = second_flow.invoke()
print(result1)  # "Hello, Diana!"
print(result2)  # "Bye, Robert!"

Important Notes

  • Async Only: The @rt.session decorator only works with async functions. Using it on sync functions raises a TypeError
  • Automatic Cleanup: Sessions automatically clean up resources when functions complete
  • Unique Identifiers: Each session gets a unique identifier for tracking and debugging. If you do use the same identifier in different flows, their unique saved states will overwrite with the warning: RT.Session : WARNING - File .railtracks/my-unique-run.json already exists, overwriting....
Session Context Manager

When to Use Context Managers

For more complex scenarios or when you need fine-grained control, use the context manager approach:

# Flow configuration approach (replaces session context manager)
second_flow = rt.Flow(
    "greet-multiple-flow",
    entry_point=greet_multiple,
    timeout=30,  # 30 second timeout
    context={"user_id": "123"},  # Global context variables
    save_state=True,  # Save execution state to file
)

result = second_flow.invoke(names =["Bob", "Charlie"])
print(result)  # ['Hello, Bob!', 'Hello, Charlie!']
More Examples

Error Handling

sample_node_flow = rt.Flow("sample-flow", entry_point=sample_node, end_on_error=True)
try:
    result = sample_node_flow.invoke()
except Exception as e:
    print(f"Flow failed: {e}")

API Workflows

sample_node_flow = rt.Flow("api-flow", entry_point=sample_node, context={"api_key": "secret", "region": "us-west"})
# Context variables are available to all nodes
result = sample_node_flow.invoke()

Tracked Execution

sample_node_flow = rt.Flow("daily-report-v1", entry_point=sample_node, save_state=True)
# Execution state saved to .railtracks/daily-report-v1.json
result = sample_node_flow.invoke()