Merge pull request 'master' (#40) from master into dev
Reviewed-on: #40
This commit is contained in:
		
							
								
								
									
										7
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								Makefile
									
									
									
									
									
								
							@@ -1,7 +0,0 @@
 | 
			
		||||
gen:
 | 
			
		||||
	pip install grpcio grpcio-tools
 | 
			
		||||
	curl https://platform.sprinthub.ru/generator >> generator.py
 | 
			
		||||
	python generator.py
 | 
			
		||||
	rm generator.py
 | 
			
		||||
run:
 | 
			
		||||
	python ./server.py
 | 
			
		||||
@@ -1,6 +1,4 @@
 | 
			
		||||
import os
 | 
			
		||||
import grpc
 | 
			
		||||
from queues import tasks_pb2_grpc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
stage = os.getenv("STAGE", 'local')
 | 
			
		||||
@@ -11,8 +9,5 @@ else:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Daemon:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.channel = grpc.insecure_channel(QUEUES_URL)
 | 
			
		||||
        self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel)     
 | 
			
		||||
    def execute(self):
 | 
			
		||||
        raise NotImplemented
 | 
			
		||||
 
 | 
			
		||||
@@ -49,7 +49,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
 | 
			
		||||
 | 
			
		||||
    def reply(self, text: str, chat_id: int, message_id: int):
 | 
			
		||||
        queues.set_task(
 | 
			
		||||
            self.stub,
 | 
			
		||||
            'botalka_mailbox', 
 | 
			
		||||
            {
 | 
			
		||||
                'project': 'pizda-bot',
 | 
			
		||||
@@ -65,7 +64,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
 | 
			
		||||
 | 
			
		||||
    def send(self, text: str, chat_id: int):
 | 
			
		||||
        queues.set_task(
 | 
			
		||||
            self.stub,
 | 
			
		||||
            'botalka_mailbox', 
 | 
			
		||||
            {
 | 
			
		||||
                'project': 'pizda-bot',
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,13 @@
 | 
			
		||||
import os
 | 
			
		||||
import requests
 | 
			
		||||
import time
 | 
			
		||||
from queues import tasks_pb2_grpc
 | 
			
		||||
from queues import tasks_pb2
 | 
			
		||||
 | 
			
		||||
from google.protobuf import json_format
 | 
			
		||||
 | 
			
		||||
stage = os.getenv("STAGE", 'local')
 | 
			
		||||
if stage == 'local':
 | 
			
		||||
    QUEUES_URL = 'http://localhost:1239'
 | 
			
		||||
else:
 | 
			
		||||
    QUEUES_URL = 'http://queues:1239'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QueuesException(Exception):
 | 
			
		||||
@@ -12,22 +17,22 @@ class QueuesException(Exception):
 | 
			
		||||
class TasksHandlerMixin:
 | 
			
		||||
    def poll(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name))
 | 
			
		||||
            task = response.task
 | 
			
		||||
            response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
 | 
			
		||||
            task = response.get('task')
 | 
			
		||||
            if not task:
 | 
			
		||||
                if response.retry_after:
 | 
			
		||||
                    time.sleep(response.retry_after)
 | 
			
		||||
                time.sleep(0.2)
 | 
			
		||||
                continue
 | 
			
		||||
            try:
 | 
			
		||||
                payload = json_format.MessageToDict(task.payload)
 | 
			
		||||
                self.process(payload)
 | 
			
		||||
                self.process(task['payload'])
 | 
			
		||||
            except Exception as exc:
 | 
			
		||||
                print(f'Error processing message id={task.id}, payload={payload}, exc={exc}')
 | 
			
		||||
                print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
 | 
			
		||||
                continue
 | 
			
		||||
            try:
 | 
			
		||||
                self.stub.Finish(tasks_pb2.FinishRequest(id=task.id))
 | 
			
		||||
                resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
 | 
			
		||||
                if resp.status_code != 202:
 | 
			
		||||
                    raise QueuesException
 | 
			
		||||
            except:
 | 
			
		||||
                print(f'Failed to finish task id={task.id}')
 | 
			
		||||
                print(f'Failed to finish task id={task["id"]}')
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def queue_name(self):
 | 
			
		||||
@@ -36,12 +41,12 @@ class TasksHandlerMixin:
 | 
			
		||||
    def process(self, payload):
 | 
			
		||||
        raise NotImplemented
 | 
			
		||||
 | 
			
		||||
def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
 | 
			
		||||
    stub.Put(
 | 
			
		||||
        tasks_pb2.PutRequest(
 | 
			
		||||
            queue=queue_name, 
 | 
			
		||||
            seconds_to_execute=seconds_to_execute, 
 | 
			
		||||
            delay=delay, 
 | 
			
		||||
            payload=payload
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
 | 
			
		||||
    resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={
 | 
			
		||||
        'payload': payload,
 | 
			
		||||
        'seconds_to_execute': seconds_to_execute,
 | 
			
		||||
        'delay': delay,
 | 
			
		||||
    })
 | 
			
		||||
    if resp.status_code != 202:
 | 
			
		||||
        raise QueuesException
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user