Compare commits

..

2 Commits

Author SHA1 Message Date
81ca035eda Merge pull request 'fix' (#65) from metric into dev
Reviewed-on: #65
2025-06-15 05:11:51 +03:00
44ce81947a Merge pull request 'fix' (#64) from metric into dev
Reviewed-on: #64
2025-06-15 05:07:48 +03:00
3 changed files with 22 additions and 15 deletions

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- monitoring
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
monitoring:
external: true

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- monitoring
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
monitoring:
external: true

View File

@@ -1,9 +1,6 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import datetime import datetime
import json
import os import os
import traceback
import uuid
import zoneinfo import zoneinfo
import requests import requests
import time import time
@@ -16,18 +13,17 @@ else:
QUEUES_URL = 'http://queues:1239' QUEUES_URL = 'http://queues:1239'
executor = ThreadPoolExecutor(max_workers=1)
class QueuesException(Exception): class QueuesException(Exception):
... ...
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send(): try:
requests.post(f'{QUEUES_URL}/api/v1/metric', json={ resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
'service': 'botalka', 'service': 'botalka',
'queue': self.queue_name, 'queue': self.queue_name,
'success': success, 'success': success,
@@ -36,8 +32,16 @@ class TasksHandlerMixin:
"execution_time_ms": (end - start).microseconds // 1000, "execution_time_ms": (end - start).microseconds // 1000,
"environment": stage, "environment": stage,
}) })
if resp.status_code == 202:
print("Metric ok")
else:
print(f'metric not ok: {resp.status_code}')
except Exception as e:
print(f"Error sending metric: {e}")
self.executor.submit(send) def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
# executor.submit(self._send_metric, start, end, success)
self._send_metric(start, end, success)
def poll(self): def poll(self):
while True: while True:
@@ -53,12 +57,10 @@ class TasksHandlerMixin:
continue continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try: try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload']) self.process(task['payload'])
success = True success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
traceback.print_stack()
success = False success = False
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
if success: if success:
@@ -66,10 +68,9 @@ class TasksHandlerMixin:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
print(f'finish task with id {task["id"]}')
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success) self.send_metric(start, end, success)
@property @property
def queue_name(self): def queue_name(self):