Compare commits

..

2 Commits

Author SHA1 Message Date
034f19aea9 Merge pull request 'deploy' (#4) from master into prod
Reviewed-on: #4
2024-10-11 07:17:13 +03:00
8a10b1d180 Merge pull request 'deploy' (#3) from master into prod
Reviewed-on: #3
2024-10-11 07:14:07 +03:00
18 changed files with 219 additions and 294 deletions

View File

@@ -3,16 +3,15 @@ version: "3.4"
services: services:
worker: bot:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
- queues-development PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- configurator command: bot
- mongo-development
command: worker
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
@@ -24,11 +23,11 @@ services:
fetch: fetch:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
- queues-development PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- mongo-development
command: fetch command: fetch
deploy: deploy:
mode: replicated mode: replicated
@@ -41,11 +40,11 @@ services:
notify: notify:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
- queues-development PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- mongo-development
command: notify command: notify
deploy: deploy:
mode: replicated mode: replicated
@@ -58,12 +57,13 @@ services:
ruz-bot-nginx: ruz-bot-nginx:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
networks: networks:
- common-infra-nginx-development - common-infra-nginx
- queues-development
- mongo-development
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
command: api command: api
deploy: deploy:
mode: replicated mode: replicated
@@ -74,11 +74,5 @@ services:
order: start-first order: start-first
networks: networks:
common-infra-nginx-development: common-infra-nginx:
external: true
queues-development:
external: true
configurator:
external: true
mongo-development:
external: true external: true

View File

@@ -3,20 +3,24 @@ version: "3.4"
services: services:
worker: bot:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
networks: TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
- queues PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- configurator DEBUG: "false"
- mongo command: bot
command: worker
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -24,18 +28,21 @@ services:
fetch: fetch:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
networks:
- queues
- configurator
- mongo
command: fetch command: fetch
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -43,18 +50,21 @@ services:
notify: notify:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
networks:
- queues
- configurator
- mongo
command: notify command: notify
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -63,17 +73,22 @@ services:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
networks: networks:
- common-infra-nginx - common-infra-nginx
- configurator
- mongo
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
command: api command: api
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -81,9 +96,3 @@ services:
networks: networks:
common-infra-nginx: common-infra-nginx:
external: true external: true
queues:
external: true
configurator:
external: true
mongo:
external: true

View File

@@ -28,7 +28,7 @@ jobs:
run: docker push mathwave/sprint-repo:ruz-bot run: docker push mathwave/sprint-repo:ruz-bot
deploy-dev: deploy-dev:
name: Deploy dev name: Deploy dev
runs-on: [prod] runs-on: [dev]
needs: push needs: push
steps: steps:
- name: login - name: login
@@ -41,4 +41,5 @@ jobs:
env: env:
TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }} TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }}
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot-development PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot

View File

@@ -7,7 +7,6 @@ RUN apt-get install -y locales locales-all
ENV LANGUAGE ru_RU.UTF-8 ENV LANGUAGE ru_RU.UTF-8
ENV LANG ru_RU.UTF-8 ENV LANG ru_RU.UTF-8
ENV LC_ALL ru_RU.UTF-8 ENV LC_ALL ru_RU.UTF-8
ENV PYTHONUNBUFFERED=1
COPY requirements.txt requirements.txt COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
COPY . . COPY . .

View File

@@ -3,11 +3,9 @@ from flask import Flask, request
import settings import settings
from helpers.alice import Processor from helpers.alice import Processor
from helpers.mongo import mongo from helpers.mongo import mongo
from daemons import base
class Daemon(base.Daemon): def api():
def execute(self):
app = Flask(__name__) app = Flask(__name__)
@app.route('/stats/json', methods=['GET']) @app.route('/stats/json', methods=['GET'])

View File

@@ -1,3 +0,0 @@
class Daemon:
def execute(self):
raise NotImplementedError

20
daemons/bot.py Normal file
View File

