@@ -1,4 +1,7 @@
 | 
			
		||||
from concurrent.futures import ThreadPoolExecutor
 | 
			
		||||
import datetime
 | 
			
		||||
import os
 | 
			
		||||
import zoneinfo
 | 
			
		||||
import requests
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
@@ -15,24 +18,54 @@ class QueuesException(Exception):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TasksHandlerMixin:
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
        self.executor = ThreadPoolExecutor(max_workers=1)
 | 
			
		||||
 | 
			
		||||
    def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
 | 
			
		||||
        def send():
 | 
			
		||||
            requests.post(f'{QUEUES_URL}/api/v1/metric', json={
 | 
			
		||||
                'service': 'botalka',
 | 
			
		||||
                'queue': self.queue_name,
 | 
			
		||||
                'success': success,
 | 
			
		||||
                'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
 | 
			
		||||
                "success": success,
 | 
			
		||||
                "execution_time_ms": (end - start).microseconds // 1000,
 | 
			
		||||
                "environment": stage,
 | 
			
		||||
            })
 | 
			
		||||
        
 | 
			
		||||
        self.executor.submit(send)
 | 
			
		||||
 | 
			
		||||
    def poll(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
 | 
			
		||||
            try:
 | 
			
		||||
                response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
 | 
			
		||||
            except requests.JSONDecodeError:
 | 
			
		||||
                print('Unable to decode json')
 | 
			
		||||
                time.sleep(3)
 | 
			
		||||
                continue
 | 
			
		||||
            task = response.get('task')
 | 
			
		||||
            if not task:
 | 
			
		||||
                time.sleep(0.2)
 | 
			
		||||
                continue
 | 
			
		||||
            start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
 | 
			
		||||
            try:
 | 
			
		||||
                print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
 | 
			
		||||
                self.process(task['payload'])
 | 
			
		||||
                success = True
 | 
			
		||||
            except Exception as exc:
 | 
			
		||||
                print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
 | 
			
		||||
                continue
 | 
			
		||||
            try:
 | 
			
		||||
                resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
 | 
			
		||||
                if resp.status_code != 202:
 | 
			
		||||
                    raise QueuesException
 | 
			
		||||
            except:
 | 
			
		||||
                print(f'Failed to finish task id={task["id"]}')
 | 
			
		||||
                success = False
 | 
			
		||||
            end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
 | 
			
		||||
            if success:
 | 
			
		||||
                try:
 | 
			
		||||
                    resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
 | 
			
		||||
                    if resp.status_code != 202:
 | 
			
		||||
                        raise QueuesException
 | 
			
		||||
                    print(f'finish task with id {task["id"]}')
 | 
			
		||||
                except:
 | 
			
		||||
                    print(f'Failed to finish task id={task["id"]}')
 | 
			
		||||
            self._send_metric(start, end, success)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def queue_name(self):
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user