chancy.queue module

class chancy.queue.Queue(name: str, concurrency: int = 1, tags: set[str] = <factory>, state: ~chancy.queue.Queue.State = State.ACTIVE, executor: str = 'chancy.executors.process.ProcessExecutor', executor_options: dict = <factory>, polling_interval: int = 1, rate_limit: int | None = None, rate_limit_window: int | None = None)[source]

Bases: object

Queues are used to group jobs together and determine how they should be processed. Each queue has a name, concurrency level, and a set of tags that determine which workers can process jobs from the queue.

Queues must be declared using declare() before workers will be able to process jobs from them.

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.declare(Queue(name="default", concurrency=4))

By default, this queue will soon be picked up by all running workers, which will begin processing its jobs. If you instead want to restrict it to specific workers, assign it using “tags”:

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.declare(Queue(name="default", concurrency=4, tags={"reporting"}))

This will only be picked up by workers that have the “reporting” tag:

async with Chancy("postgresql://localhost/postgres") as chancy:
    await Worker(chancy, tags={"reporting"}).start()

Queues can have a global rate limit applied to them, which will be enforced across all workers processing jobs from the queue:

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.declare(
        Queue(name="default", rate_limit=10, rate_limit_window=60)
    )

This will limit the queue to processing 10 jobs per minute across all workers. If the rate limit is exceeded, jobs will be skipped until the rate limit window has passed. Combined with the AsyncExecutor, this can be a very easy way to work with external APIs.

Note

Rate limiting is done with a fixed window algorithm for simplicity. If you need to do something custom, subclass the worker and re-implement fetch_jobs().

class State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

ACTIVE = 'active'[source]

The queue is active and jobs can be processed.

PAUSED = 'paused'[source]

The queue is paused and no jobs will be processed.

concurrency: int = 1[source]

The number of jobs that can be processed concurrently per worker. For example, three workers processing a queue with a concurrency of 4 can run up to 12 jobs at once.

executor: str = 'chancy.executors.process.ProcessExecutor'[source]

The import path to the executor that should be used to process jobs in this queue.

executor_options: dict[source]

The options to pass to the executor’s constructor.

name: str[source]

A globally unique identifier for the queue.

pack() dict[source]

Pack the queue into a dictionary that can be serialized and used to recreate the queue later.

polling_interval: int = 1[source]

The number of seconds to wait between polling the queue for new jobs.

rate_limit: int | None = None[source]

An optional global rate limit to apply to this queue. All workers processing jobs from this queue will be subject to this limit.

rate_limit_window: int | None = None[source]

The period of time over which the rate limit applies (in seconds).

state: State = 'active'[source]

The state of the queue.

tags: set[str][source]

The tags that determine which workers will process this queue.

classmethod unpack(data: dict) Queue[source]

Unpack a serialized queue object into a Queue instance.
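The exact serialized layout is an implementation detail, but the pack()/unpack() pattern can be modeled with a standalone sketch. The field names below mirror chancy.queue.Queue for illustration only; Chancy's real dictionary keys and types may differ.

```python
from dataclasses import dataclass, field
from enum import Enum


# Hypothetical standalone model of the pack()/unpack() roundtrip,
# not Chancy's actual serialization code.
class State(Enum):
    ACTIVE = "active"
    PAUSED = "paused"


@dataclass
class Queue:
    name: str
    concurrency: int = 1
    tags: set = field(default_factory=set)
    state: State = State.ACTIVE

    def pack(self) -> dict:
        # Convert non-JSON-friendly fields (enum, set) into plain types.
        return {
            "name": self.name,
            "concurrency": self.concurrency,
            "tags": sorted(self.tags),
            "state": self.state.value,
        }

    @classmethod
    def unpack(cls, data: dict) -> "Queue":
        # Rebuild the richer types from their serialized forms.
        return cls(
            name=data["name"],
            concurrency=data["concurrency"],
            tags=set(data["tags"]),
            state=State(data["state"]),
        )


q = Queue(name="default", concurrency=4, tags={"reporting"})
restored = Queue.unpack(q.pack())
```

The roundtrip produces an equivalent Queue instance, which is what lets a declared queue be stored in the database and reconstructed by workers later.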