Compare commits

..

10 Commits

Author SHA1 Message Date
2a7ad6345f Merge pull request 'fix' (#46) from master into prod
Reviewed-on: #46
2024-12-28 14:23:39 +03:00
39bca8ead9 Merge pull request 'fix' (#45) from master into prod
Reviewed-on: #45
2024-12-28 14:19:04 +03:00
43184acb16 Merge pull request 'fix' (#44) from master into prod
Reviewed-on: #44
2024-12-28 13:50:12 +03:00
fb762e62e9 Merge pull request 'master' (#43) from master into prod
Reviewed-on: #43
2024-12-22 23:55:57 +03:00
56eb2f60ee Merge pull request 'add' (#40) from master into prod
Reviewed-on: #40
2024-12-21 22:21:38 +03:00
cf1f92dcc9 Merge pull request 'master' (#38) from master into prod
Reviewed-on: #38
2024-12-08 19:57:40 +03:00
48489e607e Merge pull request 'master' (#25) from master into prod
Reviewed-on: #25
2024-11-30 15:13:59 +03:00
185ce0c5ce Merge pull request 'master' (#22) from master into prod
Reviewed-on: #22
2024-11-29 20:35:37 +03:00
cab8e15ba8 Merge pull request 'master' (#19) from master into prod
Reviewed-on: #19
2024-11-28 23:14:36 +03:00
d3d92f56ee Merge pull request 'master' (#16) from master into prod
Reviewed-on: #16
2024-11-27 18:44:03 +03:00
2 changed files with 7 additions and 40 deletions

View File

@@ -1,7 +1,6 @@
import telebot import telebot
import multiprocessing import multiprocessing
import time import time
import json
from daemons import base from daemons import base
from utils import platform from utils import platform

View File

@@ -1,10 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
import datetime
import json
import os import os
import traceback
import uuid
import zoneinfo
import requests import requests
import time import time
@@ -21,24 +15,6 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send():
requests.post(f'{QUEUES_URL}/api/v1/metric', json={
'service': 'botalka',
'queue': self.queue_name,
'success': success,
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
"success": success,
"execution_time_ms": (end - start).microseconds // 1000,
"environment": stage,
})
self.executor.submit(send)
def poll(self): def poll(self):
while True: while True:
try: try:
@@ -51,25 +27,17 @@ class TasksHandlerMixin:
if not task: if not task:
time.sleep(0.2) time.sleep(0.2)
continue continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try: try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload']) self.process(task['payload'])
success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
traceback.print_stack() continue
success = False
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
if success:
try: try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
print(f'finish task with id {task["id"]}')
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success)
@property @property
def queue_name(self): def queue_name(self):