Triggers

class chancy.plugins.trigger.Trigger

Bases: Plugin

Install database triggers on non-Chancy tables that create jobs when rows change.

The Trigger plugin lets you automatically create jobs in response to database changes on any table. It uses PostgreSQL statement-level triggers, which fire once per SQL statement rather than once per affected row, so bulk operations stay cheap. Because the database itself runs these triggers, jobs are created even if all Chancy workers are offline.

Tip

Database triggers should be avoided for extremely high-frequency operations or tables with very high write volumes, as they can introduce significant overhead.

Usage

You can register a job to run when a specific table is modified by creating a trigger. For example, to run a job whenever a row in the users table is inserted or updated, you can do the following:

import asyncio

from chancy import Chancy, job
from chancy.plugins.trigger import Trigger

@job(queue="user_events")
def process_user_change():
    pass

async def main():
    async with Chancy(
        "postgresql://localhost/postgres",
        plugins=[Trigger()],
    ) as chancy:
        # Queue process_user_change whenever a statement inserts into
        # or updates the users table.
        trigger_id = await Trigger.register_trigger(
            chancy,
            table_name="users",
            operations=["INSERT", "UPDATE"],
            job_template=process_user_change,
        )

asyncio.run(main())

More information about what triggered the job can be found in the job’s metadata:

from chancy import job, QueuedJob

@job(queue="user_events")
def process_user_change(*, context: QueuedJob):
    print(context.meta["trigger"])

This would output something like:

{
    'operation': 'INSERT',
    'timestamp': '2025-05-29T05:51:26.777476+00:00',
    'table_name': 'test_users',
    'schema_name': 'public',
    'trigger_name': 'chancy_trigger_675ed409_4da2_4967_bfc7_7bc641f1cc92_insert'
}

The timestamp is particularly useful, as you can use it to select all recently changed rows when combined with a timestamp column in the table.
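
As a rough illustration of that idea, the helper below turns the trigger metadata into a parameterized query for rows changed around the time the trigger fired. The updated_at column, the slack window, and the changed_rows_query and quote_ident helpers are all assumptions made for this sketch; none of them are part of Chancy.

```python
from datetime import datetime, timedelta


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def changed_rows_query(
    trigger_meta: dict,
    *,
    column: str = "updated_at",  # hypothetical timestamp column on the table
    slack: timedelta = timedelta(seconds=5),  # hypothetical safety margin
):
    """Build (query, params) selecting rows changed since the trigger fired."""
    fired_at = datetime.fromisoformat(trigger_meta["timestamp"])
    table = ".".join(
        quote_ident(trigger_meta[key]) for key in ("schema_name", "table_name")
    )
    query = f"SELECT * FROM {table} WHERE {quote_ident(column)} >= %(since)s"
    return query, {"since": fired_at - slack}
```

Inside a job, this could be called as changed_rows_query(context.meta["trigger"]) and the result passed to whatever database client the application already uses.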

async classmethod disable_trigger(chancy: Chancy, trigger_id: UUID | str) -> bool

Disable an enabled trigger.

Drops the database triggers but keeps the configuration, allowing the trigger to be re-enabled later. If the trigger is already disabled, this is a no-op.

Parameters:
  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier (UUID or string).

Returns:

True if the trigger was disabled, False if it was already disabled.

Raises:

ValueError – If the trigger_id does not exist.

async classmethod disable_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> bool

Disable an enabled trigger (transaction-aware version).

Parameters:
  • cursor – The database cursor to use for the query.

  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier (UUID or string).

Returns:

True if the trigger was disabled, False if it was already disabled.

Raises:

ValueError – If the trigger_id does not exist.

async classmethod enable_trigger(chancy: Chancy, trigger_id: UUID | str) -> bool

Enable a disabled trigger.

Re-creates the database triggers for a previously disabled trigger. If the trigger is already enabled, this is a no-op.

Parameters:
  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier (UUID or string).

Returns:

True if the trigger was enabled, False if it was already enabled.

Raises:

ValueError – If the trigger_id does not exist.
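
Because disable_trigger reports whether it actually changed anything, the two calls can be paired to pause a trigger temporarily, for example around a bulk import. The following is a minimal sketch, not part of Chancy: trigger_paused is a hypothetical helper, and the plugin class is passed in as trigger_plugin (expected to be chancy.plugins.trigger.Trigger) so the sketch does not need a live database.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def trigger_paused(trigger_plugin, chancy, trigger_id):
    """Disable a trigger for the duration of the block, then restore it."""
    # disable_trigger returns False when the trigger was already disabled.
    we_disabled_it = await trigger_plugin.disable_trigger(chancy, trigger_id)
    try:
        yield we_disabled_it
    finally:
        # Only re-enable a trigger that this block disabled, so a trigger
        # that was already off stays off.
        if we_disabled_it:
            await trigger_plugin.enable_trigger(chancy, trigger_id)
```

Usage would look like async with trigger_paused(Trigger, chancy, trigger_id): followed by the bulk operation. Statements executed while the trigger is disabled create no jobs.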

async classmethod enable_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> bool

Enable a disabled trigger (transaction-aware version).

Parameters:
  • cursor – The database cursor to use for the query.

  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier (UUID or string).

Returns:

True if the trigger was enabled, False if it was already enabled.

Raises:

ValueError – If the trigger_id does not exist.

static get_identifier() -> str

Returns a unique identifier for this plugin.

This identifier should be unique across all active plugins. If a custom plugin provides compatible functionality to a built-in plugin, it may use the same identifier as the built-in plugin.

get_tables() -> list[str]

Get the names of all tables this plugin is responsible for.

By default, returns an empty list.

async classmethod get_triggers(chancy: Chancy, *, trigger_ids: List[UUID | str] | None = None) -> dict[UUID, TriggerConfig]

Get registered triggers, optionally filtered by ID.

Retrieves trigger configurations from the database. Can fetch all triggers or filter by specific IDs.

Example:

>>> # Get all triggers
>>> all_triggers = await Trigger.get_triggers(chancy)
>>>
>>> # Get specific triggers
>>> triggers = await Trigger.get_triggers(
...     chancy,
...     trigger_ids=[trigger_id1, trigger_id2],
... )
>>> for trigger_id, config in triggers.items():
...     print(f"{config.table_name}: {config.enabled}")

Parameters:
  • chancy – The Chancy application instance.

  • trigger_ids – Optional list of trigger IDs to filter by (UUID or string).

Returns:

Dictionary mapping trigger UUID to TriggerConfig.
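
The returned dictionary can be post-processed like any other mapping. As a small sketch, the hypothetical summarize_triggers helper below (not part of Chancy) renders one line per trigger using only the TriggerConfig attributes documented on this page.

```python
from typing import Any, Mapping


def summarize_triggers(triggers: Mapping[Any, Any]) -> list[str]:
    """Return one human-readable line per trigger, sorted by schema and table."""
    ordered = sorted(
        triggers.items(),
        key=lambda item: (item[1].schema_name, item[1].table_name, str(item[0])),
    )
    lines = []
    for trigger_id, config in ordered:
        state = "enabled" if config.enabled else "disabled"
        operations = ", ".join(config.operations)
        lines.append(
            f"{config.schema_name}.{config.table_name} "
            f"[{operations}] {state} (id={trigger_id})"
        )
    return lines
```

It would typically be fed the result of await Trigger.get_triggers(chancy).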

async classmethod get_triggers_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, *, trigger_ids: List[UUID | str] | None = None) -> dict[UUID, TriggerConfig]

Get registered triggers, optionally filtered by ID (transaction-aware version).

Parameters:
  • cursor – The database cursor to use for the query.

  • chancy – The Chancy application instance.

  • trigger_ids – Optional list of trigger IDs to filter by (UUID or string).

Returns:

Dictionary mapping trigger UUID to TriggerConfig.

migrate_key() -> str | None

Get the migration key for this plugin, if it has any.

migrate_package() -> str | None

Get the package name that contains the migration scripts for this plugin, if it has any.

async classmethod register_trigger(chancy: Chancy, *, table_name: str, operations: List[str], job_template: Job | IsAJob, schema_name: str = 'public', enabled: bool = True) -> UUID

Register a database trigger that creates jobs when table rows change.

This method creates a PostgreSQL trigger on the specified table that will automatically queue jobs when the specified DML operations occur. The trigger is statement-level, meaning one job is created per SQL statement regardless of how many rows are affected.

Example:

>>> from chancy import Chancy, job
>>> from chancy.plugins.trigger import Trigger
>>>
>>> @job(queue="events")
... def handle_user_change():
...     pass
>>>
>>> async with Chancy(..., plugins=[Trigger()]) as app:
...     trigger_id = await Trigger.register_trigger(
...         app,
...         table_name="users",
...         operations=["INSERT", "UPDATE"],
...         job_template=handle_user_change,
...     )

Parameters:
  • chancy – The Chancy application instance.

  • table_name – Name of the table to monitor.

  • operations – List of operations to monitor. Must contain one or more of 'INSERT', 'UPDATE', or 'DELETE' (case-insensitive).

  • job_template – Job template to enqueue each time the trigger fires.

  • schema_name – Schema containing the table (default: 'public').

  • enabled – Whether the trigger should be enabled immediately (default: True).

Returns:

The unique trigger identifier (UUID).

Raises:

ValueError – If operations list is empty or contains invalid operations.
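
The rule for the operations argument can be checked up front, before touching the database. The snippet below is only a sketch of the documented rule (non-empty, case-insensitive, limited to INSERT, UPDATE, and DELETE); normalize_operations is a hypothetical helper and is not claimed to match the plugin's internal validation.

```python
ALLOWED_OPERATIONS = ("INSERT", "UPDATE", "DELETE")


def normalize_operations(operations):
    """Upper-case the operations and reject empty or unknown entries."""
    if not operations:
        raise ValueError("operations must contain at least one entry")
    normalized = [operation.upper() for operation in operations]
    invalid = [op for op in normalized if op not in ALLOWED_OPERATIONS]
    if invalid:
        raise ValueError(f"invalid operations: {invalid}")
    return normalized
```

Validating early like this gives the caller a chance to report bad configuration before opening a connection.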

async classmethod register_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, table_name: str, operations: List[str], job_template: Job | IsAJob, *, schema_name: str = 'public', enabled: bool = True) -> UUID

Register a database trigger (transaction-aware version).

This is a lower-level version of register_trigger that accepts an existing cursor object, allowing it to be used within transactions.

Parameters:
  • cursor – The database cursor to use for the query.

  • chancy – The Chancy application instance.

  • table_name – Name of the table to monitor.

  • operations – List of operations to monitor ('INSERT', 'UPDATE', 'DELETE').

  • job_template – Job template to enqueue each time the trigger fires.

  • schema_name – Schema containing the table (default: 'public').

  • enabled – Whether the trigger should be enabled immediately (default: True).

Returns:

The unique trigger identifier (UUID).

Raises:

ValueError – If operations list is empty or contains invalid operations.

async classmethod unregister_trigger(chancy: Chancy, trigger_id: UUID | str) -> None

Permanently remove a trigger.

This removes both the database triggers and the trigger configuration. Once unregistered, the trigger cannot be re-enabled and must be re-registered if needed again.

Parameters:
  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier to remove (UUID or string).

Raises:

ValueError – If the trigger_id does not exist.

async classmethod unregister_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> None

Permanently remove a trigger (transaction-aware version).

Parameters:
  • cursor – The database cursor to use for the query.

  • chancy – The Chancy application instance.

  • trigger_id – The unique trigger identifier to remove (UUID or string).

Raises:

ValueError – If the trigger_id does not exist.
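
Since the _ex variants all accept a cursor, several of them can share one transaction. A minimal sketch of that pattern follows: replace_trigger is a hypothetical helper that swaps one trigger for another so that both steps commit or roll back together. It assumes the caller already owns the cursor and its transaction (how that cursor is obtained is not covered here), and the plugin class is passed in as trigger_plugin, expected to be chancy.plugins.trigger.Trigger.

```python
async def replace_trigger(
    trigger_plugin,
    cursor,
    chancy,
    old_trigger_id,
    *,
    table_name,
    operations,
    job_template,
    schema_name="public",
):
    """Unregister one trigger and register its replacement on one cursor."""
    # Both calls run on the caller's cursor, so they belong to the same
    # transaction; committing or rolling back is left to the caller.
    await trigger_plugin.unregister_trigger_ex(cursor, chancy, old_trigger_id)
    return await trigger_plugin.register_trigger_ex(
        cursor,
        chancy,
        table_name,
        operations,
        job_template,
        schema_name=schema_name,
    )
```

If the second call raises, rolling back the surrounding transaction would leave the original trigger in place.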

class chancy.plugins.trigger.TriggerConfig(id: UUID, table_name: str, schema_name: str, trigger_name: str, operations: List[str], job_template: Job, enabled: bool, created_at: datetime)

Bases: object

Configuration for a registered trigger.

created_at: datetime
enabled: bool
id: UUID
job_template: Job
operations: List[str]
schema_name: str
table_name: str
trigger_name: str