Files
pizda-bot/daemons/worker.py
emmatveev 47d3e1b044
All checks were successful
Deploy Prod / Build (pull_request) Successful in 4s
Deploy Prod / Push (pull_request) Successful in 8s
Deploy Prod / Deploy prod (pull_request) Successful in 11s
Update daemons/worker.py
2025-12-28 01:21:49 +03:00

216 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import os
from random import randrange, choice
from daemons import base
from utils import queues
from telebot.types import Message
import json
from utils.mongo import mongo
from utils.sprint_platform import PlatformClient
from utils.storage import set_values, get_chat_info
security_token = os.getenv("PLATFORM_SECURITY_TOKEN")
stage = os.getenv("STAGE", 'local')
client = PlatformClient(
security_token,
'Pizda Bot',
stage,
['constants', 'answers', 'replies'],
[],
need_poll=True,
)
all_letters = "йцукенгшщзхъёфывапролджэячсмитьбюЙЦУКЕНГШЩЗХЪЁФЫВАПРОЛДЖЭЯЧСМИТЬБЮQWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm1234567890 "
def get_self_name():
return 'answer_da_bot'
def get_answers():
return {
"200": "Отсоси на месте!",
"300": "Отсоси у тракториста!",
"a": "Хуй на!",
"da": "Pizda!",
"а": "Хуй на!",
"я": "Головка от хуя!",
"dа": "Pizda!",
"gde": "V pizde!",
"net": "Pidora otvet!",
"yes": "Hues!",
"дa": "Пизда!",
"чe": "Хуй через плечо!",
"чo": "Хуй через плечо!",
"gdе": "V pizde!",
"heт": "Пидора ответ!",
"nеt": "Pidora otvet!",
"да": "Пизда!",
"ды": "Получишь пизды!",
"ес": "Хуес!",
"ты": "Попой нюхаешь цветы!",
"че": "Хуй через плечо!",
"чо": "Хуй через плечо!",
"чё": "Хуй через плечо!",
"hет": "Пидора ответ!",
"гдe": "В пизде!",
"нeт": "Пидора ответ!",
"ага": "В жопе нога!",
"ало": "Хуем по лбу не дало?",
"бот": "Я ебал твой рот!",
"вот": "Хуй те в рот!",
"где": "В пизде!",
"как": "Жопой об косяк!",
"кто": "Конь в пальто!",
"мне": "Хуем по губе!",
"нет": "Пидора ответ!",
"ога": "В жопе нога!",
"опа": "Срослась пизда и жопа!",
"три": "Жопу подотри!",
"угу": "Иди в пизду!",
"хех": "В жопе орех!",
"алло": "Хуем по лбу не дало?",
"неть": "Хуеть!",
"слыш": "За углом поссышь!",
"когда": "Иди на!",
"ладно": "В трусах прохладно!",
"слышь": "За углом поссышь!",
"двести": "Отсоси на месте!",
"делать": "Хуй ко лбу приделать!",
"именно": "Хуименно!",
"триста": "Тебе нужна помощь специалиста!",
"здрасте": "Пизду покрасьте!",
"грустный": "Хуй сосал невкусный!",
"здрасьте": "Пизду покрасьте!",
"правильно": "Хуявильно!",
"случилась": "Рубаха в жопу засучилась!",
"случилось": "Рубаха в жопу засучилась!"
}
def get_ignored_users():
return []
def get_replies():
return client.get_config('replies')
class Daemon(base.Daemon, queues.TasksHandlerMixin):
@property
def queue_name(self):
return 'pizda_bot_worker'
def execute(self):
self.poll()
def reply(self, text: str, chat_id: int, message_id: int):
queues.set_task(
'botalka_mailbox',
{
'project': 'pizda-bot',
'name': 'telegram-bot',
'body': {
'text': text,
'reply_to_message_id': message_id,
'chat_id': chat_id,
}
},
1,
)
def send(self, text: str, chat_id: int):
queues.set_task(
'botalka_mailbox',
{
'project': 'pizda-bot',
'name': 'telegram-bot',
'body': {
'text': text,
'chat_id': chat_id,
}
},
1,
)
def process_command(self, message: Message):
if message.text.startswith('/pointers'):
rating = list(mongo.counter_collection.find({"chat_id": message.chat.id, "points": {"$exists": True}}).sort("points", -1))
if not rating:
self.send('Ебаллы пока никому не начислялись', message.chat.id)
return
text = "Рейтинг Ебальников:\n"
rating_arr = list(enumerate(rating))
total = 0
for _, value in rating_arr:
total += value['points']
for index, value in rating_arr:
text += f"{index + 1}. @{value['username']} - {value['points']}\n"
self.send(text, message.chat.id)
elif message.text.startswith('/point'):
if not message.reply_to_message:
self.reply('Чтобы начислить Ебаллы, нужно прописать команду ответом на чье-то сообщение', message.chat.id, message.message_id)
return
username = message.reply_to_message.from_user.username
if username == get_self_name():
self.reply('Ты конченый? Как я себе Ебаллы то начислю, мудила? Мамке своей Ебаллы начисляй, пидор!', message.chat.id, message.message_id)
return
mongo.inc_points(username, message.chat.id)
self.reply('Ты конченый? Как я себе Ебаллы то начислю, мудила? Мамке своей Ебаллы начисляй, пидор!', message.chat.id, message.reply_to_message.message_id)
elif message.text.startswith('/rating'):
rating = list(mongo.counter_collection.find({"chat_id": message.chat.id}).sort("count", -1))
if not rating:
self.send('В этом чате я пока никому не парировал', message.chat.id)
return
text = "Вот кому я парировал:\n"
rating_arr = list(enumerate(rating))
total = 0
for _, value in rating_arr:
total += value['count']
for index, value in rating_arr:
text += f"{index + 1}. @{value['username']} - {value['count']} ({int(value['count'] / total * 100)}%)\n"
self.send(text, message.chat.id)
elif message.text.startswith('/setprobability'):
self.send('Отправь одно число - вероятность парирования', message.chat.id)
set_values(message.chat.id, state="set_probability")
def process(self, payload):
message: Message = Message.de_json(json.dumps(payload))
if not message.text:
return
if message.text.startswith('/'):
self.process_command(message)
return
if message.reply_to_message:
if message.reply_to_message.from_user.username == get_self_name():
self.reply(choice(get_replies()), message.chat.id, message.message_id)
return
info = get_chat_info(message.chat.id)
if message.text == '#debug' and client.is_staff(telegram_id=message.from_user.id):
self.reply(f'chat id: {message.chat.id}\nprobability: {info["probability"]}', message.chat.id, message.message_id)
return
if info['state'] == "set_probability":
try:
value = int(message.text)
if value < 0 or value > 100:
self.reply('Число не попадает в диапозон от 0 до 100!', message.chat.id, message.message_id)
else:
set_values(message.chat.id, probability=value, state="default")
self.reply('Ок! Установил', message.chat.id, message.message_id)
except ValueError:
self.reply('Это не число!', message.chat.id, message.message_id)
return
convert_text = ''.join([letter for letter in message.text if letter in all_letters]).lower().split()
if len(convert_text) > 0:
convert_text = convert_text[-1]
else:
return
ans = get_answers().get(convert_text)
if ans is not None and randrange(1, 101) <= info["probability"] and message.from_user.id not in get_ignored_users():
self.reply(ans, message.chat.id, message.message_id)
mongo.inc(message.from_user.username, message.chat.id)