-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrabbitmq.py
More file actions
47 lines (41 loc) · 1.61 KB
/
rabbitmq.py
File metadata and controls
47 lines (41 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import pika
import os
class RabbitMQ:
    """Small wrapper around one blocking pika connection and one channel.

    Connection settings are read from the environment when the object is
    created:

    * ``RABBITMQ_USER`` (default ``"guest"``)
    * ``RABBITMQ_PASSWORD`` (default ``"guest"``)
    * ``RABBITMQ_HOST`` (default ``"localhost"``)
    * ``RABBITMQ_PORT`` (default ``5672``)

    The constructor connects immediately. Call :meth:`close` when done, or
    use the instance as a context manager so the connection is closed
    automatically.
    """

    def __init__(self) -> None:
        """Read the connection settings from the environment and connect."""
        self.user = os.getenv("RABBITMQ_USER", "guest")
        self.password = os.getenv("RABBITMQ_PASSWORD", "guest")
        self.host = os.getenv("RABBITMQ_HOST", "localhost")
        # os.getenv returns strings, so keep the default a string as well.
        self.port = int(os.getenv("RABBITMQ_PORT", "5672"))
        self.connection = None
        self.channel = None
        self.connect()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def connect(self) -> None:
        """Open a blocking connection and a channel on it.

        Any connection that is still open is closed first, so calling this
        method again reconnects instead of leaking the previous connection.
        """
        self.close()
        credentials = pika.PlainCredentials(self.user, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host, port=self.port, credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

    def close(self) -> None:
        """Close the connection if it is open and forget it and its channel.

        Safe to call more than once. The references are cleared even if
        closing raises, so later ``publish``/``consume`` calls fail with the
        "Connection is not established." error rather than operating on a
        dead channel.
        """
        connection = self.connection
        try:
            if connection and not connection.is_closed:
                connection.close()
        finally:
            self.connection = None
            self.channel = None

    def _require_channel(self):
        """Return the open channel.

        Raises:
            RuntimeError: if there is no channel or the channel is closed.
        """
        channel = self.channel
        if not channel or channel.is_closed:
            raise RuntimeError("Connection is not established.")
        return channel

    def consume(self, queue_name: str, callback) -> None:
        """Register ``callback`` on ``queue_name`` and start consuming.

        Messages are consumed with ``auto_ack=True``. This method does not
        declare the queue. It blocks inside ``start_consuming()``.

        Args:
            queue_name: Name of the queue to consume from.
            callback: Passed to pika as ``on_message_callback``.

        Raises:
            RuntimeError: if the connection is not established.
        """
        channel = self._require_channel()
        channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True
        )
        channel.start_consuming()

    def publish(self, queue_name: str, message) -> None:
        """Declare ``queue_name`` as durable and publish ``message`` to it.

        The message is sent through the default exchange (``""``) with the
        queue name as routing key, and a line describing it is printed.

        Args:
            queue_name: Name of the destination queue.
            message: Message body handed to ``basic_publish``.

        Raises:
            RuntimeError: if the connection is not established.
        """
        channel = self._require_channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ),
        )
        print(f"Sent message to queue {queue_name}: {message}")