Pydantic models over AMQP
Kroket Ltd 39203d34e9 Update README 2023-03-18 20:42:59 +02:00

kroketmq

Pydantic models over AMQP.

  • asyncio AMQP client (aio_pika)
  • declarative exchange/queue definitions
  • automatic serialization/deserialization of Pydantic models

Usage

  1. Define some exchanges/queues
    • Exchange(name=..., exchange_type=...)
    • exchange.queue_add(my_queue)
    • client.add_exchange(exchange)
  2. Connect to RabbitMQ
    • client.setup()
  3. Listen and/or publish some messages
    • client.consume(queue=..., callback=...)
    • client.send(exchange=..., messages=..., routing_key=...)

Code example

import asyncio
from uuid import UUID
from pydantic import BaseModel

import aio_pika

from kroketmq import (
    AMQPMessageModel, 
    AMQPClient, 
    AMQPCredentials, 
    Exchange, 
    Queue, 
    MessageAck)


credentials = AMQPCredentials(
    host="127.0.0.1", 
    passwd="guest", 
    user="guest", 
    vhost="/")

# define an exchange
my_exchange = Exchange(
    name="my_app", 
    exchange_type=aio_pika.ExchangeType.TOPIC)


class ItemModel(AMQPMessageModel):
    foo: str
    bar: str


class NestedModelExample(AMQPMessageModel):
    uuid: UUID
    test: str
    nested: ItemModel


CoolProcessQueue = Queue(
    name="VeryCoolQueue",
    routing_key="my_app.bla.*",
    message_types=[NestedModelExample],
    durable=True)
my_exchange.queue_add(CoolProcessQueue)


amqp_client = AMQPClient(
    app_id='some_random_name', 
    credentials=credentials)


async def example_callback(routing_key: str, message: BaseModel) -> MessageAck:
    # Note: this callback **must** explicitly ack, nack or reject each
    # message via the MessageAck value it returns. If the callback raises
    # an exception, the default behaviour is to reject the message
    # (the message is removed from the queue).
    [...]
    return MessageAck.ACK


async def main():
    amqp_client.add_exchange(my_exchange)
    await amqp_client.setup()

    # listen for incoming messages
    await amqp_client.consume(
        queue=CoolProcessQueue, 
        callback=example_callback)

asyncio.run(main())
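
The note in `example_callback` can be illustrated with a small, library-free sketch of a callback that handles its own errors rather than relying on the reject-on-exception default. Everything here is an assumption made for illustration: `Outcome` is a hypothetical stand-in for `MessageAck` (only `MessageAck.ACK` appears in this README), `handle` is made-up business logic, and a plain `dict` replaces the Pydantic model.

```python
import asyncio
from enum import Enum


class Outcome(Enum):
    """Hypothetical stand-in for kroketmq.MessageAck (member names assumed)."""
    ACK = "ack"
    NACK = "nack"
    REJECT = "reject"


async def handle(payload: dict) -> None:
    """Made-up business logic: raises ValueError on malformed input."""
    if "test" not in payload:
        raise ValueError("missing 'test' field")


async def guarded_callback(routing_key: str, message: dict) -> Outcome:
    """Return an explicit outcome for every message instead of raising."""
    try:
        await handle(message)
    except ValueError:
        # Malformed message: processing it again would fail the same way.
        return Outcome.REJECT
    except Exception:
        # Unexpected failure: signal a negative acknowledgement
        # (what the broker does next is an assumption of this sketch).
        return Outcome.NACK
    return Outcome.ACK


ok = asyncio.run(guarded_callback("my_app.bla.test", {"test": "test string"}))
bad = asyncio.run(guarded_callback("my_app.bla.test", {}))
```

Deciding the outcome inside the callback keeps the choice between dropping and negatively acknowledging a message visible in application code.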

Publishing a message

item_msg = ItemModel(
    foo="nested string",
    bar="nested string"
)

nested_msg = NestedModelExample(
    uuid='9e803ccc-012a-45e8-b892-2d20b44d8e73',
    test='test string',
    nested=item_msg
)

# Run this inside a coroutine, after `await amqp_client.setup()`.
await amqp_client.send(
    exchange=my_exchange,
    messages=nested_msg,
    routing_key="my_app.bla.test")

Exchange types

aio_pika.ExchangeType

  • Direct
  • Fanout
  • Topic
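
As a rough illustration of how these three exchange types decide whether a message reaches a queue, here is a minimal, library-free sketch of standard AMQP routing semantics. It is not kroketmq or aio_pika code; the function names `topic_matches` and `is_routed` are hypothetical, and the exchange types are passed as plain strings.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check an AMQP topic binding pattern against a routing key.

    Words are separated by dots; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb any number of words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


def is_routed(exchange_type: str, binding_key: str, routing_key: str) -> bool:
    """Decide whether a message would be delivered to a bound queue."""
    if exchange_type == "fanout":
        return True  # every bound queue receives the message
    if exchange_type == "direct":
        return binding_key == routing_key  # exact match only
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    raise ValueError(f"unsupported exchange type: {exchange_type}")
```

With the binding used earlier, `is_routed("topic", "my_app.bla.*", "my_app.bla.test")` is true, while a key with an extra word such as `my_app.bla.test.extra` would not match because `*` covers exactly one word.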

License

MIT