chancy.worker module

class chancy.worker.Worker(chancy: Chancy, *, worker_id: str | None = None, heartbeat_poll_interval: int = 30, heartbeat_timeout: int = 90, queue_change_poll_interval: int = 10, shutdown_timeout: int = 30, tags: set[str] | None = None, register_signal_handlers: bool = True)

Bases: object

The Worker is responsible for polling queues for new jobs, running any configured plugins, and internal management such as heartbeats.

Starting a worker is simple:

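import asyncio
from chancy import Chancy, Worker

async def main():
    async with Chancy("postgresql://localhost/postgres") as chancy:
        await chancy.migrate()  # Create or update Chancy's database tables
        async with Worker(chancy) as worker:
            await worker.wait_for_shutdown()

asyncio.run(main())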

Worker Tags

Worker tags control which workers run which queues. A tag is a string (typically in the format key=value) assigned to a worker. Declare a queue with a set of tags, and only workers with matching tags will run that queue.

Every worker automatically gets some tags, like hostname, worker_id, python, arch, os, and * (wildcard). You can also add custom tags when creating a worker.

async with Worker(chancy, tags={"has=gpu", "has=large-disk"}) as worker:
    await worker.wait_for_shutdown()

You could then assign a queue to only run on workers with the has=gpu tag:

await chancy.declare(
    Queue(name="default", tags={"has=gpu"}, concurrency=10),
    upsert=True  # Replaces existing settings
)

Queue tags are regular expressions, which allows flexible matching against worker tags:

Queue(name="default", tags={r"python=3\.11\.[0-9]+"}, concurrency=10)
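
The matching rule can be pictured with a small standalone sketch. It assumes that each queue tag is a pattern that must fully match at least one worker tag, and that a queue runs on a worker when any of its patterns matches. queue_runs_on is a hypothetical helper, not part of Chancy, and the sample tag values are invented; the library's exact rule may differ.

```python
import re


def queue_runs_on(queue_tags: set[str], worker_tags: set[str]) -> bool:
    """Return True if any queue tag pattern fully matches any worker tag.

    Hypothetical helper for illustration only; Chancy's own matching
    logic may differ in detail.
    """
    return any(
        re.fullmatch(pattern, tag) is not None
        for pattern in queue_tags
        for tag in worker_tags
    )


# Example worker tags (values are made up for the sketch).
worker_tags = {"*", "python=3.11.4", "os=Linux", "has=gpu"}

gpu_queue = {"has=gpu"}
py311_queue = {r"python=3\.11\.[0-9]+"}
py312_queue = {r"python=3\.12\.[0-9]+"}

if __name__ == "__main__":
    for name, tags in [("gpu", gpu_queue), ("py311", py311_queue), ("py312", py312_queue)]:
        print(name, queue_runs_on(tags, worker_tags))
```

Escaping the dots in the pattern matters: an unescaped . matches any character, so python=3.11 would also match a tag such as python=3x11.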

Parameters:
  • chancy – The Chancy application that the worker is associated with.

  • worker_id – The ID of the worker, which must be globally unique. If not provided, a random UUID will be generated.

  • heartbeat_poll_interval – The number of seconds between heartbeat polls.

  • heartbeat_timeout – The number of seconds before a worker is considered to have lost connection to the cluster.

  • queue_change_poll_interval – The number of seconds between checks for changes to the queues that the worker should be processing.

  • shutdown_timeout – The number of seconds to wait for a clean shutdown before forcing the worker to stop.

  • tags – Extra tags to associate with the worker, in addition to the automatic ones.

  • register_signal_handlers – Whether to register signal handlers when the worker starts.

async announce_worker(conn: AsyncConnection)

Announce the worker to the cluster.

This will insert the worker into the workers table, or update the last_seen timestamp if the worker is already present.

Parameters:

conn – The connection to use for the announcement.

chancy

The Chancy application that the worker is associated with.

async fetch_jobs(queue: Queue, conn: AsyncConnection, *, up_to: int = 1) → list[QueuedJob]

Fetch jobs from the queue for processing.

This method will fetch up to up_to jobs from the queue, mark them as running, and return them as a list of QueuedJob objects. If no jobs are available, an empty list will be returned.

It’s safe to call this method concurrently, as the jobs will be locked for the duration of the transaction.

Parameters:
  • queue – The queue to fetch jobs from.

  • conn – The database connection to use.

  • up_to – The maximum number of jobs to fetch.

heartbeat_poll_interval

The number of seconds between heartbeat polls.

heartbeat_timeout

The number of seconds before a worker is considered to have lost connection to the cluster.
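
A minimal sketch of how the two heartbeat settings relate, assuming liveness is judged by comparing a worker's last_seen timestamp against heartbeat_timeout. is_worker_lost is a hypothetical helper, not a Chancy function, and the library's actual check may differ.

```python
from datetime import datetime, timedelta, timezone


def is_worker_lost(
    last_seen: datetime,
    now: datetime,
    heartbeat_timeout: int = 90,
) -> bool:
    """Return True once more than heartbeat_timeout seconds have passed.

    Hypothetical helper: assumes last_seen is refreshed on every heartbeat.
    """
    return now - last_seen > timedelta(seconds=heartbeat_timeout)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    # With the default 30 second poll interval, a healthy worker is never
    # more than roughly 30 seconds stale.
    print(is_worker_lost(now - timedelta(seconds=30), now))
    print(is_worker_lost(now - timedelta(seconds=120), now))
```

Keeping heartbeat_timeout at a multiple of heartbeat_poll_interval (the defaults are 90 and 30) leaves room for a delayed heartbeat before the worker is treated as lost.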

hub

An event hub with cluster and local telemetry, useful for plugins.

is_leader

An event that is set when the worker is the leader. Leadership is not enabled by default; a leadership plugin must be used for this event to be set.

manager

The task manager for the worker, responsible for managing all the worker’s internal asyncio tasks.

async on_signal(signum: int)

Called when a signal is received.

By default, handles SIGTERM and SIGINT by starting worker shutdown. If the signal is received again, it will force a shutdown.
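
The two-stage behaviour described above can be sketched as follows. ShutdownController and its force_event attribute are hypothetical names used only for illustration; this is not Chancy's implementation.

```python
import asyncio
import signal


class ShutdownController:
    """Hypothetical stand-in for the worker's signal handling."""

    def __init__(self) -> None:
        self.shutdown_event = asyncio.Event()
        self.force_event = asyncio.Event()  # assumed name, not a Chancy attribute

    async def on_signal(self, signum: int) -> None:
        if signum not in (signal.SIGTERM, signal.SIGINT):
            return  # other signals are ignored in this sketch
        if self.shutdown_event.is_set():
            # Second signal: stop waiting for a clean shutdown.
            self.force_event.set()
        else:
            # First signal: begin a graceful shutdown.
            self.shutdown_event.set()


async def demo() -> tuple[tuple[bool, bool], tuple[bool, bool]]:
    controller = ShutdownController()
    await controller.on_signal(signal.SIGINT)
    first = (controller.shutdown_event.is_set(), controller.force_event.is_set())
    await controller.on_signal(signal.SIGINT)
    second = (controller.shutdown_event.is_set(), controller.force_event.is_set())
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```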

outgoing: Queue[QueuedJob]

A queue of updates waiting to be sent to the database.

queue_change_poll_interval

The number of seconds between checks for changes to the queues that this worker should be processing.

async queue_update(update: QueuedJob)

Enqueue an update to a job instance.

The update is queued and written to the database in periodic batches, which reduces the number of transactions required. The frequency of these batches is controlled by the send_outgoing_interval attribute on the worker.

Parameters:

update – The job instance to update.
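
The batching idea can be illustrated with a standalone sketch built on asyncio.Queue. UpdateBatcher, drain, and flush_forever are hypothetical names, and plain strings stand in for QueuedJob instances; Chancy's own implementation may be organised differently.

```python
import asyncio


class UpdateBatcher:
    """Hypothetical sketch of periodic batching; not Chancy's implementation."""

    def __init__(self, send_outgoing_interval: float = 1.0) -> None:
        self.outgoing: asyncio.Queue = asyncio.Queue()
        self.send_outgoing_interval = send_outgoing_interval
        self.flushed: list[list[str]] = []  # stands in for database writes

    async def queue_update(self, update: str) -> None:
        await self.outgoing.put(update)

    def drain(self) -> list[str]:
        """Remove and return everything currently waiting in the queue."""
        batch = []
        while not self.outgoing.empty():
            batch.append(self.outgoing.get_nowait())
        return batch

    async def flush_forever(self) -> None:
        while True:
            await asyncio.sleep(self.send_outgoing_interval)
            batch = self.drain()
            if batch:
                # A real worker would write the whole batch in one transaction.
                self.flushed.append(batch)


async def demo() -> list[list[str]]:
    batcher = UpdateBatcher(send_outgoing_interval=0.05)
    flusher = asyncio.create_task(batcher.flush_forever())
    for job_id in ("a", "b", "c"):
        await batcher.queue_update(job_id)
    await asyncio.sleep(0.2)  # give the flusher time to run
    flusher.cancel()
    try:
        await flusher
    except asyncio.CancelledError:
        pass
    return batcher.flushed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A shorter interval makes job state visible in the database sooner, at the cost of more transactions.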

register_signal_handlers

Whether to register signal handlers on startup of the worker.

send_outgoing_interval

The number of seconds between sending outgoing updates to the database.

shutdown_event

An event that is set when the worker is shutting down due to receiving a signal.

shutdown_timeout

The number of seconds to wait for a clean shutdown before forcing the worker to stop.

async start()

Start the worker and begin processing jobs.

Unlike run, this method does not wait for the worker to complete and returns immediately.

async stop() → bool

Stop the worker.

Attempts to stop the worker gracefully, sending a CancelledError to all running tasks and waiting up to shutdown_timeout seconds for them to complete before returning.

Returns True if the worker was stopped cleanly, or False if the worker returned due to the timeout expiring.
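
A minimal sketch of the cancel-then-wait pattern described above, using asyncio.wait with a timeout. stop_tasks, cooperative, and stubborn are hypothetical names for illustration; the worker's own task management may work differently.

```python
import asyncio


async def stop_tasks(tasks: list[asyncio.Task], shutdown_timeout: float) -> bool:
    """Cancel tasks, then wait up to shutdown_timeout seconds for them.

    Returns True if every task finished in time, False otherwise.
    """
    for task in tasks:
        task.cancel()
    if not tasks:
        return True  # asyncio.wait() rejects an empty collection
    _done, pending = await asyncio.wait(tasks, timeout=shutdown_timeout)
    return not pending


async def cooperative() -> None:
    await asyncio.sleep(3600)  # exits promptly when cancelled


async def stubborn() -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await asyncio.sleep(0.5)  # simulates cleanup that outlasts the timeout


async def demo() -> tuple[bool, bool]:
    quick_task = asyncio.create_task(cooperative())
    await asyncio.sleep(0)  # let the task start before cancelling it
    quick_result = await stop_tasks([quick_task], shutdown_timeout=0.1)

    slow_task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)
    slow_result = await stop_tasks([slow_task], shutdown_timeout=0.1)
    await slow_task  # let the slow cleanup finish so nothing is left pending
    return quick_result, slow_result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A job that catches CancelledError and keeps working is the usual reason a shutdown runs into the timeout.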

async wait_for_shutdown()

Wait until the worker is stopped.

worker_id

The ID of the worker, which must be globally unique.

worker_tags() → set[str]

Return the tags associated with the worker.

Tags are used to limit which Queues get assigned to a worker, and may also be used by some plugins.

Returns:

A set of tags associated with the worker.
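
For a sense of what such a set might contain, the sketch below builds key=value tags from the standard library. default_worker_tags is a hypothetical helper, and the exact keys and value formats Chancy generates are assumptions based on the automatic tags named in the class description.

```python
import platform
import socket
from typing import Optional


def default_worker_tags(worker_id: str, extra: Optional[set[str]] = None) -> set[str]:
    """Build an illustrative tag set; the formats here are assumed."""
    tags = {
        "*",  # wildcard tag carried by every worker
        f"worker_id={worker_id}",
        f"hostname={socket.gethostname()}",
        f"python={platform.python_version()}",
        f"arch={platform.machine()}",
        f"os={platform.system()}",
    }
    # Custom tags are merged with the automatic ones.
    return tags | (extra or set())


if __name__ == "__main__":
    for tag in sorted(default_worker_tags("worker-1", {"has=gpu"})):
        print(tag)
```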