Skip to content

Utils

This module provides a FastAPI server for managing commands and replies in a distributed system.

ATTRIBUTE DESCRIPTION
CONNECTION_ERRORS

Tuple of exceptions that indicate connection errors.

TYPE: tuple

CLASS DESCRIPTION
FastAPIWorkerClient

Client for managing worker connections to the FastAPI server.

FastAPIUserClient

Client for managing user connections to the FastAPI server.

FUNCTION DESCRIPTION
create_fastapi_user

Create a FastAPI client instance for user interaction.

create_fastapi_worker

Create a FastAPI client instance for worker interaction.

Documentation last updated: 2025-06-11

FastAPIUserClient

FastAPIUserClient(host: str, port: int = 8000)

FastAPIUserClient is a singleton class that manages user connections to a FastAPI server. It allows users to join a hub, send commands, and receive replies from the hub. It maintains a registry of users and their associated request IDs, allowing for communication between users and the hub.

ATTRIBUTE DESCRIPTION
url

The URL of the FastAPI server.

TYPE: str

users

A dictionary mapping user addresses to user controllers.

TYPE: dict[str, Controller]

request_ids

A dictionary mapping request IDs to user controllers.

TYPE: dict[str, Controller]

METHOD DESCRIPTION
join_hub

Join a hub with the user controller.

send_command

Send a command to the hub.

get_reply

Get a reply from the hub based on a request ID.

Initialize the FastAPIUserClient with the host and port.

PARAMETER DESCRIPTION
host

The host address for the FastAPI server.

TYPE: str

port

The port number for the FastAPI server, defaults to 8000.

TYPE: int DEFAULT: 8000

get_reply staticmethod

get_reply(
    request_id: str,
    url: str,
    terminate: Event | None = None,
) -> dict[str, Any]

Get a reply from the hub.

PARAMETER DESCRIPTION
request_id

The ID of the request to get the reply for.

TYPE: str

url

The URL of the FastAPI server.

TYPE: str

terminate

An event to signal termination, defaults to None.

TYPE: Event | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The reply from the hub.

join_hub

join_hub(user: Controller) -> dict[str, Any]

Join a hub.

PARAMETER DESCRIPTION
user

The user controller to join the hub.

TYPE: Controller

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The registry of the hub.

send_command staticmethod

send_command(
    command: str | bytes,
    url: str,
    request_ids: dict[str, Controller],
    users: dict[str, Controller],
) -> dict[str, Any]

Send a command to the hub.

PARAMETER DESCRIPTION
command

The command to send.

TYPE: str | bytes

url

The URL of the FastAPI server.

TYPE: str

request_ids

A dictionary mapping request IDs to users.

TYPE: dict[str, Controller]

users

A dictionary mapping user addresses to user controllers.

TYPE: dict[str, Controller]

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The request ID returned by the hub.

FastAPIWorkerClient

FastAPIWorkerClient(host: str, port: int = 8000)

FastAPIWorkerClient is a singleton class that manages worker connections to a FastAPI server. It allows workers to register with the hub, send commands, and receive replies from the hub. It maintains a registry of workers and their associated terminate and pause events, allowing for communication between workers and the hub.

ATTRIBUTE DESCRIPTION
url

The URL of the FastAPI server.

TYPE: str

workers

A dictionary mapping worker addresses to worker controllers.

TYPE: dict[str, Controller]

terminate_events

A dictionary mapping worker addresses to terminate events.

TYPE: dict[str, Event]

pause_events

A dictionary mapping worker addresses to pause events.

TYPE: dict[str, Event]

METHOD DESCRIPTION
update_registry

Register a worker with the hub.

get_command

Get a command from the hub for a specific worker.

send_reply

Send a reply to the hub.

create_listen_loop

Create a loop for the worker to listen for commands from the hub.

Initialize the FastAPIWorkerClient with the host and port.

PARAMETER DESCRIPTION
host

The host address for the FastAPI server.

TYPE: str

port

The port number for the FastAPI server, defaults to 8000.

TYPE: int DEFAULT: 8000

create_listen_loop staticmethod

create_listen_loop(
    worker: Controller,
    sender: str | None = None,
    terminate: Event | None = None,
    pause: Event | None = None,
) -> Callable

Create a loop for the worker to listen for commands from the hub.

PARAMETER DESCRIPTION
worker

The worker controller to listen for commands.

TYPE: Controller

sender

The address of the sender, defaults to None.

TYPE: str | None DEFAULT: None

terminate

An event to signal termination, defaults to None.

TYPE: Event | None DEFAULT: None

pause

An event to signal pause, defaults to None.

TYPE: Event | None DEFAULT: None

RETURNS DESCRIPTION
Callable

A function that runs the loop for the worker.

TYPE: Callable

get_command staticmethod

get_command(
    target: str, url: str, terminate: Event | None = None
) -> dict[str, Any]

Get a reply from the hub.

PARAMETER DESCRIPTION
target

The address of the target worker.

TYPE: str

url

The URL of the FastAPI server.

TYPE: str

terminate

An event to signal termination, defaults to None.

TYPE: Event | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The command from the hub.

send_reply staticmethod

send_reply(reply: str | bytes, url: str) -> dict[str, Any]

Send a reply to the hub.

PARAMETER DESCRIPTION
reply

The reply to send.

TYPE: str | bytes

url

The URL of the FastAPI server.

TYPE: str

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The reply ID returned by the hub.

update_registry

update_registry(
    worker: Controller, terminate: Event | None = None
) -> dict[str, Any]

Register a worker with the hub.

PARAMETER DESCRIPTION
worker

The worker controller to register.

TYPE: Controller

terminate

An event to signal termination, defaults to None.

TYPE: Event | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

dict[str, Any]: The registry of the hub.

create_fastapi_user

create_fastapi_user(
    host: str, port: int, address: str | None = None
) -> tuple[Controller, dict[str, Any]]

Create a FastAPI client instance.

PARAMETER DESCRIPTION
host

The host address for the FastAPI server.

TYPE: str

port

The port number for the FastAPI server.

TYPE: int

address

The address of the user, defaults to None.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[Controller, dict[str, Any]]

tuple[Controller, dict[str, Any]]: A tuple containing the Controller instance and a dictionary with the client.

create_fastapi_worker

create_fastapi_worker(
    host: str, port: int, address: str | None = None
) -> tuple[Controller, dict[str, Any]]

Create a FastAPI client instance.

PARAMETER DESCRIPTION
host

The host address for the FastAPI server.

TYPE: str

port

The port number for the FastAPI server.

TYPE: int

address

The address of the worker, defaults to None.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[Controller, dict[str, Any]]

tuple[Controller, dict[str, Any]]: A tuple containing the Controller instance and a dictionary with the client.