chancy.app module

class chancy.app.Chancy(dsn: str, *, plugins: list[Plugin] = None, prefix: str = 'chancy_', min_connection_pool_size: int = 1, max_connection_pool_size: int = 10, poll_reconnect_timeout: int = 300, notifications: bool = True, log: Logger | None = None)[source]

Bases: object

A Chancy application, along with all of its configuration and common functionality.

Chancy is asyncio-first, and all of its methods are async by default:

import asyncio
from chancy import Chancy, Job, Queue

def my_job():
    print("Hello, world!")

async def main():
    async with Chancy("postgresql://localhost/chancy") as chancy:
        await chancy.migrate()
        await chancy.declare(Queue("default"))
        await chancy.push(Job.from_func(my_job))

asyncio.run(main())

Chancy also provides a minimal synchronous interface for pushing jobs onto the queue, for codebases that aren't using asyncio:

from chancy import Chancy, Job, Queue

def my_job():
    print("Hello, world!")

with Chancy("postgresql://localhost/chancy") as chancy:
    chancy.sync_push(Job.from_func(my_job))

And of course, it’s just Postgres under the hood, so you can always just insert jobs directly into the database if you’re feeling brave.

Parameters:
  • dsn – The DSN to connect to the database.

  • plugins – The plugins to use with the application.

  • prefix – A prefix prepended to all table names, which can be used to namespace the tables for multiple applications or tenants.

  • min_connection_pool_size – The minimum number of connections to keep in the pool.

  • max_connection_pool_size – The maximum number of connections to keep in the pool.

  • poll_reconnect_timeout – The number of seconds to wait before attempting to reconnect to the database if a connection is lost.

  • notifications – Enables or disables emitting notifications using Postgres's NOTIFY/LISTEN feature.

  • log – The logger to use for all application logging. If not provided, a default logger will be set up.

class Executor(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: StrEnum

Shortcuts for the built-in executors.

Async = 'chancy.executors.asyncex.AsyncExecutor'[source]

The asyncio executor, which runs jobs in an asyncio event loop.

Process = 'chancy.executors.process.ProcessExecutor'[source]

The default executor, which runs jobs in a process pool.

SubInterpreter = 'chancy.executors.sub.SubInterpreterExecutor'[source]

The sub-interpreter executor, which runs jobs in experimental subinterpreters.

Threaded = 'chancy.executors.thread.ThreadedExecutor'[source]

The threaded executor, which runs jobs in a thread pool.

async cancel_job(ref: Reference)[source]

Cancel a job by reference.

This will attempt to cancel a job that is currently running, if it is possible to do so. Notifications must be enabled for this to work.
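
For example, cancelling a job using the reference returned by push() (a minimal sketch; assumes notifications are enabled):

async with Chancy("postgresql://localhost/chancy") as chancy:
    ref = await chancy.push(Job.from_func(my_job))
    # Ask the worker currently running the job to cancel it, if possible.
    await chancy.cancel_job(ref)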

Parameters:

ref – The reference to the job to cancel.

async declare(queue: Queue, *, upsert: bool = False) Queue[source]

Declare a queue in the database.

Queues in Chancy are the primary mechanism for organizing and executing jobs. They can be configured with various options to control how jobs are processed, including the concurrency level, rate limiting, and tags to restrict which workers can process jobs from the queue.

Queues exist globally, configured at runtime via the database and can be updated, reassigned, or deleted at any time without requiring a restart of the workers.

This will create a new queue in the database with the provided configuration. If the queue already exists, no changes will be made unless the upsert parameter is set to True.

async with Chancy("postgresql://localhost/chancy") as chancy:
    await chancy.declare(Queue("default"))

Parameters:
  • queue – The queue to declare.

  • upsert – If True, the queue will be updated if it already exists. Defaults to False.

Returns:

The queue as it exists in the database.

async declare_ex(cursor: AsyncCursor, queue: Queue, *, upsert: bool = False) Queue[source]

Declare a queue in the database using a specific cursor.

This is a low-level method that allows for more control over the database connection and transaction management. It is recommended to use the higher-level declare method in most cases.
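
A minimal sketch, assuming a cursor acquired from the application's connection pool:

async with Chancy("postgresql://localhost/chancy") as chancy:
    async with chancy.pool.connection() as conn:
        async with conn.cursor() as cursor:
            # Declare (or update) the queue within the caller's transaction.
            await chancy.declare_ex(cursor, Queue("default"), upsert=True)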

Parameters:
  • cursor – The cursor to use for the operation.

  • queue – The queue to declare.

  • upsert – If True, the queue will be updated if it already exists. Defaults to False.

async delete_queue(name: str, *, purge_jobs: bool = True)[source]

Delete a queue by name.

By default, also deletes all jobs in the queue. If you want to keep the jobs, set purge_jobs to False.
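
For example, deleting a queue while keeping its jobs:

async with Chancy("postgresql://localhost/chancy") as chancy:
    # The queue is removed, but its existing jobs stay in the database.
    await chancy.delete_queue("default", purge_jobs=False)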

Parameters:
  • name – The name of the queue to delete.

  • purge_jobs – If True, all jobs in the queue will be deleted along with the queue. If False, the jobs will be left in the database.

dsn[source]

The DSN to connect to the database.

async get_all_queues() list[Queue][source]

Get all queues known to the cluster, regardless of their status or whether they're assigned to any workers.

async get_all_workers() list[dict[str, Any]][source]

Get all workers known to the cluster, regardless of their status.

async get_job(ref: Reference) QueuedJob | None[source]

Resolve a reference to a job instance.

If the job no longer exists, returns None.
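
For example, looking up a job that was just pushed:

async with Chancy("postgresql://localhost/chancy") as chancy:
    ref = await chancy.push(Job.from_func(my_job))
    queued_job = await chancy.get_job(ref)
    if queued_job is not None:
        print(queued_job)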

Parameters:

ref – The reference to the job to retrieve.

async get_queue(name: str) Queue[source]

Get a specific queue by name.

Parameters:

name – The name of the queue to retrieve.

async is_up_to_date() bool[source]

Check if the database is up to date.

This will check if the database schema is up to date with the latest available migrations. If the database is up to date, returns True.
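
For example, migrating only when the schema is out of date:

async with Chancy("postgresql://localhost/chancy") as chancy:
    if not await chancy.is_up_to_date():
        await chancy.migrate()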

log[source]

The logger to use for all application logging.

max_connection_pool_size[source]

The maximum number of connections to keep in the pool.

async migrate(*, to_version: int | None = None)[source]

Migrate the database to the latest version.

This will migrate the core database tables as well as any enabled plugins. If a version is provided, the database will be migrated up or down to that version as necessary.

async with Chancy("postgresql://localhost/chancy") as chancy:
    await chancy.migrate()

Can also be run with the CLI:

chancy --app worker.chancy misc migrate

Warning

You should never run migrations in production without first testing them in a development environment. Migrations can be destructive and may cause data loss if not written carefully, or take a long time to complete if the database is large.

Parameters:

to_version – The version to migrate to. If not provided, the database will be migrated to the latest version.

min_connection_pool_size[source]

The minimum number of connections to keep in the pool.

notifications[source]

Enables or disables emitting notifications using Postgres's NOTIFY/LISTEN feature.

async notify(cursor: AsyncCursor, event: str, payload: dict[str, Any])[source]

Send a notification via Postgres NOTIFY/LISTEN to all other listening workers.

If notifications are disabled on the associated Chancy app, this method will do nothing.
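
A minimal sketch, assuming a cursor from the application's pool (the event name and payload are illustrative placeholders):

async with Chancy("postgresql://localhost/chancy") as chancy:
    async with chancy.pool.connection() as conn:
        async with conn.cursor() as cursor:
            await chancy.notify(cursor, "my.custom_event", {"hello": "world"})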

See also

sync_notify() for a synchronous version of this method.

plugins[source]

The plugins to use with the application.

poll_reconnect_timeout[source]

The number of seconds to wait before attempting to reconnect to the database if a connection is lost.

property pool: AsyncConnectionPool[source]

The asyncio connection pool used to interact with the database.

prefix[source]

A prefix prepended to all table names, which can be used to namespace the tables for multiple applications or tenants.

async push(job: Job | IsAJob[..., Any]) Reference[source]

Push a job onto the queue.

This method will push a job onto the queue, making it available for processing by workers. A Reference object is returned that can be used to track the progress of the job and retrieve the result when it is complete.

async with Chancy("postgresql://localhost/chancy") as chancy:
    await chancy.push(Job.from_func(my_job))

See also

sync_push() for a synchronous version of this method.

Parameters:

job – The job to push onto the queue.

Returns:

A reference to the job in the queue.

async push_many(jobs: list[Job], *, batch_size: int = 1000) Iterator[list[Reference]][source]

Push multiple jobs onto the queue.

This method will push multiple jobs onto the queue, making them available for processing by workers. Jobs are committed in batches, with the size of each batch controlled by the batch_size parameter. Each batch is committed within its own transaction to limit peak memory usage.

Returns an iterator of lists of references to the jobs in the queue, one list per batch:

async for references in chancy.push_many(jobs):
    print(references)

See also

sync_push_many() for a synchronous version of this method.

Parameters:
  • jobs – The jobs to push onto the queue.

  • batch_size – The number of jobs to push in each batch. Defaults to 1000.

Returns:

An iterator of lists of references to the jobs in the queue.

async push_many_ex(cursor: AsyncCursor, jobs: list[Job | IsAJob[..., Any]]) list[Reference][source]

Push multiple jobs onto the queue using a specific cursor.

This is a low-level method that allows for more control over the database connection and transaction management. It is recommended to use the higher-level push_many method in most cases.

The main advantage of using this method is that jobs are only pushed if some other operation in the same transaction succeeds. For example, you might only want to push a job that sends a confirmation email if the account creation itself commits. A minimal sketch, assuming a hypothetical users table and a send_confirmation_email job:
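
async with Chancy("postgresql://localhost/chancy") as chancy:
    async with chancy.pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor() as cursor:
                # If the INSERT fails, the transaction rolls back and the
                # confirmation job is never pushed.
                await cursor.execute(
                    "INSERT INTO users (email) VALUES (%s)",
                    ("test@example.com",),
                )
                await chancy.push_many_ex(
                    cursor,
                    [Job.from_func(send_confirmation_email)],
                )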

See also

sync_push_many_ex() for a synchronous version of this method.

Parameters:
  • cursor – The cursor to use for the operation.

  • jobs – The jobs to push onto the queue.

Returns:

A list of references to the jobs in the queue.

sync_notify(cursor: Cursor, event: str, payload: dict[str, Any])[source]

Send a notification via Postgres NOTIFY/LISTEN to all other listening workers.

If notifications are disabled on the associated Chancy app, this method will do nothing.

See also

notify() for an asynchronous version of this method.

property sync_pool: ConnectionPool[source]

The synchronous connection pool used to interact with the database.

sync_push(job: Job) Reference[source]

Synchronously push a job onto the queue.

This method provides a synchronous interface to push a job onto the queue.

with Chancy("postgresql://localhost/chancy") as chancy:
    chancy.sync_push(Job.from_func(my_job))

See also

push() for an asynchronous version of this method.

Parameters:

job – The job to push onto the queue.

Returns:

A reference to the job in the queue.

sync_push_many(jobs: list[Job | IsAJob[..., Any]], *, batch_size: int = 1000) Iterator[list[Reference]][source]

Synchronously push multiple jobs onto the queue.

This method provides a synchronous interface to push multiple jobs onto the queue.

Returns an iterator of lists of references to the jobs in the queue, one list per batch.
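
For example, pushing a large batch of jobs from synchronous code:

with Chancy("postgresql://localhost/chancy") as chancy:
    jobs = [Job.from_func(my_job) for _ in range(5000)]
    for references in chancy.sync_push_many(jobs):
        print(f"pushed {len(references)} jobs")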

See also

push_many() for an asynchronous version of this method.

Parameters:
  • jobs – The jobs to push onto the queue.

  • batch_size – The number of jobs to push in each batch. Defaults to 1000.

Returns:

An iterator of lists of references to the jobs in the queue.

sync_push_many_ex(cursor: Cursor, jobs: list[Job]) list[Reference][source]

Synchronously push multiple jobs onto the queue using a specific cursor.

This is a low-level method that allows for more control over the database connection and transaction management. It is recommended to use the higher-level sync_push_many method in most cases.
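
A minimal sketch, assuming a cursor from the application's synchronous pool:

with Chancy("postgresql://localhost/chancy") as chancy:
    with chancy.sync_pool.connection() as conn:
        with conn.transaction():
            with conn.cursor() as cursor:
                # Jobs are only pushed if the surrounding transaction commits.
                chancy.sync_push_many_ex(cursor, [Job.from_func(my_job)])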

See also

push_many_ex() for an asynchronous version of this method.

Parameters:
  • cursor – The cursor to use for the operation.

  • jobs – The jobs to push onto the queue.

Returns:

A list of references to the jobs in the queue.

async wait_for_job(ref: Reference, *, interval: int = 1, timeout: float | int | None = None) QueuedJob | None[source]

Wait for a job to complete.

This method will poll until the referenced job has completed. The interval parameter controls how often the job's status is checked. Waiting does not block the event loop, so other tasks can run while the job is in progress.

If the job no longer exists, returns None.
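
For example, pushing a job and waiting up to 30 seconds for it to finish:

async with Chancy("postgresql://localhost/chancy") as chancy:
    ref = await chancy.push(Job.from_func(my_job))
    job = await chancy.wait_for_job(ref, timeout=30)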

Parameters:
  • ref – The reference to the job to wait for.

  • interval – The number of seconds to wait between checks.

  • timeout – The maximum number of seconds to wait for the job to complete. If not provided, the method will wait indefinitely.

chancy.app.setup_default_logger(level: int = 20)[source]

Set up the default logger for the application.

This will configure the logger for the application, setting the log level and adding a stream handler that logs to the console.

Parameters:

level – The log level to set on the logger. Defaults to logging.INFO (20).