Session Management

Sessions in RailTracks manage the execution environment for your flows. The recommended approach is using the @rt.session decorator for clean, automatic session management.

The @rt.session Decorator

The decorator automatically wraps your top-level async functions in a RailTracks session. A decorated function returns a tuple of its result and the session object:

import asyncio

import railtracks as rt


@rt.function_node
async def greet(name: str) -> str:
    return f"Hello, {name}!"


@rt.session()
async def empty_workflow():
    result = await rt.call(greet, name="Alice")
    return result


# Run your workflow
result, session = asyncio.run(empty_workflow())
print(result)  # "Hello, Alice!"
print(f"Session ID: {session._identifier}")

Configuring Your Session

The decorator supports all session configuration options:

@rt.session(
    timeout=30,  # 30 second timeout
    context={"user_id": "123"},  # Global context variables
    logging_setting="QUIET",  # Quieter log output
    save_state=True,  # Save execution state to file
    name="my-unique-run",  # Custom session name
)
async def configured_workflow():
    result1 = await rt.call(greet, name="Bob")
    result2 = await rt.call(greet, name="Charlie")
    return [result1, result2]


result, session = asyncio.run(configured_workflow())
print(result)  # ['Hello, Bob!', 'Hello, Charlie!']
print(f"Session ID: {session._identifier}")

Multiple Workflows

Each decorated function gets its own isolated session:

@rt.function_node
async def farewell(name: str) -> str:
    return f"Bye, {name}!"


@rt.session(context={"action": "greet", "name": "Diana"}, logging_setting="QUIET")
async def first_workflow():
    if rt.context.get("action") == "greet":
        return await rt.call(greet, rt.context.get("name"))


@rt.session(context={"action": "farewell", "name": "Robert"}, logging_setting="QUIET")
async def second_workflow():
    if rt.context.get("action") == "farewell":
        return await rt.call(farewell, rt.context.get("name"))


# Run independently
result1, session1 = asyncio.run(first_workflow())
result2, _ = asyncio.run(second_workflow())  # Use _ to discard the session object
print(result1)  # "Hello, Diana!"
print(result2)  # "Bye, Robert!"
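
To illustrate what "isolated" means here, the following is a minimal sketch built on Python's standard `contextvars` module. It is not RailTracks code, and RailTracks may implement its context differently; the names `_session_context`, `get`, and `run_with_context` are hypothetical and exist only for this example.

```python
import asyncio
import contextvars

# Hypothetical per-run store; RailTracks' own mechanism may differ.
_session_context = contextvars.ContextVar("session_context")


def get(key, default=None):
    """Look up a key in the context of the current run, if any."""
    return _session_context.get({}).get(key, default)


async def run_with_context(values, coro_fn):
    """Run coro_fn with its own private copy of the context values."""
    token = _session_context.set(dict(values))
    try:
        return await coro_fn()
    finally:
        # Restore the previous state so nothing leaks into later runs.
        _session_context.reset(token)


async def describe():
    return f"{get('action')}:{get('name')}"


first = asyncio.run(run_with_context({"action": "greet", "name": "Diana"}, describe))
second = asyncio.run(run_with_context({"action": "farewell", "name": "Robert"}, describe))
leaked = get("name")  # Outside any run, no context values are visible
```

Each run sees only the values it was started with, and nothing remains visible once the run has finished.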

Important Notes

  • Async Only: The @rt.session decorator only works with async functions. Using it on a sync function raises a TypeError.
  • Automatic Cleanup: Sessions automatically clean up their resources when the decorated function completes.
  • Unique Identifiers: Each session gets a unique identifier for tracking and debugging. If you reuse the same identifier across different flows, the later run overwrites the earlier saved state and logs the warning: RT.Session : WARNING - File .railtracks/my-unique-run.json already exists, overwriting...
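
The first two notes can be pictured with a small, self-contained sketch. This is not the RailTracks implementation: `ToySession` and `toy_session` are hypothetical stand-ins, and raising the TypeError at decoration time (rather than at call time) is an assumption made for the example.

```python
import asyncio
import functools
import inspect
import uuid


class ToySession:
    """Hypothetical stand-in for a session object; not the RailTracks class."""

    def __init__(self, name=None):
        # Fall back to a random identifier when no name is supplied.
        self.identifier = name or uuid.uuid4().hex
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Cleanup runs whether the body returned or raised.
        self.closed = True
        return False  # Do not swallow exceptions


def toy_session(name=None):
    """Hypothetical decorator factory mirroring the notes above."""

    def decorator(func):
        # Assumption: sync functions are rejected at decoration time.
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"{func.__name__} must be an async function")

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with ToySession(name) as session:
                result = await func(*args, **kwargs)
            # Hand back the result together with the session.
            return result, session

        return wrapper

    return decorator


@toy_session(name="demo")
async def hello():
    return "Hello, Alice!"


result, session = asyncio.run(hello())
print(result, session.identifier, session.closed)
```

Because the wrapper opens the session in a `with` block, cleanup happens even when the wrapped function raises, which is the property the "Automatic Cleanup" note describes.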

Session Context Manager

When to Use Context Managers

For more complex scenarios or when you need fine-grained control, use the context manager approach:

async def context_workflow():
    with rt.Session(
        timeout=30,  # 30 second timeout
        context={"user_id": "123"},  # Global context variables
        logging_setting="QUIET",  # Quieter log output
        save_state=True,  # Save execution state to file
        name="my-unique-run",  # Custom session name
    ):
        result1 = await rt.call(greet, name="Bob")
        result2 = await rt.call(greet, name="Charlie")
        return [result1, result2]


result = asyncio.run(context_workflow())
print(result)  # ['Hello, Bob!', 'Hello, Charlie!']

More Examples

Error Handling

@rt.session(end_on_error=True)
async def safe_workflow():
    try:
        return await rt.call(sample_node)
    except Exception as e:
        print(f"Workflow failed: {e}")
        return None

API Workflows

@rt.session(context={"api_key": "secret", "region": "us-west"})
async def api_workflow():
    # Context variables are available to all nodes
    result = await rt.call(sample_node)
    return result

Tracked Execution

@rt.session(save_state=True, name="daily-report-v1")
async def daily_report():
    # Execution state saved to .railtracks/daily-report-v1.json
    return await rt.call(sample_node)
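
Once a run has saved its state, the file can be inspected with the standard library alone. The sketch below assumes only what the comment above states, namely that the state is written as JSON to .railtracks/<name>.json. The helper `load_saved_state` is hypothetical, and the layout of the JSON is not documented here, so the example only lists top-level keys.

```python
import json
from pathlib import Path


def load_saved_state(name, base_dir=".railtracks"):
    """Return the parsed state for a session name, or None if no file exists."""
    path = Path(base_dir) / f"{name}.json"
    if not path.exists():
        return None
    with path.open(encoding="utf-8") as f:
        return json.load(f)


state = load_saved_state("daily-report-v1")
if state is None:
    print("No saved state found yet")
elif isinstance(state, dict):
    # The schema is not assumed; just show what is there.
    print(sorted(state.keys()))
```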