Compare commits

..

3 Commits

Author SHA1 Message Date
23d6dedb01 Merge pull request 'fix' (#71) from metric into dev
Reviewed-on: #71
2025-06-15 14:46:47 +03:00
2de0fc5d78 Merge pull request 'fix' (#70) from metric into dev
Reviewed-on: #70
2025-06-15 14:43:12 +03:00
9e72357600 Merge pull request 'metric' (#69) from metric into dev
Reviewed-on: #69
2025-06-15 14:40:44 +03:00
2 changed files with 14 additions and 9 deletions

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- monitoring
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
monitoring:
external: true

View File

@@ -1,8 +1,6 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import datetime import datetime
import json
import os import os
import uuid
import zoneinfo import zoneinfo
import requests import requests
import time import time
@@ -22,11 +20,11 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1) self.executor = ThreadPoolExecutor(max_workers=4)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send(): try:
requests.post(f'{QUEUES_URL}/api/v1/metric', json={ resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
'service': 'botalka', 'service': 'botalka',
'queue': self.queue_name, 'queue': self.queue_name,
'success': success, 'success': success,
@@ -35,8 +33,12 @@ class TasksHandlerMixin:
"execution_time_ms": (end - start).microseconds // 1000, "execution_time_ms": (end - start).microseconds // 1000,
"environment": stage, "environment": stage,
}) })
if resp.status_code == 202:
self.executor.submit(send) print("Metric ok")
else:
print(f'metric not ok: {resp.status_code}')
except Exception as e:
print(f"Error sending metric: {e}")
def poll(self): def poll(self):
while True: while True:
@@ -52,7 +54,7 @@ class TasksHandlerMixin:
continue continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try: try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}') print(f'process task with id {task["id"]}')
self.process(task['payload']) self.process(task['payload'])
success = True success = True
except Exception as exc: except Exception as exc:
@@ -67,7 +69,7 @@ class TasksHandlerMixin:
print(f'finish task with id {task["id"]}') print(f'finish task with id {task["id"]}')
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success) self.executor.submit(self._send_metric, start, end, success)
@property @property
def queue_name(self): def queue_name(self):