Extending the HIL Abstract Class
The Human-in-the-Loop (HIL) interface allows you to create custom communication channels between your Railtracks agents and users. This guide shows you how to implement your own HIL interface by extending the HIL
abstract class.
Overview
The HIL
abstract class defines a contract for bidirectional communication with users. Any implementation must provide four key methods:
connect()
- Initialize the communication channeldisconnect()
- Clean up resourcessend_message()
- Send messages to the userreceive_message()
- Receive input from the user
The HIL Interface: human_in_the_loop.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class HILMessage:
content: str
metadata: Dict[str, Any] | None = None
class HIL(ABC):
@abstractmethod
async def connect(self) -> None:
"""
Creates or initializes the user interface component.
"""
pass
@abstractmethod
async def disconnect(self) -> None:
"""
Disconnects the user interface component.
"""
pass
@abstractmethod
async def send_message(
self, content: HILMessage, timeout: float | None = None
) -> bool:
"""
HIL uses this function to send a message to the user through the interface.
Args:
content: The message content to send.
timeout: The maximum time in seconds to wait for the message to be sent.
Returns:
True if the message was sent successfully, False otherwise.
"""
pass
@abstractmethod
async def receive_message(self, timeout: float | None = None) -> HILMessage | None:
"""
HIL uses this function to wait for the user to provide input.
This method should block until input is received or the timeout is reached.
Args:
timeout: The maximum time in seconds to wait for input.
Returns:
The user input if received within the timeout period, None otherwise.
"""
pass
Implementation Guide
Basic Structure
Create a class that inherits from HIL
and initialize the necessary state for your communication channel. At minimum, you'll need to track connection status and set up mechanisms for bidirectional communication (such as queues or event handlers).
Below are some of our suggestions for such implementation, however, your way of defining it is completely up to you and your system design choices.
Suggested Steps
1. Implement connect()
The connect()
method initializes all resources needed for communication with users. This is where you:
- Start any servers or services (web servers, WebSocket connections, messaging clients)
- Initialize communication channels
- Set up authentication or session management if needed
- Update the connection state to indicate the channel is ready
Key considerations:
- Track connection state explicitly
- Raise appropriate exceptions if initialization fails
- Make the method safe to call multiple times if possible
2. Implement disconnect()
The disconnect()
method performs cleanup of all resources. This should:
- Update connection state immediately
- Close servers, connections, or file handles
- Cancel any running background tasks
- Clean up gracefully even if resources weren't fully initialized
Key considerations:
- Set connection state to
False
at the start - Don't raise exceptions during cleanup - log errors instead
- Make it safe to call multiple times
3. Implement send_message()
This method sends messages from your agent to the user through your communication channel. It should:
- Verify the connection is active before attempting to send
- Format the message appropriately for your channel
- Transmit the message through your communication mechanism
- Return
True
on success,False
on any failure - Respect the timeout parameter if provided
Key considerations:
- Always check connection state first
- Return
False
rather than raising exceptions on failure - Handle timeouts and queue full conditions gracefully
- Log warnings or errors for debugging
4. Implement receive_message()
This method waits for and receives input from the user. It should:
- Verify the connection is active
- Wait for user input through your communication channel
- Handle shutdown events to allow clean termination
- Return a
HILMessage
when input is received - Return
None
on timeout, disconnection, or shutdown
Key considerations:
- Return
None
for timeout, disconnection, or shutdown scenarios - Handle multiple concurrent events (input arrival and shutdown signals)
- Always cancel pending tasks to prevent resource leaks
- Respect timeout parameters and handle timeout exceptions
Reference Implementation
For a complete example, see the ChatUI
class in local_chat_ui.py
The ChatUI
implementation demonstrates:
- FastAPI server with SSE for real-time updates
- Proper queue management with size limits
- Clean shutdown handling
- Static file serving for the UI
- Tool invocation updates (additional feature)
- Port availability checking
- Browser auto-opening
Common Pitfalls
1. Blocking Operations
Don't block the event loop:
async def receive_message(self, timeout=None):
return input("Enter message: ") # WRONG: Blocks event loop
Use asyncio.to_thread for blocking I/O:
async def receive_message(self, timeout=None):
return await asyncio.to_thread(input, "Enter message: ")
2. Not Handling Disconnection
Don't forget to check connection state:
async def send_message(self, content, timeout=None):
await self.queue.put(content) # May fail if disconnected
return True
Always check first:
async def send_message(self, content, timeout=None):
if not self.is_connected:
return False
await self.queue.put(content)
return True
3. Not Canceling Tasks
Don't leave tasks running:
async def receive_message(self, timeout=None):
task1 = asyncio.create_task(self.queue.get())
task2 = asyncio.create_task(self.event.wait())
done, pending = await asyncio.wait([task1, task2], ...)
return done.pop().result() # Pending tasks still running!
Always cancel pending tasks:
Last step: Updating the interactive
method
Currently, the interactive
method only supports ChatUI
implementation, but you could easily modify it, append to it, or completely write new logic to work with your specific child class of HIL
.
if not issubclass(node, LLMBase):
raise ValueError(
"Interactive sessions only support nodes that are children of LLMBase."
)
response = None
try:
logger.info("Connecting with Local Chat Session")
chat_ui = ChatUI(**chat_ui_kwargs)
await chat_ui.connect()
response = await _chat_ui_interactive(
chat_ui,
node,
initial_message_to_user,
initial_message_to_agent,
turns,
*args,
**kwargs,
)
except Exception as e:
logger.error(f"Error during interactive session: {e}")
finally:
return response # type: ignore
Share your work
We're excited to see what implemenations you come up with and welcome incorporating your suggested changes or new implementations to the framework!
Next Steps
- See Local Chat UI for documentation on using the built-in ChatUI
- Check the Human-in-the-Loop Overview for integration patterns