Compare commits

..

22 Commits

Author SHA1 Message Date
e02646a1a2 Merge pull request 'fix' (#26) from master into dev
Reviewed-on: #26
2024-12-02 21:57:09 +03:00
1b7e8686e4 Merge pull request 'fix' (#24) from master into dev
Reviewed-on: #24
2024-11-30 13:30:19 +03:00
cf12a0dca8 Merge pull request 'types' (#23) from master into dev
Reviewed-on: #23
2024-11-30 12:59:01 +03:00
0a97e56539 Merge pull request 'fix' (#21) from master into dev
Reviewed-on: #21
2024-11-29 20:31:45 +03:00
a5d3bd08c1 Merge pull request 'fix' (#20) from master into dev
Reviewed-on: #20
2024-11-29 20:25:38 +03:00
ad74b8de7a Merge pull request 'fix' (#18) from master into dev
Reviewed-on: #18
2024-11-28 23:09:59 +03:00
05d9cdc7b1 Merge pull request 'fix' (#17) from master into dev
Reviewed-on: #17
2024-11-28 22:58:47 +03:00
a209246513 Merge pull request 'fix' (#15) from master into dev
Reviewed-on: #15
2024-11-27 16:18:17 +03:00
0922b5a4a4 Merge pull request 'fix' (#14) from master into dev
Reviewed-on: #14
2024-11-27 16:14:35 +03:00
2ee74e70ac Merge pull request 'fix' (#13) from master into dev
Reviewed-on: #13
2024-11-27 14:46:14 +03:00
da93092232 Merge pull request 'fix' (#12) from master into dev
Reviewed-on: #12
2024-11-27 11:20:53 +03:00
60b933496f Merge pull request 'fix' (#11) from master into dev
Reviewed-on: #11
2024-11-27 11:15:30 +03:00
82b99ae803 Merge pull request 'fix' (#10) from master into dev
Reviewed-on: #10
2024-11-27 11:12:37 +03:00
c8f65a0ebb Merge pull request 'fix' (#9) from master into dev
Reviewed-on: #9
2024-11-27 11:10:47 +03:00
6401a40f11 Merge pull request 'fix' (#8) from master into dev
Reviewed-on: #8
2024-11-27 04:32:35 +03:00
32197fd699 Merge pull request 'fix' (#7) from master into dev
Reviewed-on: #7
2024-11-27 04:26:41 +03:00
e69ee8767a Merge pull request 'fix' (#6) from master into dev
Reviewed-on: #6
2024-11-27 04:19:26 +03:00
54f7581657 Merge pull request 'fix' (#5) from master into dev
Reviewed-on: #5
2024-11-27 04:14:58 +03:00
c6a2710087 Merge pull request 'fix' (#4) from master into dev
Reviewed-on: #4
2024-11-27 04:11:15 +03:00
42fc5552ab Merge pull request 'fix' (#3) from master into dev
Reviewed-on: #3
2024-11-27 04:09:16 +03:00
499eed49e0 Merge pull request 'fix' (#2) from master into dev
Reviewed-on: #2
2024-11-27 04:07:49 +03:00
349df7eb17 Merge pull request 'req' (#1) from master into dev
Reviewed-on: #1
2024-11-27 04:06:16 +03:00
8 changed files with 82 additions and 67 deletions

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- locks-development
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
locks-development:
external: true

View File

@@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- locks
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
locks:
external: true

3
.gitignore vendored
View File

@@ -119,6 +119,3 @@ GitHub.sublime-settings
.history .history
local_platform.json local_platform.json
*pb2*
schemas

View File

@@ -5,6 +5,7 @@ from telebot import apihelper
from daemons import base from daemons import base
from utils import platform from utils import platform
from utils import queues from utils import queues
from utils import locks
class Message(pydantic.BaseModel): class Message(pydantic.BaseModel):
@@ -22,6 +23,12 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
def queue_name(self): def queue_name(self):
return 'botalka_mailbox' return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'])
def after_execute(self, task: dict):
locks.release(task['id'])
def process(self, payload: dict): def process(self, payload: dict):
message = Message.model_validate(payload) message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name] bot = platform.platform_client.get_config('bots')[message.project][message.name]

View File

@@ -1,7 +1,6 @@
import telebot import telebot
import multiprocessing import threading
import time import time
import json
from daemons import base from daemons import base
from utils import platform from utils import platform
@@ -10,37 +9,45 @@ from utils import queues
class Daemon(base.Daemon): class Daemon(base.Daemon):
def __init__(self): def __init__(self):
self.processes: dict[str, multiprocessing.Process|None] = {} self.telegram_bots: dict[str, dict[str, telebot.TeleBot|None]] = {}
self.threads: dict[str, dict[str, threading.Thread|None]] = {}
def execute(self): def execute(self):
while True: while True:
bots = platform.platform_client.get_config('bots') bots = platform.platform_client.get_config('bots')
for project_name, project in bots.items(): for project_name, project in bots.items():
if project_name not in self.telegram_bots:
self.telegram_bots[project_name] = {}
self.threads[project_name] = {}
for bot_name, bot_info in project.items(): for bot_name, bot_info in project.items():
key = f'{project_name}_{bot_name}' if bot_name not in self.telegram_bots[project_name]:
proc = self.processes.get(key) self.telegram_bots[project_name][bot_name] = None
self.threads[project_name][bot_name] = None
bot = self.telegram_bots[project_name][bot_name]
if bot_info.get('poll_enabled'): if bot_info.get('poll_enabled'):
if proc and proc.is_alive(): if bot is not None and self.threads[project_name][bot_name].is_alive():
print(f'process for {project_name} {bot_name} is alive') print(f'process for {project_name} {bot_name} is alive')
continue continue
print(f'starting process for {project_name} {bot_name}') print(f'starting process for {project_name} {bot_name}')
process = multiprocessing.Process(target=self.start_polling, args=(bot_info['secrets']['telegram_token'], bot_info['queue'])) bot = telebot.TeleBot(bot_info['secrets']['telegram_token'])
process.start() thread = self.start_polling(bot, bot_info['queue'])
self.processes[key] = process self.telegram_bots[project_name][bot_name] = bot
self.threads[project_name][bot_name] = thread
print(f'started process for {project_name} {bot_name}') print(f'started process for {project_name} {bot_name}')
else: else:
if proc is None: if bot is None:
print(f'process for {project_name} {bot_name} is not alive') print(f'process for {project_name} {bot_name} is not alive')
continue continue
print(f'terminating process for {project_name} {bot_name}') print(f'terminating process for {project_name} {bot_name}')
proc.terminate() bot.stop_bot()
self.processes[key] = None self.telegram_bots[project_name][bot_name] = None
print(f'terminated process for {project_name} {bot_name}') print(f'terminated process for {project_name} {bot_name}')
time.sleep(10) time.sleep(10)
def start_polling(self, token: str, queue: str): def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread:
bot = telebot.TeleBot(token)
@bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note'])
def do_action(message: telebot.types.Message): def do_action(message: telebot.types.Message):
queues.set_task(queue, message.json, 1) queues.set_task(queue, message.json, 1)
bot.polling() thread = threading.Thread(target=bot.polling)
thread.start()
return thread

View File

@@ -1,10 +1,10 @@
annotated-types==0.7.0 annotated-types==0.7.0
certifi==2024.12.14 certifi==2024.8.30
charset-normalizer==3.4.0 charset-normalizer==3.4.0
idna==3.10 idna==3.10
pydantic==2.10.4 pydantic==2.10.2
pydantic_core==2.27.2 pydantic_core==2.27.1
pyTelegramBotAPI==4.1.1 pyTelegramBotAPI==4.1.1
requests==2.32.3 requests==2.32.3
typing_extensions==4.12.2 typing_extensions==4.12.2
urllib3==2.3.0 urllib3==2.2.3

27
utils/locks.py Normal file
View File

@@ -0,0 +1,27 @@
import requests
LOCKS_URL = 'http://locks'
class Conflict(Exception):
pass
def acquire(name: str, ttl: int = 1):
resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={
'name': name,
'ttl': ttl,
})
if resp.status_code == 409:
raise Conflict
if resp.status_code != 202:
raise Exception
def release(name: str):
resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={
'name': name,
})
if resp.status_code != 202:
raise Exception

View File

@@ -1,10 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
import datetime
import json
import os import os
import traceback
import uuid
import zoneinfo
import requests import requests
import time import time
@@ -21,55 +15,32 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send():
requests.post(f'{QUEUES_URL}/api/v1/metric', json={
'service': 'botalka',
'queue': self.queue_name,
'success': success,
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
"success": success,
"execution_time_ms": (end - start).microseconds // 1000,
"environment": stage,
})
self.executor.submit(send)
def poll(self): def poll(self):
while True: while True:
try: response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
except requests.JSONDecodeError:
print('Unable to decode json')
time.sleep(3)
continue
task = response.get('task') task = response.get('task')
if not task: if not task:
time.sleep(0.2) time.sleep(0.2)
continue continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try: try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}') self.before_execute(task)
self.process(task['payload']) self.process(task['payload'])
success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
traceback.print_stack() continue
success = False try:
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if success: if resp.status_code != 202:
try: raise QueuesException
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) self.after_execute(task)
if resp.status_code != 202: except:
raise QueuesException print(f'Failed to finish task id={task["id"]}')
print(f'finish task with id {task["id"]}')
except: def before_execute(self, task: dict):
print(f'Failed to finish task id={task["id"]}') pass
self._send_metric(start, end, success)
def after_execute(self, task: dict):
pass
@property @property
def queue_name(self): def queue_name(self):