@@ -0,0 +1,20 @@
import os
import telebot
from telebot.types import Message
from helpers.mongo import mongo
bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
@bot.message_handler(commands=['start'])
def on_start(message: Message):
mongo.users_collection.delete_many({"chat_id": message.chat.id})
do_action(message)
@bot.message_handler()
def do_action(message: Message):
from helpers.answer import Answer
Answer(message).process()

View File

@@ -6,8 +6,6 @@ from helpers import now, campus_timdelta
from helpers.mongo import mongo from helpers.mongo import mongo
from helpers.ruz import ruz from helpers.ruz import ruz
from daemons import base
def fetch_schedule_for_user(user: dict): def fetch_schedule_for_user(user: dict):
today = now(user) today = now(user)
@@ -77,8 +75,7 @@ def delete_old():
mongo.lessons_collection.delete_many({"end": {"$lte": datetime.datetime.now() - datetime.timedelta(days=1)}}) mongo.lessons_collection.delete_many({"end": {"$lte": datetime.datetime.now() - datetime.timedelta(days=1)}})
class Daemon(base.Daemon): def fetch():
def execute(self):
while True: while True:
logging.info("fetch start") logging.info("fetch start")
begin = datetime.datetime.now() begin = datetime.datetime.now()

View File

@@ -2,12 +2,12 @@ import datetime
import logging import logging
from time import sleep from time import sleep
from telebot.apihelper import ApiTelegramException
from daemons.bot import bot
from helpers import now from helpers import now
from helpers.mongo import mongo from helpers.mongo import mongo
from helpers.ruz import ruz from helpers.ruz import ruz
from daemons import base
from utils import queues
def process(): def process():
@@ -25,7 +25,13 @@ def process():
ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n" ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n"
if lesson.get('link', None): if lesson.get('link', None):
ans += f"🔗 {lesson['link']}" ans += f"🔗 {lesson['link']}"
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) try:
bot.send_message(
user["chat_id"],
f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans
)
except ApiTelegramException:
pass
mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}}) mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}})
time_now = datetime.datetime.now() time_now = datetime.datetime.now()
for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}): for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}):
@@ -40,7 +46,11 @@ def process():
else: else:
text = ruz.schedule_builder(lessons) text = ruz.schedule_builder(lessons)
try: try:
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) bot.send_message(
user["chat_id"],
f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text,
parse_mode='Markdown'
)
except: except:
pass pass
mongo.users_collection.update_one( mongo.users_collection.update_one(
@@ -62,6 +72,7 @@ def process():
ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n" ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n"
if lesson.get('link', None): if lesson.get('link', None):
ans += f"🔗 {lesson['link']}" ans += f"🔗 {lesson['link']}"
try:
mess = "Пары начутся через " mess = "Пары начутся через "
if user['first_lesson_notify'] == 30: if user['first_lesson_notify'] == 30:
mess += "30 минут" mess += "30 минут"
@@ -72,14 +83,18 @@ def process():
else: else:
mess += "12 часов" mess += "12 часов"
mess += "!\n\nТвоя первая пара:\n\n" + ans mess += "!\n\nТвоя первая пара:\n\n" + ans
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': mess, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) bot.send_message(
user["chat_id"],
mess
)
except ApiTelegramException:
pass
start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day) start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day)
mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}}) mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}})
break break
class Daemon(base.Daemon): def notify():
def execute(self):
while True: while True:
logging.info("notify start") logging.info("notify start")
begin = datetime.datetime.now() begin = datetime.datetime.now()

View File

@@ -1,19 +0,0 @@
from daemons import base
from utils import queues
import json
from telebot.types import Message
class Daemon(base.Daemon, queues.TasksHandlerMixin):
@property
def queue_name(self):
return 'ruz_bot_worker'
def execute(self):
self.poll()
def process(self, payload):
message: Message = Message.de_json(json.dumps(payload))
from helpers.answer import Answer
Answer(message).process()

View File

