Writing Queue Plugins

Event handlers

For each configured queue, Azafea needs an event handler to which it passes the pulled events.

An event handler is a Python module located somewhere Azafea can import it, which means it must be in Python’s sys.path.

An event handler needs at the very least a process() function at the top level of the module, defined as follows:

def process(dbsession: DbSession, record: bytes) -> None:
    ...

Azafea will call the function and pass it the following arguments:

dbsession

An instance of the azafea.model.DbSession class, itself a subclass of the sqlalchemy.orm.session.Session class.

The session is connected to the PostgreSQL database, and the handler can simply add() model instances, and they will be automatically inserted into the right table.

record

This contains the bytes representing the event as pulled from the Redis queue associated with this event handler.

The handler is free to do whatever it wants with this, but will typically deserialize the record, process the result and then instantiate a custom model to store the event in PostgreSQL.

Here is an example of a complete event handler module, which should just work if you copy-paste it:

import json
import logging

from sqlalchemy.orm.session import Session
from sqlalchemy.schema import Column
from sqlalchemy.types import DateTime, Integer, Unicode

from azafea.model import Base


log = logging.getLogger(__name__)


class MyEvent(Base):
    __tablename__ = 'my_event'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False)


def process(dbsession: Session, record: bytes) -> None:
    # Deserialize the record; for example it could be JSON
    record = json.loads(record.decode('utf-8'))

    # Create the event, and add it to the transaction; it will be automatically
    # committed by Azafea if no error is raised
    event = MyEvent(**record)
    dbsession.add(event)

    # Models can be pretty-printed, to make debugging easier
    log.debug('Inserting event record:\n%s', event)

Let’s unpack the example a bit. First come the imports, and then:

log = logging.getLogger(__name__)

This sets up a logger for your handler, which you can use to print debugging output or to log informative messages and any problems that occur.

class MyEvent(Base):

This declares a model class. Making it inherit from azafea.model.Base is how the model is registered into SQLAlchemy, so that its table can be created and Azafea can insert the created records into the database.

class MyEvent(Base):
    __tablename__ = 'my_event'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False)

This defines the model and its associated table in the database, with its 3 columns and any constraints or indices on them. Refer to the SQLAlchemy ORM documentation on mappings for more details on defining models.

def process(dbsession: Session, record: bytes) -> None:
    # Deserialize the record; for example it could be JSON
    record = json.loads(record.decode('utf-8'))

    # Create the event, and add it to the transaction; it will be automatically
    # committed by Azafea if no error is raised
    event = MyEvent(**record)
    dbsession.add(event)

    # Models can be pretty-printed, to make debugging easier
    log.debug('Inserting event record:\n%s', event)

This is the entry-point function for the handler: it is what Azafea will look for in the module and what it will run. Azafea will call it with exactly those arguments.

The Session can be used to query the database, or to insert instances of your custom model. Refer to the SQLAlchemy ORM documentation on sessions for more details on how to use the Session.

Do note that, except in some very specific cases, you should never have to explicitly commit() or rollback() the transaction. Azafea takes care of this automatically once your function returns, committing if no error was raised.
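The commit-or-rollback behaviour described above can be pictured with the following sketch. It is not Azafea's actual code: FakeSession, run_handler and the two example handlers are hypothetical names invented for illustration, and the real implementation may differ, in particular in what it does with a record whose handler failed. The sketch only shows why a handler normally needs neither commit() nor rollback().

```python
from typing import Callable, List


class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self) -> None:
        self.pending: List[object] = []
        self.committed: List[object] = []
        self.rolled_back = False

    def add(self, instance: object) -> None:
        self.pending.append(instance)

    def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()

    def rollback(self) -> None:
        self.pending.clear()
        self.rolled_back = True


def run_handler(process: Callable[[FakeSession, bytes], None],
                dbsession: FakeSession, record: bytes) -> bool:
    """Call the handler, then commit on success or roll back on error."""
    try:
        process(dbsession, record)
    except Exception:
        # Anything the handler added is discarded
        dbsession.rollback()
        return False

    dbsession.commit()
    return True


def good_handler(dbsession: FakeSession, record: bytes) -> None:
    # The handler only adds; it never commits
    dbsession.add(record.decode('utf-8'))


def bad_handler(dbsession: FakeSession, record: bytes) -> None:
    dbsession.add('partial')
    raise ValueError('malformed record')
```

With this model, a handler that returns normally has its additions committed, while a handler that raises leaves nothing behind in the session.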

The record is simply the byte string which was pulled from Redis. It can be anything as far as Azafea is concerned: you know what your system put in Redis, you know how to process it in your handler.

The example above assumes it is a valid UTF-8-encoded JSON string which can be directly used to construct the model instance. Your actual event handler can do any amount of processing here.
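As an illustration of such processing, here is a minimal sketch of a helper that validates the record before the model is built. The parse_record name is hypothetical, and the sketch assumes that the record is a JSON object with exactly the name and timestamp keys of the example model, with the timestamp sent as an ISO 8601 string carrying a UTC offset. Your own records may well look different.

```python
import json
from datetime import datetime
from typing import Any, Dict

# Assumed to match the columns of the example model, minus the primary key
EXPECTED_KEYS = {'name', 'timestamp'}


def parse_record(record: bytes) -> Dict[str, Any]:
    """Decode a JSON record and validate it before building the model."""
    data = json.loads(record.decode('utf-8'))

    if not isinstance(data, dict):
        raise ValueError('Expected a JSON object')

    # Reject records with missing or unexpected keys
    if set(data) != EXPECTED_KEYS:
        raise ValueError(f'Expected keys {sorted(EXPECTED_KEYS)}, got {sorted(data)}')

    if not isinstance(data['name'], str):
        raise ValueError('name must be a string')

    if not isinstance(data['timestamp'], str):
        raise ValueError('timestamp must be a string')

    # Assumption: timestamps are ISO 8601 strings such as
    # 2020-01-02T03:04:05+00:00
    timestamp = datetime.fromisoformat(data['timestamp'])

    if timestamp.tzinfo is None:
        raise ValueError('timestamp must include a UTC offset')

    return {'name': data['name'], 'timestamp': timestamp}
```

A handler could then build its model with MyEvent(**parse_record(record)), so that a malformed record raises an error before anything is added to the session.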

Custom subcommands

In addition to an event handler, each configured queue may optionally register its own subcommands, to be launched through the main azafea CLI.

Any configured handler can include a register_commands() function at the top level of the module, defined as follows:

def register_commands(subs: argparse._SubParsersAction) -> None:
    ...

Azafea will call the function and pass it the following argument:

subs

An instance of the argparse._SubParsersAction class.

Here is an example of a complete CLI registration module, which should just work if you copy-paste it:

import argparse

from azafea.config import Config


def register_commands(subs: argparse._SubParsersAction) -> None:
    something = subs.add_parser('do-something',
                                help='This will do something specific to the queue')
    something.set_defaults(subcommand=do_something)

    something_else = subs.add_parser('do-something-else',
                                     help='This will do something else specific to the queue')
    something_else.add_argument('-f', '--force', action='store_true',
                                help='Forcefully do it')
    something_else.set_defaults(subcommand=do_something_else)


def do_something(config: Config, args: argparse.Namespace) -> None:
    print("We're doing something!")


def do_something_else(config: Config, args: argparse.Namespace) -> None:
    if not args.force:
        print("We're doing something else!")
    else:
        print("WE'RE DOING SOMETHING ELSE!!!")

The above registers the do-something and do-something-else subcommands, the latter with an optional --force argument and its -f shorthand.

Let’s say the configuration for the queue is:

[queues.some-queue]
handler = "another.python.module.processor"

Then both subcommands become accessible to the user:

$ azafea -c path/to/config.toml some-queue -h
usage: azafea some-queue [-h] {do-something,do-something-else} ...

optional arguments:
  -h, --help            show this help message and exit

subcommands:
  {do-something,do-something-else}
    do-something        This will do something specific to the queue
    do-something-else   This will do something else specific to the queue
$ azafea -c config.toml some-queue do-something
We're doing something!
$ azafea -c config.toml some-queue do-something-else --force
WE'RE DOING SOMETHING ELSE!!!

As can be seen above, the custom subcommands specific to the some-queue queue are available to the CLI under a some-queue command, not directly at the root of the azafea command.

You can use any facility provided by Python’s argparse module when registering your subcommands.
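For instance, the following sketch uses typed arguments, a restricted set of choices and a mutually exclusive group. The export subcommand, its options and the do_export function are hypothetical and exist only for this illustration. The parser built at the end is a stand-in for the one Azafea creates for the queue, so that the example can run on its own; for the same reason, the config parameter is typed loosely instead of importing azafea.config.Config.

```python
import argparse


def register_commands(subs: argparse._SubParsersAction) -> None:
    export = subs.add_parser('export', help='Export events (hypothetical)')

    # Values are converted and validated by argparse itself
    export.add_argument('--limit', type=int, default=100,
                        help='Maximum number of events to export')
    export.add_argument('--format', choices=['json', 'csv'], default='json',
                        help='Output format')

    # At most one of these two flags may be passed
    verbosity = export.add_mutually_exclusive_group()
    verbosity.add_argument('--verbose', action='store_true')
    verbosity.add_argument('--quiet', action='store_true')

    export.set_defaults(subcommand=do_export)


def do_export(config: object, args: argparse.Namespace) -> None:
    print(f'Exporting up to {args.limit} events as {args.format}')


# Stand-in for the parser Azafea would build for the queue
parser = argparse.ArgumentParser(prog='azafea some-queue')
register_commands(parser.add_subparsers())

args = parser.parse_args(['export', '--limit', '5', '--format', 'csv'])
args.subcommand(None, args)
```

Passing both --verbose and --quiet, or a --format other than json or csv, makes argparse print a usage error and exit before the subcommand function is called.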

Database Migrations

If your queue plugin has its own model, you will eventually want to update it.

Azafea provides a handy command to automatically generate migration scripts based on the difference between your model and the state of the database:

$ azafea make-migration some-queue

This uses Alembic under the hood, which detects most model changes correctly but not all of them, so you might occasionally need to adapt a generated migration script by hand.

Note that your queue plugin will need a migrations/ directory inside the handler module.

Alembic allows multiple branches to be deployed at the same time. This can happen, for example, when two pull requests are open and then merged, each one including a migration script. If the two migration scripts are independent, they are both deployed without any problem: this is a powerful and harmless feature of Alembic. If the two migration scripts conflict, the second deployment will fail, but the database transaction used by Alembic will prevent anything wrong from happening.

When multiple branches are deployed, their revision hashes are all recorded in the alembic_version table. It is then impossible to create a new migration, as Alembic cannot know which head should be its parent. The solution is to merge the branches, using a merge script:

revision = '<random_hash>'
down_revision = ('<head_1_hash>', '<head_2_hash>', …)
branch_labels = None
depends_on = None

def upgrade():
    pass

def downgrade():
    pass

This script does not modify the database schema, but it replaces the old head hashes with the new one in the alembic_version table. Alembic is then back to normal: a new migration can be added, and it will have the merge script as its parent.
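To make the notion of heads more concrete, here is a toy model of a revision graph. It does not use Alembic's API: find_heads and the revision identifiers are hypothetical, and the sketch only illustrates that a head is a revision which no other revision names as its parent.

```python
from typing import Dict, Set, Tuple


def find_heads(revisions: Dict[str, Tuple[str, ...]]) -> Set[str]:
    """Return the revisions that no other revision lists as a parent."""
    parents = {parent for downs in revisions.values() for parent in downs}

    return set(revisions) - parents


# Each revision maps to its down_revision(s); the identifiers are made up
revisions = {
    'aaa': (),
    'bbb': ('aaa',),  # migration from the first pull request
    'ccc': ('aaa',),  # migration from the second pull request
}

# Two branches were deployed, so there are two heads
heads_before = find_heads(revisions)

# A merge revision names both heads as its parents
revisions['ddd'] = ('bbb', 'ccc')

# Only the merge revision remains as a head
heads_after = find_heads(revisions)
```

With two heads there is no single candidate parent for the next migration. Once the merge revision is added, it is the only head left.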