Compare commits

..

2 Commits

Author SHA1 Message Date
90378be7c5 Merge pull request 'queues' (#5) from queues into dev
Reviewed-on: #5
2024-11-17 22:56:27 +03:00
95e4f5e8da Merge pull request 'deploy' (#2) from master into dev
Reviewed-on: #2
2024-10-11 07:05:29 +03:00
15 changed files with 202 additions and 113 deletions

View File

@@ -3,15 +3,29 @@ version: "3.4"
services: services:
worker: poll:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
STAGE: "development"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: poll
deploy:
mode: replicated
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
worker:
image: mathwave/sprint-repo:ruz-bot
environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- queues-development QUEUES_TOKEN: $QUEUES_TOKEN_DEV
- configurator
- mongo-development
command: worker command: worker
deploy: deploy:
mode: replicated mode: replicated
@@ -21,14 +35,29 @@ services:
parallelism: 1 parallelism: 1
order: start-first order: start-first
mailbox:
image: mathwave/sprint-repo:ruz-bot
environment:
STAGE: "development"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: mailbox
deploy:
mode: replicated
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
fetch: fetch:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- queues-development QUEUES_TOKEN: $QUEUES_TOKEN_DEV
- mongo-development
command: fetch command: fetch
deploy: deploy:
mode: replicated mode: replicated
@@ -41,11 +70,11 @@ services:
notify: notify:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
networks: PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- queues-development QUEUES_TOKEN: $QUEUES_TOKEN_DEV
- mongo-development
command: notify command: notify
deploy: deploy:
mode: replicated mode: replicated
@@ -58,12 +87,13 @@ services:
ruz-bot-nginx: ruz-bot-nginx:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
networks: networks:
- common-infra-nginx-development - common-infra-nginx
- queues-development
- mongo-development
environment: environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development" STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: api command: api
deploy: deploy:
mode: replicated mode: replicated
@@ -74,11 +104,5 @@ services:
order: start-first order: start-first
networks: networks:
common-infra-nginx-development: common-infra-nginx:
external: true
queues-development:
external: true
configurator:
external: true
mongo-development:
external: true external: true

View File

@@ -3,15 +3,29 @@ version: "3.4"
services: services:
worker: poll:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
STAGE: "production"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
QUEUES_TOKEN: $QUEUES_TOKEN_PROD
command: poll
deploy:
mode: replicated
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
worker:
image: mathwave/sprint-repo:ruz-bot
environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
networks: PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- queues QUEUES_TOKEN: $QUEUES_TOKEN_PROD
- configurator
- mongo
command: worker command: worker
deploy: deploy:
mode: replicated mode: replicated
@@ -21,21 +35,39 @@ services:
parallelism: 1 parallelism: 1
order: start-first order: start-first
mailbox:
image: mathwave/sprint-repo:ruz-bot
environment:
STAGE: "production"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
QUEUES_TOKEN: $QUEUES_TOKEN_PROD
command: mailbox
deploy:
mode: replicated
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
fetch: fetch:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
networks: QUEUES_TOKEN: $QUEUES_TOKEN_PROD
- queues
- configurator
- mongo
command: fetch command: fetch
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -43,18 +75,21 @@ services:
notify: notify:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
networks: QUEUES_TOKEN: $QUEUES_TOKEN_PROD
- queues
- configurator
- mongo
command: notify command: notify
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -63,17 +98,21 @@ services:
image: mathwave/sprint-repo:ruz-bot image: mathwave/sprint-repo:ruz-bot
networks: networks:
- common-infra-nginx - common-infra-nginx
- configurator
- mongo
environment: environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production" STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false" DEBUG: "false"
command: api command: api
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:
condition: any condition: any
placement:
constraints:
- node.role == worker
- node.labels.zone == ru
update_config: update_config:
parallelism: 1 parallelism: 1
order: start-first order: start-first
@@ -81,9 +120,3 @@ services:
networks: networks:
common-infra-nginx: common-infra-nginx:
external: true external: true
queues:
external: true
configurator:
external: true
mongo:
external: true

View File

@@ -28,7 +28,7 @@ jobs:
run: docker push mathwave/sprint-repo:ruz-bot run: docker push mathwave/sprint-repo:ruz-bot
deploy-dev: deploy-dev:
name: Deploy dev name: Deploy dev
runs-on: [prod] runs-on: [dev]
needs: push needs: push
steps: steps:
- name: login - name: login
@@ -41,4 +41,6 @@ jobs:
env: env:
TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }} TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }}
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot-development PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
QUEUES_TOKEN_DEV: ${{ secrets.QUEUES_TOKEN_DEV }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot

View File

@@ -42,4 +42,5 @@ jobs:
TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }} TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }}
MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }}
PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }} PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
QUEUES_TOKEN_PROD: ${{ secrets.QUEUES_TOKEN_PROD }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml ruz-bot run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml ruz-bot

View File

@@ -7,7 +7,6 @@ RUN apt-get install -y locales locales-all
ENV LANGUAGE ru_RU.UTF-8 ENV LANGUAGE ru_RU.UTF-8
ENV LANG ru_RU.UTF-8 ENV LANG ru_RU.UTF-8
ENV LC_ALL ru_RU.UTF-8 ENV LC_ALL ru_RU.UTF-8
ENV PYTHONUNBUFFERED=1
COPY requirements.txt requirements.txt COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
COPY . . COPY . .

28
daemons/mailbox.py Normal file
View File

@@ -0,0 +1,28 @@
import telebot
import os
from daemons import base
from utils import queues
class Daemon(base.BaseDaemon, queues.TasksHandlerMixin):
def __init__(self):
super().__init__()
self.bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
@property
def queue_name(self):
return 'pizda_bot_mailbox'
def execute(self):
self.poll()
def process(self, payload):
body = {
'chat_id': payload['chat_id'],
'text': payload['text'],
}
reply_markup = payload.get('reply_markup')
if reply_markup:
body['reply_markup'] = reply_markup
self.bot.send_message(**body, parse_mode='Markdown')

View File

@@ -25,7 +25,7 @@ def process():
ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n" ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n"
if lesson.get('link', None): if lesson.get('link', None):
ans += f"🔗 {lesson['link']}" ans += f"🔗 {lesson['link']}"
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) queues.set_task('ruz_bot_mailbox', {'text': f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans, 'chat_id': user["chat_id"]}, 1)
mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}}) mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}})
time_now = datetime.datetime.now() time_now = datetime.datetime.now()
for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}): for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}):
@@ -40,7 +40,7 @@ def process():
else: else:
text = ruz.schedule_builder(lessons) text = ruz.schedule_builder(lessons)
try: try:
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) queues.set_task('ruz_bot_mailbox', {'text': f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, 'chat_id': user["chat_id"]}, 1)
except: except:
pass pass
mongo.users_collection.update_one( mongo.users_collection.update_one(
@@ -72,7 +72,7 @@ def process():
else: else:
mess += "12 часов" mess += "12 часов"
mess += "!\n\nТвоя первая пара:\n\n" + ans mess += "!\n\nТвоя первая пара:\n\n" + ans
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': {'text': mess, 'chat_id': user["chat_id"], 'parse_mode': 'Markdown'}}, 1) queues.set_task('ruz_bot_mailbox', {'text': mess, 'chat_id': user["chat_id"]}, 1)
start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day) start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day)
mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}}) mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}})
break break