@@ -2,6 +2,10 @@ import logging.config
import sys import sys
import settings import settings
from daemons.api import api
from daemons.bot import bot
from daemons.fetch import fetch
from daemons.notify import notify
import locale import locale
@@ -9,25 +13,17 @@ logging.config.dictConfig(settings.logging_config)
locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8') locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8')
arg = sys.argv[-1] arg = sys.argv[-1]
if arg == "poll": if arg == "bot":
logging.info("poll is starting") logging.info("bot is starting")
from daemons.poll import Daemon bot.polling()
elif arg == 'worker':
logging.info("worker is starting")
from daemons.worker import Daemon
elif arg == 'mailbox':
logging.info("mailbox is starting")
from daemons.mailbox import Daemon
elif arg == "fetch": elif arg == "fetch":
logging.info("fetch is starting") logging.info("fetch is starting")
from daemons.fetch import Daemon fetch()
elif arg == "notify": elif arg == "notify":
logging.info("notify is starting") logging.info("notify is starting")
from daemons.notify import Daemon notify()
elif arg == "api": elif arg == "api":
logging.info("api is starting") logging.info("api is starting")
from daemons.api import Daemon api()
else: else:
raise ValueError(f"Unknown param {arg}") raise ValueError(f"Unknown param {arg}")
Daemon().execute()

View File

