Compare commits

..

4 Commits

Author SHA1 Message Date
a105ca4422 Merge pull request 'fix' (#72) from metric into dev
Reviewed-on: #72
2025-06-15 14:53:37 +03:00
23d6dedb01 Merge pull request 'fix' (#71) from metric into dev
Reviewed-on: #71
2025-06-15 14:46:47 +03:00
2de0fc5d78 Merge pull request 'fix' (#70) from metric into dev
Reviewed-on: #70
2025-06-15 14:43:12 +03:00
9e72357600 Merge pull request 'metric' (#69) from metric into dev
Reviewed-on: #69
2025-06-15 14:40:44 +03:00
3 changed files with 16 additions and 10 deletions

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- monitoring
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
monitoring:
external: true

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- monitoring
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
monitoring:
external: true

View File

@@ -1,9 +1,6 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import datetime import datetime
import json
import os import os
import traceback
import uuid
import zoneinfo import zoneinfo
import requests import requests
import time import time
@@ -23,11 +20,11 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1) self.executor = ThreadPoolExecutor(max_workers=4)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send(): try:
requests.post(f'{QUEUES_URL}/api/v1/metric', json={ resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
'service': 'botalka', 'service': 'botalka',
'queue': self.queue_name, 'queue': self.queue_name,
'success': success, 'success': success,
@@ -36,8 +33,12 @@ class TasksHandlerMixin:
"execution_time_ms": (end - start).microseconds // 1000, "execution_time_ms": (end - start).microseconds // 1000,
"environment": stage, "environment": stage,
}) })
if resp.status_code == 202:
self.executor.submit(send) print("Metric ok")
else:
print(f'metric not ok: {resp.status_code}')
except Exception as e:
print(f"Error sending metric: {e}")
def poll(self): def poll(self):
while True: while True:
@@ -58,7 +59,6 @@ class TasksHandlerMixin:
success = True success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
traceback.print_stack()
success = False success = False
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
if success: if success:
@@ -69,7 +69,7 @@ class TasksHandlerMixin:
print(f'finish task with id {task["id"]}') print(f'finish task with id {task["id"]}')
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success) self.executor.submit(self._send_metric, start, end, success)
@property @property
def queue_name(self): def queue_name(self):