ProcessExecutor

class chancy.executors.process.ProcessExecutor(worker, queue, *, maximum_jobs_per_worker: int = 100, mp_context: BaseContext | None = None)[source]

Bases: ConcurrentExecutor

An Executor which uses a process pool to run its jobs.

This executor is useful for running jobs that are CPU-bound, avoiding the GIL (Global Interpreter Lock) that Python uses to ensure thread safety.

To use this executor, simply pass the import path to this class in the executor field of your queue configuration or use the Executor shortcut:

from chancy import Chancy, Queue

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.declare(
        Queue(
            name="default",
            concurrency=10,
            executor=Chancy.Executor.Process,
            executor_options={
                "maximum_jobs_per_worker": 100,
            }
        )
    )
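Equivalently, the import path can be given as a string in the executor field (a minimal variant of the example above):

await chancy.declare(
    Queue(
        name="default",
        concurrency=10,
        executor="chancy.executors.process.ProcessExecutor",
        executor_options={
            "maximum_jobs_per_worker": 100,
        },
    )
)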
Parameters:
  • worker – The worker that this executor is associated with.

  • queue – The queue that this executor is associated with.

  • maximum_jobs_per_worker – The maximum number of jobs that each worker can run before being replaced. Handy if you are stuck using a library with memory leaks.

  • mp_context – The multiprocessing context to use. If not provided, the default “spawn” context will be used, which is the safest option on all platforms.
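If a different start method is needed, one approach is a small subclass that supplies its own default context; a minimal sketch ("forkserver" is only an illustration, and the subclass name is hypothetical):

import multiprocessing

from chancy.executors.process import ProcessExecutor


class ForkserverProcessExecutor(ProcessExecutor):
    def __init__(self, worker, queue, **kwargs):
        # Pin the start method rather than relying on the "spawn" default.
        kwargs.setdefault(
            "mp_context", multiprocessing.get_context("forkserver")
        )
        super().__init__(worker, queue, **kwargs)

The subclass's import path can then be used in the executor field of the queue configuration.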

async cancel(ref: Reference)[source]

Make an attempt to cancel a running job.

It’s not guaranteed that the job will be cancelled, nor that it will be cancelled in a timely manner. For example, if the job is running a long computation in a C extension, it may not be possible to interrupt it until the call returns.

Parameters:
  • ref – The reference to the job to cancel.

static job_signal_handler(signum: int, frame)[source]

Handles signals sent to a running job process.

Subclasses can override this method to provide additional functionality or to change the way that signals are handled.

Note

Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.
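As a hypothetical illustration of such an override (the logging here is not part of the library; default handling is delegated back to the parent class):

from chancy.executors.process import ProcessExecutor


class LoggingProcessExecutor(ProcessExecutor):
    @staticmethod
    def job_signal_handler(signum: int, frame):
        # This runs inside the job's worker process, not the main
        # process, so keep the work here minimal and self-contained.
        print(f"job process received signal {signum}")
        # Delegate to the default handling.
        return ProcessExecutor.job_signal_handler(signum, frame)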

classmethod job_wrapper(job: QueuedJob, pids_for_job) → tuple[QueuedJob, Any][source]

This is the function that is actually started by the process pool executor. It’s responsible for setting up necessary signals and limits, running the job, and returning the result.

Subclasses can override this method to provide additional functionality or to change the way that jobs are run.

Note

Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.
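A hypothetical subclass wrapping the default implementation, for example to time each job (the timing and print are illustrative only):

import time

from chancy.executors.process import ProcessExecutor


class TimedProcessExecutor(ProcessExecutor):
    @classmethod
    def job_wrapper(cls, job, pids_for_job):
        # Runs inside the worker process; signal and limit setup and
        # the job itself are delegated to the default implementation.
        started = time.monotonic()
        try:
            return super().job_wrapper(job, pids_for_job)
        finally:
            print(f"job finished in {time.monotonic() - started:.2f}s")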

classmethod on_initialize_worker()[source]

This method is called in each worker process before it begins running jobs. It can be used to perform any necessary setup, such as loading NLTK datasets or calling django.setup().

This isn’t called once per job, but once per worker process; a worker process keeps running jobs until maximum_jobs_per_worker is reached (if set), after which it is replaced with a new one.

Note

Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.
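For example, a subclass that prepares Django in each new worker process, building on the django.setup() use case mentioned above ("myproject.settings" is a placeholder):

import os

from chancy.executors.process import ProcessExecutor


class DjangoProcessExecutor(ProcessExecutor):
    @classmethod
    def on_initialize_worker(cls):
        # Runs once per worker process, before it executes any jobs.
        # "myproject.settings" is a hypothetical settings module.
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

        import django

        django.setup()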

async push(job: QueuedJob) → Future[source]

Push a job onto the job pool.

async stop()[source]

Stop the executor, giving it a chance to clean up any resources it may have allocated to running jobs.

It is not safe to use the executor after this method has been called.