Triggers
- class chancy.plugins.trigger.Trigger
Bases: Plugin
Install database triggers on non-Chancy tables that create jobs when rows change.
The Trigger plugin lets you automatically create jobs in response to database changes on any table. It uses PostgreSQL statement-level triggers, which fire once per SQL statement rather than once per row, so bulk operations stay cheap. Because the database itself runs these triggers, jobs are created even if all Chancy workers are offline.
Tip
Database triggers should be avoided for extremely high-frequency operations or tables with very high write volumes, as they can introduce significant overhead.
Usage
You can register a job to run when a specific table is modified by creating a trigger. For example, to run a job whenever a row in the users table is inserted or updated, you can do the following:

    from chancy import Chancy, Worker, Queue, job
    from chancy.plugins.trigger import Trigger

    @job(queue="user_events")
    def process_user_change():
        pass

    async with Chancy(
        "postgresql://localhost/postgres",
        plugins=[Trigger()]
    ) as chancy:
        trigger_id = await Trigger.register_trigger(
            chancy,
            table_name="users",
            operations=["INSERT", "UPDATE"],
            job_template=process_user_change,
        )

More information about what triggered the job can be found in the job’s metadata:

    from chancy import job, QueuedJob

    @job(queue="user_events")
    def process_user_change(*, context: QueuedJob):
        print(context.meta["trigger"])

This would output something like:

    {
        'operation': 'INSERT',
        'timestamp': '2025-05-29T05:51:26.777476+00:00',
        'table_name': 'test_users',
        'schema_name': 'public',
        'trigger_name': 'chancy_trigger_675ed409_4da2_4967_bfc7_7bc641f1cc92_insert'
    }

The timestamp is particularly useful, as you can use it to select all recently changed rows when combined with a timestamp column in the table.
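As a minimal sketch of that idea, the helper below turns the trigger metadata into a parameterized query for rows changed at or after the moment the trigger fired. Several things here are assumptions rather than part of Chancy: the `updated_at` column (your table has to maintain one itself), the `slack` parameter, the helper names, and the psycopg-style `%s` placeholder.

```python
from datetime import datetime, timedelta


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def changed_rows_query(trigger_meta, timestamp_column="updated_at", slack=timedelta(0)):
    """Build (sql, params) selecting rows changed at or after the trigger fired.

    `timestamp_column` is a hypothetical column that your table maintains.
    `slack` widens the window in case that column can be slightly earlier
    than the timestamp recorded by the trigger.
    """
    # The metadata timestamp is an ISO 8601 string with a UTC offset.
    fired_at = datetime.fromisoformat(trigger_meta["timestamp"])
    table = ".".join(
        quote_ident(trigger_meta[key]) for key in ("schema_name", "table_name")
    )
    sql = f"SELECT * FROM {table} WHERE {quote_ident(timestamp_column)} >= %s"
    return sql, (fired_at - slack,)
```

Inside a job, `changed_rows_query(context.meta["trigger"])` would give a statement and parameters to pass to whatever database cursor the job uses.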
- async classmethod disable_trigger(chancy: Chancy, trigger_id: UUID | str) -> bool
Disable an enabled trigger.
Drops the database triggers but keeps the configuration, allowing the trigger to be re-enabled later. If the trigger is already disabled, this is a no-op.
- Parameters:
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier (UUID or string).
- Returns:
True if the trigger was disabled, False if it was already disabled.
- Raises:
ValueError – If the trigger_id does not exist.
- async classmethod disable_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> bool
Disable an enabled trigger (transaction-aware version).
- Parameters:
cursor – The database cursor to use for the query.
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier (UUID or string).
- Returns:
True if the trigger was disabled, False if it was already disabled.
- Raises:
ValueError – If the trigger_id does not exist.
- async classmethod enable_trigger(chancy: Chancy, trigger_id: UUID | str) -> bool
Enable a disabled trigger.
Re-creates the database triggers for a previously disabled trigger. If the trigger is already enabled, this is a no-op.
- Parameters:
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier (UUID or string).
- Returns:
True if the trigger was enabled, False if it was already enabled.
- Raises:
ValueError – If the trigger_id does not exist.
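The boolean results of disable_trigger and enable_trigger make it possible to pause a trigger temporarily without disturbing one that was already disabled. The following is a sketch only: `trigger_paused` is a hypothetical helper, not part of Chancy, and it takes the Trigger class as the `trigger_api` argument so that it can be exercised with a stand-in.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def trigger_paused(trigger_api, chancy, trigger_id):
    """Disable a trigger for the duration of the block, then restore it.

    `trigger_api` is expected to be chancy.plugins.trigger.Trigger, or any
    object exposing the same disable_trigger/enable_trigger coroutines.
    """
    # True only if this call actually disabled the trigger.
    disabled_here = await trigger_api.disable_trigger(chancy, trigger_id)
    try:
        yield disabled_here
    finally:
        # Leave triggers that were already disabled in that state.
        if disabled_here:
            await trigger_api.enable_trigger(chancy, trigger_id)
```

Because disabling drops the database triggers, statements executed inside `async with trigger_paused(Trigger, chancy, trigger_id):` create no jobs. An unknown trigger_id still surfaces as the documented ValueError.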
- async classmethod enable_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> bool
Enable a disabled trigger (transaction-aware version).
- Parameters:
cursor – The database cursor to use for the query.
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier (UUID or string).
- Returns:
True if the trigger was enabled, False if it was already enabled.
- Raises:
ValueError – If the trigger_id does not exist.
- static get_identifier() -> str
Returns a unique identifier for this plugin.
This identifier should be unique across all active plugins. If a custom plugin provides compatible functionality to a built-in plugin, it may use the same identifier as the built-in plugin.
- get_tables() -> list[str]
Get the names of all tables this plugin is responsible for.
By default, returns an empty list.
- async classmethod get_triggers(chancy: Chancy, *, trigger_ids: List[UUID | str] | None = None) -> dict[UUID, TriggerConfig]
Get registered triggers by their IDs.
Retrieves trigger configurations from the database. Can fetch all triggers or filter by specific IDs.
- Example:

    >>> # Get all triggers
    >>> all_triggers = await Trigger.get_triggers(chancy)
    >>>
    >>> # Get specific triggers
    >>> triggers = await Trigger.get_triggers(
    ...     chancy,
    ...     trigger_ids=[trigger_id1, trigger_id2]
    ... )
    >>> for trigger_id, config in triggers.items():
    ...     print(f"{config.table_name}: {config.enabled}")

- Parameters:
chancy – The Chancy application instance.
trigger_ids – Optional list of trigger IDs to filter by (UUID or string).
- Returns:
Dictionary mapping trigger UUID to TriggerConfig.
- async classmethod get_triggers_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, *, trigger_ids: List[UUID | str] | None = None) -> dict[UUID, TriggerConfig]
Get registered triggers by their IDs (transaction-aware version).
- Parameters:
cursor – The database cursor to use for the query.
chancy – The Chancy application instance.
trigger_ids – Optional list of trigger IDs to filter by (UUID or string).
- Returns:
Dictionary mapping trigger UUID to TriggerConfig.
- migrate_package() -> str | None
Get the package name that contains the migration scripts for this plugin, if it has any.
- async classmethod register_trigger(chancy: Chancy, *, table_name: str, operations: List[str], job_template: Job | IsAJob, schema_name: str = 'public', enabled: bool = True) -> UUID
Register a database trigger that creates jobs when table rows change.
This method creates a PostgreSQL trigger on the specified table that will automatically queue jobs when the specified DML operations occur. The trigger is statement-level, meaning one job is created per SQL statement regardless of how many rows are affected.
- Example:

    >>> from chancy import Chancy, job
    >>> from chancy.plugins.trigger import Trigger
    >>>
    >>> @job(queue="events")
    ... def handle_user_change():
    ...     pass
    >>>
    >>> async with Chancy(..., plugins=[Trigger()]) as app:
    ...     trigger_id = await Trigger.register_trigger(
    ...         app,
    ...         table_name="users",
    ...         operations=["INSERT", "UPDATE"],
    ...         job_template=handle_user_change
    ...     )

- Parameters:
chancy – The Chancy application instance.
table_name – Name of the table to monitor.
operations – List of operations to monitor. Must be one or more of: ‘INSERT’, ‘UPDATE’, ‘DELETE’. Case-insensitive.
job_template – Job template to create when trigger fires.
schema_name – Schema containing the table (default: ‘public’).
enabled – Whether the trigger should be enabled immediately (default: True).
- Returns:
The unique trigger identifier (UUID).
- Raises:
ValueError – If operations list is empty or contains invalid operations.
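If the operations list comes from configuration or user input, it can help to check it before calling register_trigger. The sketch below mirrors the documented rules (one or more of INSERT, UPDATE, DELETE, case-insensitive); it is not the library's own validation code, `normalize_operations` is a hypothetical helper, and the de-duplication step is an extra added here.

```python
VALID_OPERATIONS = ("INSERT", "UPDATE", "DELETE")


def normalize_operations(operations):
    """Upper-case, de-duplicate, and validate a list of DML operation names."""
    normalized = []
    for operation in operations:
        name = operation.upper()
        if name not in VALID_OPERATIONS:
            raise ValueError(f"Invalid operation: {operation!r}")
        # Keep the first occurrence of each operation, preserving order.
        if name not in normalized:
            normalized.append(name)
    if not normalized:
        raise ValueError("At least one operation is required")
    return normalized
```

The result can be passed directly as the operations argument, so that a bad value is reported before any database work starts.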
- async classmethod register_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, table_name: str, operations: List[str], job_template: Job | IsAJob, *, schema_name: str = 'public', enabled: bool = True) -> UUID
Register a database trigger (transaction-aware version).
This is a lower-level version of register_trigger that accepts an existing cursor object, allowing it to be used within transactions.
- Parameters:
cursor – The database cursor to use for the query.
chancy – The Chancy application instance.
table_name – Name of the table to monitor.
operations – List of operations to monitor (‘INSERT’, ‘UPDATE’, ‘DELETE’).
job_template – Job template to create when trigger fires.
schema_name – Schema containing the table (default: ‘public’).
enabled – Whether the trigger should be enabled immediately (default: True).
- Returns:
The unique trigger identifier (UUID).
- Raises:
ValueError – If operations list is empty or contains invalid operations.
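One use of the cursor-accepting variant is to register several triggers through a single caller-owned cursor, so that they commit or roll back together with the caller's transaction. The following is a sketch under assumptions: `register_triggers_together` is a hypothetical helper, the Trigger class is passed in as `trigger_api`, and how the cursor and its transaction are opened is left to the caller because this page does not cover it.

```python
async def register_triggers_together(
    trigger_api, cursor, chancy, job_template, table_names, operations
):
    """Register one trigger per table using a single caller-owned cursor.

    Returns a dict mapping each table name to its new trigger identifier.
    """
    trigger_ids = {}
    for table_name in table_names:
        # Positional order follows the documented register_trigger_ex
        # signature: cursor, chancy, table_name, operations, job_template.
        trigger_ids[table_name] = await trigger_api.register_trigger_ex(
            cursor,
            chancy,
            table_name,
            operations,
            job_template,
        )
    return trigger_ids
```

If any registration raises, the exception propagates to the caller, whose transaction handling decides whether the earlier registrations are rolled back.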
- async classmethod unregister_trigger(chancy: Chancy, trigger_id: UUID | str) -> None
Permanently remove a trigger.
This removes both the database triggers and the trigger configuration. Once unregistered, the trigger cannot be re-enabled and must be re-registered if needed again.
- Parameters:
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier to remove (UUID or string).
- Raises:
ValueError – If the trigger_id does not exist.
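Combined with get_triggers, unregistering can be used to clean up every trigger attached to a table, for example before dropping it. This is a rough sketch: `remove_triggers_for_table` is a hypothetical helper, the Trigger class is passed in as `trigger_api`, and it matches on `table_name` only, so tables with the same name in different schemas are treated alike.

```python
async def remove_triggers_for_table(trigger_api, chancy, table_name):
    """Unregister every trigger whose configuration targets `table_name`.

    Returns the list of trigger identifiers that were removed.
    """
    # With no trigger_ids filter, get_triggers returns all registered triggers.
    triggers = await trigger_api.get_triggers(chancy)
    removed = []
    for trigger_id, config in triggers.items():
        if config.table_name == table_name:
            await trigger_api.unregister_trigger(chancy, trigger_id)
            removed.append(trigger_id)
    return removed
```

Since unregistering is permanent, a trigger that should return later is better handled with disable_trigger.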
- async classmethod unregister_trigger_ex(cursor: AsyncCursor[Dict[str, Any]], chancy: Chancy, trigger_id: UUID | str) -> None
Permanently remove a trigger (transaction-aware version).
- Parameters:
cursor – The database cursor to use for the query.
chancy – The Chancy application instance.
trigger_id – The unique trigger identifier to remove (UUID or string).
- Raises:
ValueError – If the trigger_id does not exist.