chancy.queue module
- class chancy.queue.Queue(name: str, concurrency: int = 1, tags: set[str] = <factory>, state: chancy.queue.Queue.State = State.ACTIVE, executor: str = 'chancy.executors.process.ProcessExecutor', executor_options: dict = <factory>, polling_interval: int = 1, rate_limit: int | None = None, rate_limit_window: int | None = None)
Bases: object
Queues are used to group jobs together and determine how they should be processed. Each queue has a name, concurrency level, and a set of tags that determine which workers can process jobs from the queue.
Queues must be declared using declare() before workers can process jobs from them:

    async with Chancy("postgresql://localhost/postgres") as chancy:
        await chancy.declare(Queue(name="default", concurrency=4))

By default, all running workers will shortly pick up this queue and begin processing its jobs. To restrict it to specific workers instead, assign it using “tags”:

    async with Chancy("postgresql://localhost/postgres") as chancy:
        await chancy.declare(Queue(name="default", concurrency=4, tags={"reporting"}))

This will only be picked up by workers that have the “reporting” tag:

    async with Chancy("postgresql://localhost/postgres") as chancy:
        await Worker(chancy, tags={"reporting"}).start()

Queues can have a global rate limit applied to them, which will be enforced across all workers processing jobs from the queue:

    async with Chancy("postgresql://localhost/postgres") as chancy:
        await chancy.declare(
            Queue(name="default", rate_limit=10, rate_limit_window=60)
        )

This will limit the queue to processing 10 jobs per minute across all workers. If the rate limit is exceeded, jobs will be skipped until the rate limit window has passed. Combined with the AsyncExecutor, this is an easy way to stay within the request limits of an external API.
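Chancy describes its rate limiting as a fixed-window algorithm. The following is a minimal, in-process sketch of that general technique, not Chancy's implementation (which enforces the limit globally across workers). The class name `FixedWindowLimiter`, its `try_acquire` method, and the injectable `clock` parameter are all hypothetical names chosen for illustration.

```python
import time


class FixedWindowLimiter:
    """Allow at most `limit` acquisitions per `window` seconds.

    Illustrative only: this is a generic fixed-window counter, not the
    code Chancy uses internally.
    """

    def __init__(self, limit: int, window: float, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so the behaviour can be tested
        self.window_start = clock()
        self.count = 0

    def try_acquire(self) -> bool:
        now = self.clock()
        # Once the current window has fully elapsed, start a fresh one.
        if now - self.window_start >= self.window:
            self.window_start = now
            self.count = 0
        # Inside the window, admit work only while under the limit.
        if self.count < self.limit:
            self.count += 1
            return True
        return False
```

With `limit=10` and `window=60`, the first ten calls to `try_acquire()` in a window succeed and later calls return `False` until the window rolls over. A known trade-off of fixed windows is that a burst at the end of one window followed by a burst at the start of the next can briefly exceed the nominal rate.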
Note
Rate limiting is done with a fixed window algorithm for simplicity. If you need to do something custom, subclass the worker and re-implement fetch_jobs().
- class State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Enum
- executor: str = 'chancy.executors.process.ProcessExecutor'
The import path to the executor that should be used to process jobs in this queue.
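Because the executor is given as a dotted import path string, it has to be resolved to a class at some point. The sketch below shows one common way to do that with the standard library. It is an assumption about the general approach, not Chancy's own loader, and `resolve_import_path` is a hypothetical helper name.

```python
from importlib import import_module


def resolve_import_path(path: str):
    """Resolve a dotted path such as 'package.module.ClassName' to an object.

    Hypothetical helper for illustration; Chancy may resolve executors
    differently.
    """
    # Split 'a.b.C' into the module part 'a.b' and the attribute 'C'.
    module_name, _, attribute = path.rpartition(".")
    if not module_name:
        raise ValueError(f"{path!r} is not a dotted import path")
    module = import_module(module_name)
    return getattr(module, attribute)
```

For example, `resolve_import_path("collections.OrderedDict")` returns the `OrderedDict` class from the standard library.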
- pack() -> dict
Pack the queue into a dictionary that can be serialized and used to recreate the queue later.
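To illustrate what packing into a serializable dictionary can look like, here is a sketch using a stand-in dataclass. `QueueSketch` and its `unpack` classmethod are hypothetical and only mirror some of the fields documented above. The keys and conversions that the real `Queue.pack()` produces are not shown in this page, so treat the layout below as an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class QueueSketch:
    """Stand-in for illustration only; not chancy.queue.Queue."""

    name: str
    concurrency: int = 1
    tags: set = field(default_factory=set)
    polling_interval: int = 1
    rate_limit: Optional[int] = None
    rate_limit_window: Optional[int] = None

    def pack(self) -> dict:
        # Sets are not JSON-serializable, so tags become a sorted list.
        return {
            "name": self.name,
            "concurrency": self.concurrency,
            "tags": sorted(self.tags),
            "polling_interval": self.polling_interval,
            "rate_limit": self.rate_limit,
            "rate_limit_window": self.rate_limit_window,
        }

    @classmethod
    def unpack(cls, data: dict) -> "QueueSketch":
        # Reverse the list conversion so the round trip is lossless.
        return cls(**{**data, "tags": set(data["tags"])})


# Round trip through JSON to show the packed form is serializable.
original = QueueSketch(
    name="default", concurrency=4, tags={"reporting"},
    rate_limit=10, rate_limit_window=60,
)
restored = QueueSketch.unpack(json.loads(json.dumps(original.pack())))
```

The design point is that every value in the packed dictionary is a plain JSON type, so the queue definition can be stored and rebuilt later.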
- polling_interval: int = 1
The number of seconds to wait between polling the queue for new jobs.
- rate_limit: int | None = None
An optional global rate limit to apply to this queue. All workers processing jobs from this queue will be subject to this limit.