ProcessExecutor
- class chancy.executors.process.ProcessExecutor(worker, queue, *, maximum_jobs_per_worker: int = 100, mp_context: BaseContext | None = None)
Bases: ConcurrentExecutor
An Executor which uses a process pool to run its jobs.
This executor is useful for running jobs that are CPU-bound, because running each job in a separate process avoids the GIL (Global Interpreter Lock) that CPython uses to ensure thread safety.
To use this executor, simply pass the import path to this class in the executor field of your queue configuration or use the Executor shortcut:

    async with Chancy("postgresql://localhost/postgres") as chancy:
        await chancy.declare(
            Queue(
                name="default",
                concurrency=10,
                executor=Chancy.Executor.Process,
                executor_options={
                    "maximum_jobs_per_worker": 100,
                }
            )
        )

- Parameters:
queue – The queue that this executor is associated with.
maximum_jobs_per_worker – The maximum number of jobs that each worker can run before being replaced. Handy if you are stuck using a library with memory leaks.
mp_context – The multiprocessing context to use. If not provided, the default “spawn” context will be used, which is the safest option on all platforms.
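The fallback described for mp_context can be illustrated with the standard library alone. This is a minimal sketch, not Chancy's code: resolve_context is a hypothetical helper that returns the given context, or the "spawn" context when none is provided.

```python
import multiprocessing
from multiprocessing.context import BaseContext
from typing import Optional


def resolve_context(mp_context: Optional[BaseContext] = None) -> BaseContext:
    """Hypothetical helper: use the given context, else fall back to "spawn"."""
    if mp_context is not None:
        return mp_context
    # "spawn" starts a fresh interpreter for each worker and is available
    # on every platform that multiprocessing supports.
    return multiprocessing.get_context("spawn")


default_ctx = resolve_context()
print(default_ctx.get_start_method())
```

A context obtained this way is the kind of object the mp_context constructor argument expects. Whether it can also be supplied through executor_options is not stated here, so treat that as an open question.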
- async cancel(ref: Reference)
Make an attempt to cancel a running job.
It’s not guaranteed that the job will be cancelled, nor is it guaranteed that the job will be cancelled in a timely manner. For example, if the job is running a long computation in a C extension, it may not be possible to interrupt it until that call returns.
- Parameters:
ref – The reference to the job to cancel.
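The best-effort nature of cancellation follows from how Python delivers signals: a Python-level handler only runs between bytecode instructions, so a pure-Python loop can be interrupted quickly while a long call into C cannot. The sketch below assumes signal-based cancellation on a Unix system and uses a timer as a stand-in for an external cancel request. JobCancelled and spin_until_cancelled are hypothetical names, not part of Chancy.

```python
import signal


class JobCancelled(Exception):
    """Hypothetical exception raised inside the job when a cancel signal arrives."""


def _raise_cancelled(signum, frame):
    raise JobCancelled(f"received signal {signum}")


def spin_until_cancelled(delay: float = 0.05) -> int:
    """Run a pure-Python loop until a timer signal interrupts it."""
    previous = signal.signal(signal.SIGALRM, _raise_cancelled)
    iterations = 0
    try:
        # Stand-in for a cancel request: deliver SIGALRM after `delay` seconds.
        signal.setitimer(signal.ITIMER_REAL, delay)
        while True:
            # The handler can run between any two iterations of this loop.
            iterations += 1
    except JobCancelled:
        return iterations
    finally:
        # Disarm the timer and restore whatever handler was installed before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)
```

If the loop body were a single long-running call into a C extension, the handler would be deferred until that call returned, which is the situation the description above warns about.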
- static job_signal_handler(signum: int, frame)
Handles signals sent to a running job process.
Subclasses can override this method to provide additional functionality or to change the way that signals are handled.
Note
Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.
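A handler with this signature typically translates a signal into a Python exception that unwinds the running job. The following is a minimal sketch for Unix systems, not Chancy's implementation: the choice of SIGALRM and SIGUSR1, the JobStopRequested exception, and the install_job_signals helper are all assumptions made for illustration.

```python
import signal


class JobStopRequested(Exception):
    """Hypothetical exception used to unwind a job that was asked to stop."""


def job_signal_handler(signum: int, frame) -> None:
    """Hypothetical handler: turn selected signals into Python exceptions."""
    if signum == signal.SIGALRM:
        raise TimeoutError("job exceeded its time limit")
    if signum == signal.SIGUSR1:
        raise JobStopRequested("job was asked to stop")
    # Any other signal is ignored by this sketch.


def install_job_signals() -> dict:
    """Install the handler; return the previous handlers so they can be restored."""
    return {
        sig: signal.signal(sig, job_signal_handler)
        for sig in (signal.SIGALRM, signal.SIGUSR1)
    }
```

Because the handler runs inside the job's process, it should only touch state that exists in that process, in line with the note above.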
- classmethod job_wrapper(job: QueuedJob, pids_for_job) -> tuple[QueuedJob, Any]
This is the function that is actually started by the process pool executor. It’s responsible for setting up necessary signals and limits, running the job, and returning the result.
Subclasses can override this method to provide additional functionality or to change the way that jobs are run.
Note
Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.
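The run-and-return part of such a wrapper can be sketched in a few lines. This is an illustration under stated assumptions, not the library's code: SketchJob is a hypothetical stand-in for QueuedJob, pids_for_job is assumed to be a dict-like mapping from job ID to the PID running it (its real shape is not described here), and the signal and limit setup is left out.

```python
import os
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class SketchJob:
    """Hypothetical stand-in for QueuedJob; the real class has a different shape."""

    id: str
    func: Callable[..., Any]
    kwargs: dict = field(default_factory=dict)


def job_wrapper(job: SketchJob, pids_for_job: dict) -> tuple[SketchJob, Any]:
    """Run one job in the current process and return it with its result."""
    # Assumption: record which process runs the job so it can be signalled later.
    pids_for_job[job.id] = os.getpid()
    try:
        result = job.func(**job.kwargs)
    finally:
        # Remove the entry whether the job succeeded or raised.
        pids_for_job.pop(job.id, None)
    return job, result
```

Returning the job together with its result mirrors the documented return type, so the caller can match a finished result to the job that produced it.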
- classmethod on_initialize_worker()
This method is called in each worker process before it begins running jobs. It can be used to perform any necessary setup, such as loading NLTK datasets or calling django.setup().
This is called once per worker process, not once per job. When a worker process reaches maximum_jobs_per_worker (if set), it is replaced with a new one, and this method runs again in the replacement.
Note
Care should be taken when overriding this method, as it is called within a separate process and may not have access to the same resources as the main process.