Compare commits

..

1 Commits

Author SHA1 Message Date
44ce81947a Merge pull request 'fix' (#64) from metric into dev
Reviewed-on: #64
2025-06-15 05:07:48 +03:00
2 changed files with 18 additions and 13 deletions

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- monitoring
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
monitoring:
external: true

View File

@@ -1,8 +1,6 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import datetime import datetime
import json
import os import os
import uuid
import zoneinfo import zoneinfo
import requests import requests
import time import time
@@ -15,18 +13,17 @@ else:
QUEUES_URL = 'http://queues:1239' QUEUES_URL = 'http://queues:1239'
executor = ThreadPoolExecutor(max_workers=1)
class QueuesException(Exception): class QueuesException(Exception):
... ...
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send(): try:
requests.post(f'{QUEUES_URL}/api/v1/metric', json={ resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
'service': 'botalka', 'service': 'botalka',
'queue': self.queue_name, 'queue': self.queue_name,
'success': success, 'success': success,
@@ -35,8 +32,15 @@ class TasksHandlerMixin:
"execution_time_ms": (end - start).microseconds // 1000, "execution_time_ms": (end - start).microseconds // 1000,
"environment": stage, "environment": stage,
}) })
if resp.status_code == 202:
print("Metric ok")
else:
print(f'metric not ok: {resp.status_code}')
except Exception as e:
print(f"Error sending metric: {e}")
self.executor.submit(send) def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
executor.submit(self._send_metric, start, end, success)
def poll(self): def poll(self):
while True: while True:
@@ -52,7 +56,6 @@ class TasksHandlerMixin:
continue continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try: try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload']) self.process(task['payload'])
success = True success = True
except Exception as exc: except Exception as exc:
@@ -64,10 +67,9 @@ class TasksHandlerMixin:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
print(f'finish task with id {task["id"]}')
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success) self.send_metric(start, end, success)
@property @property
def queue_name(self): def queue_name(self):