15
daemons/poll.py Normal file
View File

@@ -0,0 +1,15 @@
import os
import telebot
from daemons import base
from telebot import types
from utils import queues
class Daemon(base.BaseDaemon):
def execute(self):
bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
@bot.message_handler()
def do_action(message: types.Message):
queues.set_task('ruz_bot_worker', message.json, 1)
bot.polling()

View File

@@ -5,7 +5,7 @@ import json
from telebot.types import Message from telebot.types import Message
class Daemon(base.Daemon, queues.TasksHandlerMixin): class Daemon(base.BaseDaemon, queues.TasksHandlerMixin):
@property @property
def queue_name(self): def queue_name(self):
return 'ruz_bot_worker' return 'ruz_bot_worker'

View File

@@ -1,9 +1,9 @@
import logging import logging
from typing import Optional from typing import Optional
from daemons.bot import bot
from helpers import now from helpers import now
from helpers.mongo import mongo from helpers.mongo import mongo
from utils import queues
def try_parse(message: str) -> Optional[int]: def try_parse(message: str) -> Optional[int]:
@@ -84,7 +84,7 @@ class Processor:
} }
else: else:
mongo.users_collection.update_one({"yandex_code": code}, {"$set": {"yandex_id": self.user_id, "yandex_code": None}}) mongo.users_collection.update_one({"yandex_code": code}, {"$set": {"yandex_id": self.user_id, "yandex_code": None}})
queues.set_task('ruz_bot_mailbox', {'text': "Алиса успешно подключена!", 'chat_id': user["chat_id"]}, 1) bot.send_message(user['chat_id'], "Алиса успешно подключена!")
lesson = self.get_lesson_for_user(user['chat_id']) lesson = self.get_lesson_for_user(user['chat_id'])
if lesson is None: if lesson is None:
return { return {
@@ -94,4 +94,4 @@ class Processor:
return { return {
"text": f'Отлично, теперь я могу подсказывать тебе расписание. Твое ближайшее занятие в {lesson["begin"].strftime("%A %d %B %H:%M")}: {lesson["discipline"].replace("(рус)", "").replace("(анг)", "")}', "text": f'Отлично, теперь я могу подсказывать тебе расписание. Твое ближайшее занятие в {lesson["begin"].strftime("%A %d %B %H:%M")}: {lesson["discipline"].replace("(рус)", "").replace("(анг)", "")}',
"end_session": True "end_session": True
} }

View File

@@ -55,6 +55,13 @@ class Answer:
def process(self): def process(self):
user = User(self.user['chat_id']) user = User(self.user['chat_id'])
try:
bot_enabled_exp = platform.get_experiment('bot_enabled')
if not bot_enabled_exp['enabled'] or not eval(bot_enabled_exp['condition']):
return
except Exception as exc:
logging.info(exc)
return
getattr( getattr(
self, self,
"handle_state_" + self.user['state'], "handle_state_" + self.user['state'],
@@ -67,10 +74,10 @@ class Answer:
def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs): def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs):
if reply_markup is None and remove_keyboard: if reply_markup is None and remove_keyboard:
reply_markup = ReplyKeyboardRemove() reply_markup = ReplyKeyboardRemove()
body = {'text': text, 'chat_id': self.user['chat_id'], 'parse_mode': 'Markdown'} body = {'text': text, 'chat_id': self.user['chat_id']}
if reply_markup: if reply_markup:
body['reply_markup'] = reply_markup.to_json() body['reply_markup'] = reply_markup.to_json()
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 5) queues.set_task('ruz_bot_mailbox', body, 1)
def set_state(self, state: str): def set_state(self, state: str):
self.user['state'] = state self.user['state'] = state

View File

@@ -9,7 +9,7 @@ from helpers import now
class Mongo: class Mongo:
def __init__(self): def __init__(self):
url = f"mongodb://{settings.MONGO_USER}:{settings.MONGO_PASSWORD}@mongo:27017/" url = f"mongodb://{settings.MONGO_USER}:{settings.MONGO_PASSWORD}@{settings.MONGO_HOST}:27017/"
self.client = pymongo.MongoClient(url) self.client = pymongo.MongoClient(url)
self.database = self.client.get_database("ruz-bot") self.database = self.client.get_database("ruz-bot")
self.users_collection.create_index([ self.users_collection.create_index([

View File

@@ -12,8 +12,7 @@ fields = [
'date_start', 'date_start',
'date_end', 'date_end',
'lecturer_profiles', 'lecturer_profiles',
'stream_links', 'stream_links'
'type',
] ]

View File

@@ -15,8 +15,11 @@ class PlatformClient:
self.stage = stage self.stage = stage
self.configs = configs self.configs = configs
self.experiments = experiments self.experiments = experiments
self.endpoint = 'http://configurator/' self.endpoint = 'https://platform.sprinthub.ru/'
self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') self.configs_url = urllib.parse.urljoin(self.endpoint, 'configs/get')
self.experiments_url = urllib.parse.urljoin(self.endpoint, 'experiments/get')
self.staff_url = urllib.parse.urljoin(self.endpoint, 'is_staff')
self.fetch_url = urllib.parse.urljoin(self.endpoint, 'fetch')
self.config_storage = {} self.config_storage = {}
self.experiment_storage = {} self.experiment_storage = {}
self.staff_storage = {} self.staff_storage = {}
@@ -41,6 +44,7 @@ class PlatformClient:
try: try:
response = get( response = get(
url, url,
headers={'X-Security-Token': self.platform_security_token},
params=params params=params
) )
if response.status_code == 200: if response.status_code == 200:

View File

@@ -1,17 +1,18 @@
from concurrent.futures import ThreadPoolExecutor import json
import datetime
import os import os
import traceback
import zoneinfo
import requests import requests
import time import time
stage = os.getenv("STAGE", 'local') stage = os.getenv("STAGE", 'local')
if stage == 'local': if stage == 'development':
QUEUES_URL = 'http://localhost:1239' QUEUES_URL = 'https://queues.develop.sprinthub.ru'
elif stage == 'production':
QUEUES_URL = 'https://queues.sprinthub.ru'
else: else:
QUEUES_URL = 'http://queues:1239' QUEUES_URL = None
token = os.getenv('QUEUES_TOKEN')
class QueuesException(Exception): class QueuesException(Exception):
@@ -19,55 +20,31 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send():
requests.post(f'{QUEUES_URL}/api/v1/metric', json={
'service': 'ruz-bot',
'queue': self.queue_name,
'success': success,
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
"success": success,
"execution_time_ms": (end - start).microseconds // 1000,
"environment": stage,
})
self.executor.submit(send)
def poll(self): def poll(self):
while True: while True:
if QUEUES_URL is None:
data = {'payload': json.loads(input('Input message: '))}
else:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name, 'X-Queues-Token': token})
if response.status_code == 404:
time.sleep(0.2)
continue
if response.status_code == 403:
raise NotImplemented('QUEUE_TOKEN is incorrect')
data = response.json()
try: try:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() self.process(data['payload'])
except requests.JSONDecodeError:
print('Unable to decode json')
time.sleep(3)
continue
task = response.get('task')
if not task:
time.sleep(0.2)
continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload'])
success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={data["id"]}, payload={data["payload"]}, exc={exc}')
traceback.print_exc() continue
success = False if QUEUES_URL is None:
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) continue
if success: try:
try: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': data['id']}, headers={'X-Queues-Token': token})
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202:
if resp.status_code != 202: raise QueuesException
raise QueuesException except:
print(f'finish task with id {task["id"]}') print(f'Failed to finish task id={data["id"]}')
except:
print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success)
@property @property
def queue_name(self): def queue_name(self):
@@ -78,10 +55,10 @@ class TasksHandlerMixin:
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name, 'X-Queues-Token': token}, json={
'payload': payload, 'payload': payload,
'seconds_to_execute': seconds_to_execute, 'seconds_to_execute': seconds_to_execute,
'delay': delay, 'delay': delay,
}) })
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException