initial
This commit is contained in:
96
BaseLib/queue.py
Normal file
96
BaseLib/queue.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import json
|
||||
from json import JSONDecodeError
|
||||
from time import sleep
|
||||
|
||||
import pika
|
||||
from django import db
|
||||
from BaseLib.BaseDaemon import BaseDaemon
|
||||
from pika.adapters.utils.connection_workflow import AMQPConnectorException
|
||||
|
||||
from allinvest import settings
|
||||
|
||||
|
||||
def blocking_connection():
|
||||
return pika.BlockingConnection(
|
||||
pika.ConnectionParameters(
|
||||
host=settings.RABBITMQ_HOST,
|
||||
credentials=pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def send_to_queue(routing_key, payload):
|
||||
with blocking_connection() as connection:
|
||||
channel = connection.channel()
|
||||
queue_name = f"{settings.RABBITMQ_EXCHANGE}_{routing_key}"
|
||||
channel.exchange_declare(
|
||||
exchange=settings.RABBITMQ_EXCHANGE,
|
||||
exchange_type='direct'
|
||||
)
|
||||
channel.queue_declare(queue_name)
|
||||
channel.queue_bind(
|
||||
exchange=settings.RABBITMQ_EXCHANGE,
|
||||
queue=queue_name,
|
||||
routing_key=routing_key
|
||||
)
|
||||
channel.basic_publish(
|
||||
exchange=settings.RABBITMQ_EXCHANGE,
|
||||
routing_key=routing_key,
|
||||
body=json.dumps(payload).encode("UTF-8")
|
||||
)
|
||||
|
||||
|
||||
class MessagingSupport(BaseDaemon):
|
||||
queue_name = None
|
||||
attempts = 3
|
||||
payload = dict()
|
||||
|
||||
def process(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def consume(self, ch, method, properties, body):
|
||||
try:
|
||||
data = json.loads(body.decode('utf-8'))
|
||||
except JSONDecodeError:
|
||||
print("Message is not JSON decodable")
|
||||
return
|
||||
print(f"Got {data}, processing...")
|
||||
finished = False
|
||||
|
||||
for attempt in range(1, self.attempts + 1):
|
||||
print("attempt", attempt)
|
||||
try:
|
||||
db.close_old_connections()
|
||||
self.payload = data
|
||||
self.process()
|
||||
finished = True
|
||||
except Exception as e:
|
||||
print("failed with", str(e))
|
||||
raise
|
||||
sleep(1.5)
|
||||
if finished:
|
||||
break
|
||||
if finished:
|
||||
print("Process finished successfully")
|
||||
else:
|
||||
print("Process failed")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if self.queue_name is None:
|
||||
raise NotImplementedError("Queue name must be declared")
|
||||
print("start listening " + self.queue_name)
|
||||
while True:
|
||||
try:
|
||||
with pika.BlockingConnection(
|
||||
pika.ConnectionParameters(
|
||||
host=settings.RABBITMQ_HOST,
|
||||
credentials=pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD)
|
||||
)
|
||||
) as connection:
|
||||
channel = connection.channel()
|
||||
queue = f"{settings.RABBITMQ_EXCHANGE}_{self.queue_name}"
|
||||
channel.queue_declare(queue=queue)
|
||||
channel.basic_consume(queue=queue, on_message_callback=self.consume, auto_ack=True)
|
||||
channel.start_consuming()
|
||||
except AMQPConnectorException:
|
||||
print("connection to rabbit failed: reconnecting")
|
Reference in New Issue
Block a user