39203d34e9 | ||
---|---|---|
kroketmq | ||
.gitignore | ||
MANIFEST.in | ||
README.md | ||
requirements.txt | ||
setup.cfg | ||
setup.py |
README.md
kroketmq
Pydantic models over AMQP.
- asyncio AMQP client (
aio_pika
) - declarative exchange/queue definitions
- automatic serialization/deserialization of Pydantic models
Usage
- Define some exchanges/queues
Exchange(name=..., exchange_type=...)
exchange.queue_add(my_queue)
client.add_exchange(exchange)
- Connect to RabbitMQ
client.setup()
- Listen and/or publish some messages
client.consume(queue=..., callback=...)
client.send(exchange=..., messages=..., routing_key=...)
Code example
import asyncio
from uuid import UUID
from pydantic import BaseModel
import aio_pika
from kroketmq import (
AMQPMessageModel,
AMQPClient,
AMQPCredentials,
Exchange,
Queue,
MessageAck)
credentials = AMQPCredentials(
host="127.0.0.1",
passwd="guest",
user="guest",
vhost="/")
# define an exchange
my_exchange = Exchange(
name="my_app",
exchange_type=aio_pika.ExchangeType.TOPIC)
class ItemModel(AMQPMessageModel):
foo: str
bar: str
class NestedModelExample(AMQPMessageModel):
uuid: UUID
test: str
nested: ItemModel
CoolProcessQueue = Queue(
name="VeryCoolQueue",
routing_key="my_app.bla.*",
message_types=[NestedModelExample],
durable=True)
my_exchange.queue_add(CoolProcessQueue)
amqp_client = AMQPClient(
app_id='some_random_name',
credentials=credentials)
async def example_callback(routing_key: str, message: BaseModel) -> MessageAck:
# Note: this callback **must** manually ack/nack/reject messages
# and upon exception, the default behaviour is to reject
# the message (auto-delete message from queue).
[...]
return MessageAck.ACK
async def main():
amqp_client.add_exchange(my_exchange)
await amqp_client.setup()
# listen for incoming messages
await amqp_client.consume(
queue=CoolProcessQueue,
callback=example_callback)
asyncio.run(main())
Publishing a message
item_msg = ItemModel(
foo="nested string",
bar="nested string"
)
nested_msg = NestedModelExample(
uuid='9e803ccc-012a-45e8-b892-2d20b44d8e73',
test='test string',
nested=item_msg
)
await amqp_client.send(
exchange=my_exchange,
messages=nested_msg,
routing_key="my_app.bla.test")
Exchange types
aio_pika.ExchangeTypes
- Direct
- Fanout
- Topic
License
MIT