Metadata-Version: 2.3
Name: tchu
Version: 0.1.0
Summary: A simple RabbitMQ/Pika wrapper for publishing and consuming events
License: MIT
Keywords: rabbitmq,pika,amqp,messaging
Author: David Sigley
Author-email: djsigley@gmail.com
Requires-Python: >=3.7,<4.0
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: pika (>=1.2.0,<2.0.0)
Project-URL: Documentation, https://github.com/sigularusrex/tchu
Project-URL: Homepage, https://github.com/sigularusrex/tchu
Project-URL: Repository, https://github.com/sigularusrex/tchu
Description-Content-Type: text/markdown

# tchu

A lightweight Python wrapper around Pika/RabbitMQ for easy event publishing and consuming.

## Installation

Add the package as a direct Git reference, for example in `requirements.txt`:

	tchu @ git+https://github.com/Sigularusrex/tchu@main


Make a Django management command like this:

	from tchu.consumer import ThreadedConsumer
	from django.core.management.base import BaseCommand

	from gc_api.subscribers.tchu_callback import tchu_callback
	from settings import RABBITMQ_BROKER_URL


	class Command(BaseCommand):
		help = "Launches Listener for Service A events: RabbitMQ"

		def handle(self, *args, **options):
			consumer = ThreadedConsumer(
				amqp_url=RABBITMQ_BROKER_URL,
				exchange="exchange-name",
				exchange_type="topic",
				threads=5,
				routing_keys=["event_topic.*"],
				callback=tchu_callback,
			)
			# Start consuming messages
			consumer.run()
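
The `routing_keys=["event_topic.*"]` value above is an AMQP topic binding pattern. As a rough illustration of how a topic exchange matches such patterns (`*` stands for exactly one dot-separated word, `#` for zero or more), here is a minimal standalone sketch. The `topic_matches` helper is hypothetical and is not part of tchu or Pika; the broker does the real matching.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key.

    Hypothetical helper for illustration only; RabbitMQ performs this
    matching on the broker side.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may consume zero or more words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            # Key exhausted but the pattern still expects a word.
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' consumes exactly one word; literals must be equal.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("event_topic.*", "event_topic.created"))
print(topic_matches("event_topic.*", "event_topic.user.created"))
print(topic_matches("event_topic.#", "event_topic.user.created"))
```

With the pattern from the command above, `event_topic.created` would be delivered, while `event_topic.user.created` would need a `#` pattern instead.
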

Provide it with a callback function (mine just blindly publishes to Celery :smile:):

	import json

	import celery_pubsub


	def tchu_callback(ch, method, properties, body):
		try:
			print("External message received in Service A")
			data = json.loads(body)
			celery_pubsub.publish(method.routing_key, data)
			print("Message published in Service A")
		except Exception as e:
			print(f"Error publishing message: {e}")
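
If you want to try the callback logic without a broker or Celery, a variation like the one below may help. It is only a sketch: `make_callback` and the `publish` argument are hypothetical names, not part of tchu, and it assumes the callback receives the usual Pika-style `(ch, method, properties, body)` arguments as in the example above. It also uses `logging` instead of `print`.

```python
import json
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def make_callback(publish):
    """Build a consumer callback that forwards decoded JSON to `publish`.

    `publish` is any callable taking (routing_key, data); in the example
    above that role is played by celery_pubsub.publish.
    """

    def callback(ch, method, properties, body):
        try:
            data = json.loads(body)
        except ValueError:
            # Covers invalid JSON and undecodable bytes; skip the message.
            logger.exception("Could not decode message body")
            return
        try:
            publish(method.routing_key, data)
        except Exception:
            logger.exception("Error publishing message")

    return callback


# Exercise the callback with a stand-in for the delivery `method` object.
received = []
callback = make_callback(lambda key, data: received.append((key, data)))
method = SimpleNamespace(routing_key="event_topic.created")

callback(None, method, None, b'{"id": 1}')   # forwarded to `publish`
callback(None, method, None, b"not json")    # logged and skipped
print(received)
```

Passing the publisher in as an argument keeps the callback easy to test; in the management command you would pass `make_callback(celery_pubsub.publish)` as `callback`.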

...and you're good to go.

