fix
This commit is contained in:
		@@ -46,8 +46,29 @@ services:
 | 
			
		||||
        parallelism: 1
 | 
			
		||||
        order: start-first
 | 
			
		||||
 | 
			
		||||
  bot:
 | 
			
		||||
    image: mathwave/sprint-repo:b-jokes
 | 
			
		||||
    environment:
 | 
			
		||||
      MONGO_HOST: "mongo.develop.sprinthub.ru"
 | 
			
		||||
      MONGO_PASSWORD: $MONGO_PASSWORD_DEV
 | 
			
		||||
    command: bot
 | 
			
		||||
    networks:
 | 
			
		||||
      - configurator
 | 
			
		||||
      - queues-development
 | 
			
		||||
    deploy:
 | 
			
		||||
      mode: replicated
 | 
			
		||||
      restart_policy:
 | 
			
		||||
        condition: any
 | 
			
		||||
      update_config:
 | 
			
		||||
        parallelism: 1
 | 
			
		||||
        order: start-first
 | 
			
		||||
 | 
			
		||||
networks:
 | 
			
		||||
  b-jokes-net:
 | 
			
		||||
    driver: overlay
 | 
			
		||||
  common-infra-nginx:
 | 
			
		||||
    external: true
 | 
			
		||||
    external: true
 | 
			
		||||
  configurator:
 | 
			
		||||
    external: true
 | 
			
		||||
  queues-development:
 | 
			
		||||
    external: true
 | 
			
		||||
 
 | 
			
		||||
@@ -60,8 +60,29 @@ services:
 | 
			
		||||
        parallelism: 1
 | 
			
		||||
        order: start-first
 | 
			
		||||
 | 
			
		||||
  bot:
 | 
			
		||||
    image: mathwave/sprint-repo:b-jokes
 | 
			
		||||
    environment:
 | 
			
		||||
      MONGO_HOST: "mongo.sprinthub.ru"
 | 
			
		||||
      MONGO_PASSWORD: $MONGO_PASSWORD_PROD
 | 
			
		||||
    command: bot
 | 
			
		||||
    networks:
 | 
			
		||||
      - configurator
 | 
			
		||||
      - queues
 | 
			
		||||
    deploy:
 | 
			
		||||
      mode: replicated
 | 
			
		||||
      restart_policy:
 | 
			
		||||
        condition: any
 | 
			
		||||
      update_config:
 | 
			
		||||
        parallelism: 1
 | 
			
		||||
        order: start-first
 | 
			
		||||
 | 
			
		||||
networks:
 | 
			
		||||
  b-jokes-net:
 | 
			
		||||
    driver: overlay
 | 
			
		||||
  common-infra-nginx:
 | 
			
		||||
    external: true
 | 
			
		||||
  configurator:
 | 
			
		||||
    external: true
 | 
			
		||||
  queues:
 | 
			
		||||
    external: true
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										84
									
								
								helpers/configurator.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								helpers/configurator.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,84 @@
 | 
			
		||||
import json
 | 
			
		||||
import urllib.parse
 | 
			
		||||