@@ -1,9 +1,9 @@
import logging import logging
from typing import Optional from typing import Optional
from daemons.bot import bot
from helpers import now from helpers import now
from helpers.mongo import mongo from helpers.mongo import mongo
from utils import queues
def try_parse(message: str) -> Optional[int]: def try_parse(message: str) -> Optional[int]:
@@ -84,7 +84,7 @@ class Processor:
} }
else: else:
mongo.users_collection.update_one({"yandex_code": code}, {"$set": {"yandex_id": self.user_id, "yandex_code": None}}) mongo.users_collection.update_one({"yandex_code": code}, {"$set": {"yandex_id": self.user_id, "yandex_code": None}})
queues.set_task('ruz_bot_mailbox', {'text': "Алиса успешно подключена!", 'chat_id': user["chat_id"]}, 1) bot.send_message(user['chat_id'], "Алиса успешно подключена!")
lesson = self.get_lesson_for_user(user['chat_id']) lesson = self.get_lesson_for_user(user['chat_id'])
if lesson is None: if lesson is None:
return { return {

View File

@@ -4,6 +4,7 @@ from random import choice
from telebot.types import Message, ReplyKeyboardRemove from telebot.types import Message, ReplyKeyboardRemove
from daemons.bot import bot
from daemons.fetch import fetch_schedule_for_user from daemons.fetch import fetch_schedule_for_user
from helpers import get_next_daily_notify_time from helpers import get_next_daily_notify_time
from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, again_keyboard, no_daily_notify, \ from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, again_keyboard, no_daily_notify, \
@@ -11,7 +12,6 @@ from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, a
from helpers.mongo import mongo from helpers.mongo import mongo
from helpers.sprint_platform import platform from helpers.sprint_platform import platform
from helpers.ruz import ruz from helpers.ruz import ruz
from utils import queues
class User: class User:
@@ -32,8 +32,6 @@ class Answer:
def __init__(self, message: Message): def __init__(self, message: Message):
self.message = message self.message = message
self.message_text = message.text or message.caption or "" self.message_text = message.text or message.caption or ""
if self.message_text.startswith('/start'):
mongo.users_collection.delete_many({"chat_id": message.chat.id})
user = mongo.users_collection.find_one({"chat_id": message.chat.id}) user = mongo.users_collection.find_one({"chat_id": message.chat.id})
if user is None: if user is None:
user = { user = {
@@ -55,6 +53,13 @@ class Answer:
def process(self): def process(self):
user = User(self.user['chat_id']) user = User(self.user['chat_id'])
try:
bot_enabled_exp = platform.get_experiment('bot_enabled')
if not bot_enabled_exp['enabled'] or not eval(bot_enabled_exp['condition']):
return
except Exception as exc:
logging.info(exc)
return
getattr( getattr(
self, self,
"handle_state_" + self.user['state'], "handle_state_" + self.user['state'],
@@ -67,10 +72,7 @@ class Answer:
def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs): def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs):
if reply_markup is None and remove_keyboard: if reply_markup is None and remove_keyboard:
reply_markup = ReplyKeyboardRemove() reply_markup = ReplyKeyboardRemove()
body = {'text': text, 'chat_id': self.user['chat_id'], 'parse_mode': 'Markdown'} bot.send_message(self.user['chat_id'], text, reply_markup=reply_markup, **kwargs)
if reply_markup:
body['reply_markup'] = reply_markup.to_json()
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 5)
def set_state(self, state: str): def set_state(self, state: str):
self.user['state'] = state self.user['state'] = state

View File

@@ -9,7 +9,7 @@ from helpers import now
class Mongo: class Mongo:
def __init__(self): def __init__(self):
url = f"mongodb://{settings.MONGO_USER}:{settings.MONGO_PASSWORD}@mongo:27017/" url = f"mongodb://{settings.MONGO_USER}:{settings.MONGO_PASSWORD}@{settings.MONGO_HOST}:27017/"
self.client = pymongo.MongoClient(url) self.client = pymongo.MongoClient(url)
self.database = self.client.get_database("ruz-bot") self.database = self.client.get_database("ruz-bot")
self.users_collection.create_index([ self.users_collection.create_index([

View File

@@ -12,8 +12,7 @@ fields = [
'date_start', 'date_start',
'date_end', 'date_end',
'lecturer_profiles', 'lecturer_profiles',
'stream_links', 'stream_links'
'type',
] ]

View File

@@ -15,8 +15,11 @@ class PlatformClient:
self.stage = stage self.stage = stage
self.configs = configs self.configs = configs
self.experiments = experiments self.experiments = experiments
self.endpoint = 'http://configurator/' self.endpoint = 'https://platform.sprinthub.ru/'
self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') self.configs_url = urllib.parse.urljoin(self.endpoint, 'configs/get')
self.experiments_url = urllib.parse.urljoin(self.endpoint, 'experiments/get')
self.staff_url = urllib.parse.urljoin(self.endpoint, 'is_staff')
self.fetch_url = urllib.parse.urljoin(self.endpoint, 'fetch')
self.config_storage = {} self.config_storage = {}
self.experiment_storage = {} self.experiment_storage = {}
self.staff_storage = {} self.staff_storage = {}
@@ -41,6 +44,7 @@ class PlatformClient:
try: try:
response = get( response = get(
url, url,
headers={'X-Security-Token': self.platform_security_token},
params=params params=params
) )
if response.status_code == 200: if response.status_code == 200:

View File

View File

@@ -1,87 +0,0 @@
from concurrent.futures import ThreadPoolExecutor
import datetime
import os
import traceback
import zoneinfo
import requests
import time
stage = os.getenv("STAGE", 'local')
if stage == 'local':
QUEUES_URL = 'http://localhost:1239'
else:
QUEUES_URL = 'http://queues:1239'
class QueuesException(Exception):
...
class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send():
requests.post(f'{QUEUES_URL}/api/v1/metric', json={
'service': 'ruz-bot',
'queue': self.queue_name,
'success': success,
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
"success": success,
"execution_time_ms": (end - start).microseconds // 1000,
"environment": stage,
})
self.executor.submit(send)
def poll(self):
while True:
try:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
except requests.JSONDecodeError:
print('Unable to decode json')
time.sleep(3)
continue
task = response.get('task')
if not task:
time.sleep(0.2)
continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload'])
success = True
except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
traceback.print_exc()
success = False
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
if success:
try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202:
raise QueuesException
print(f'finish task with id {task["id"]}')
except:
print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success)
@property
def queue_name(self):
raise NotImplemented
def process(self, payload):
raise NotImplemented
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={
'payload': payload,
'seconds_to_execute': seconds_to_execute,
'delay': delay,
})
if resp.status_code != 202:
raise QueuesException