from threading import Thread
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
from requests import get
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ConfiguratorClient:
 | 
			
		||||
    def __init__(self, app_name: str, stage: str, need_poll: bool = True):
 | 
			
		||||
        self.app_name = app_name
 | 
			
		||||
        self.stage = stage
 | 
			
		||||
        self.endpoint = 'http://configurator/'
 | 
			
		||||
        self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch')
 | 
			
		||||
        self.config_storage = {}
 | 
			
		||||
        self.experiment_storage = {}
 | 
			
		||||
        self.staff_storage = {}
 | 
			
		||||
        self.poll_data()
 | 
			
		||||
        if need_poll:
 | 
			
		||||
            self.poll_data_in_thread()
 | 
			
		||||
 | 
			
		||||
    def poll_data_in_thread(self):
 | 
			
		||||
        def inner():
 | 
			
		||||
            while True:
 | 
			
		||||
                sleep(30)
 | 
			
		||||
                self.fetch()
 | 
			
		||||
 | 
			
		||||
        Thread(target=inner, daemon=True).start()
 | 
			
		||||
 | 
			
		||||
    def poll_data(self):
 | 
			
		||||
        self.fetch(with_exception=True)
 | 
			
		||||
 | 
			
		||||
    def request_with_retries(self, url, params, with_exception=False, retries_count=3):
 | 
			
		||||
        exception_to_throw = None
 | 
			
		||||
        for _ in range(retries_count):
 | 
			
		||||
            try:
 | 
			
		||||
                response = get(
 | 
			
		||||
                    url,
 | 
			
		||||
                    params=params
 | 
			
		||||
                )
 | 
			
		||||
                if response.status_code == 200:
 | 
			
		||||
                    return response.json()
 | 
			
		||||
                print(f'Failed to request {url}, status_code={response.status_code}')
 | 
			
		||||
                exception_to_throw = Exception('Not 200 status')
 | 
			
		||||
            except Exception as exc:
 | 
			
		||||
                print(exc)
 | 
			
		||||
                exception_to_throw = exc
 | 
			
		||||
            sleep(1)
 | 
			
		||||
        print(f'Failed fetching with retries: {url}, {params}')
 | 
			
		||||
        if with_exception:
 | 
			
		||||
            raise exception_to_throw
 | 
			
		||||
 | 
			
		||||
    def fetch(self, with_exception=False):
 | 
			
		||||
        if self.stage == 'local':
 | 
			
		||||
            local_platform = json.loads(open('local_platform.json', 'r').read())
 | 
			
		||||
            self.config_storage = local_platform['configs']
 | 
			
		||||
            self.experiment_storage = local_platform['experiments']
 | 
			
		||||
            self.staff_storage = {
 | 
			
		||||
                key: set(value)
 | 
			
		||||
                for key, value in local_platform['platform_staff'].items()
 | 
			
		||||
            }
 | 
			
		||||
            return
 | 
			
		||||
        response_data = self.request_with_retries(self.fetch_url, {
 | 
			
		||||
            'project': self.app_name,
 | 
			
		||||
            'stage': self.stage,
 | 
			
		||||
        }, with_exception)
 | 
			
		||||
        self.config_storage = response_data['configs']
 | 
			
		||||
        self.experiment_storage = response_data['experiments']
 | 
			
		||||
        self.staff_storage = {
 | 
			
		||||
            key: set(value)
 | 
			
		||||
            for key, value in response_data['platform_staff'].items()
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def is_staff(self, **kwargs):
 | 
			
		||||
        for key, value in kwargs.items():
 | 
			
		||||
            if value in self.staff_storage[key]:
 | 
			
		||||
                return True
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def get_config(self, name):
 | 
			
		||||
        return self.config_storage[name]
 | 
			
		||||
 | 
			
		||||
    def get_experiment(self, name):
 | 
			
		||||
        return self.experiment_storage[name]
 | 
			
		||||
							
								
								
									
										85
									
								
								helpers/queues.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										85
									
								
								helpers/queues.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,85 @@
 | 
			
		||||
from concurrent.futures import ThreadPoolExecutor
 | 
			
		||||
import datetime
 | 
			
		||||
import os
 | 
			
		||||
import zoneinfo
 | 
			
		||||
import requests
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
stage = os.getenv("STAGE", 'local')
 | 
			
		||||
if stage == 'local':
 | 
			
		||||
    QUEUES_URL = 'http://localhost:1239'
 | 
			
		||||
else:
 | 
			
		||||
    QUEUES_URL = 'http://queues:1239'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QueuesException(Exception):
 | 
			
		||||
    ...
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TasksHandlerMixin:
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
        self.executor = ThreadPoolExecutor(max_workers=1)
 | 
			
		||||
 | 
			
		||||
    def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
 | 
			
		||||
        def send():
 | 
			
		||||
            requests.post(f'{QUEUES_URL}/api/v1/metric', json={
 | 
			
		||||
                'service': 'b-jokes',
 | 
			
		||||
                'queue': self.queue_name,
 | 
			
		||||
                'success': success,
 | 
			
		||||
                'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
 | 
			
		||||
                "success": success,
 | 
			
		||||
                "execution_time_ms": (end - start).microseconds // 1000,
 | 
			
		||||
                "environment": stage,
 | 
			
		||||
            })
 | 
			
		||||
        
 | 
			
		||||
        self.executor.submit(send)
 | 
			
		||||
 | 
			
		||||
    def poll(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
 | 
			
		||||
            except requests.JSONDecodeError:
 | 
			
		||||
                print('Unable to decode json')
 | 
			
		||||
                time.sleep(3)
 | 
			
		||||
                continue
 | 
			
		||||
            task = response.get('task')
 | 
			
		||||
            if not task:
 | 
			
		||||
                time.sleep(0.2)
 | 
			
		||||
                continue
 | 
			
		||||
            start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
 | 
			
		||||
            try:
 | 
			
		||||
                print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
 | 
			
		||||
                self.process(task['payload'])
 | 
			
		||||
                success = True
 | 
			
		||||
            except Exception as exc:
 | 
			
		||||
                print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
 | 
			
		||||
                success = False
 | 
			
		||||
            end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
 | 
			
		||||
            if success:
 | 
			
		||||
                try:
 | 
			
		||||
                    resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
 | 
			
		||||
                    if resp.status_code != 202:
 | 
			
		||||
                        raise QueuesException
 | 
			
		||||
                    print(f'finish task with id {task["id"]}')
 | 
			
		||||
                except:
 | 
			
		||||
                    print(f'Failed to finish task id={task["id"]}')
 | 
			
		||||
            self._send_metric(start, end, success)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def queue_name(self):
 | 
			
		||||
        raise NotImplemented
 | 
			
		||||
 | 
			
		||||
    def process(self, payload):
 | 
			
		||||
        raise NotImplemented
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
 | 
			
		||||
    resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={
 | 
			
		||||
        'payload': payload,
 | 
			
		||||
        'seconds_to_execute': seconds_to_execute,
 | 
			
		||||
        'delay': delay,
 | 
			
		||||
    })
 | 
			
		||||
    if resp.status_code != 202:
 | 
			
		||||
        raise QueuesException
 | 
			
		||||
							
								
								
									
										36
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								main.py
									
									
									
									
									
								
							@@ -1,5 +1,9 @@
 | 
			
		||||
import os
 | 
			
		||||
from flask import Flask, request, make_response
 | 
			
		||||
 | 
			
		||||
from helpers.configurator import ConfiguratorClient
 | 
			
		||||
from helpers.jokes import get_random
 | 
			
		||||
from helpers.queues import TasksHandlerMixin, set_task
 | 
			
		||||
import settings
 | 
			
		||||
from helpers.events import events
 | 
			
		||||
from processor import Processor
 | 
			
		||||
@@ -38,3 +42,35 @@ def run():
 | 
			
		||||
        a = 1 / 0
 | 
			
		||||
 | 
			
		||||
    app.run(host="0.0.0.0", port=8000, debug=settings.DEBUG)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def bot():
 | 
			
		||||
    configurator = ConfiguratorClient("b-jokes", os.getenv("STAGE", "local"))
 | 
			
		||||
    class Bot(TasksHandlerMixin):
 | 
			
		||||
        @property
 | 
			
		||||
        def queue_name(self):
 | 
			
		||||
            return "b_jokes_worker"
 | 
			
		||||
 | 
			
		||||
        def process(self, payload):
 | 
			
		||||
            text = payload.get('text')
 | 
			
		||||
            if not text:
 | 
			
		||||
                return
 | 
			
		||||
            for word in configurator.get_config('words'):
 | 
			
		||||
                if word in text:
 | 
			
		||||
                    mes = 'Держи шутку!\n' + get_random()
 | 
			
		||||
                    set_task(
 | 
			
		||||
                        "botalka_mailbox", 
 | 
			
		||||
                        {
 | 
			
		||||
                            'project': 'b-jokes',
 | 
			
		||||
                            'name': 'telegram-bot',
 | 
			
		||||
                            'body': {
 | 
			
		||||
                                'text': mes,
 | 
			
		||||
                                'reply_to_message_id': payload['message_id'],
 | 
			
		||||
                                'chat_id': payload['chat']['id'],
 | 
			
		||||
                            }
 | 
			
		||||
                        },
 | 
			
		||||
                        1
 | 
			
		||||
                    )
 | 
			
		||||
                    return
 | 
			
		||||
    
 | 
			
		||||
    Bot().poll()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user