grpc
All checks were successful
Deploy Dev / Build (pull_request) Successful in 33s
Deploy Dev / Push (pull_request) Successful in 12s
Deploy Dev / Deploy dev (pull_request) Successful in 23s

This commit is contained in:
2024-12-08 21:12:32 +03:00
parent 4b63faabef
commit 5b9c1a18b7
16 changed files with 67 additions and 106 deletions

46
utils/mongo.py Normal file
View File

@@ -0,0 +1,46 @@
import pymongo
import os
MONGO_USER = os.getenv("MONGO_USER", "mongo")
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "password")
MONGO_HOST = os.getenv("MONGO_HOST", "localhost")
class Mongo:
def __init__(self):
url = f"mongodb://{MONGO_USER}:{MONGO_PASSWORD}@{MONGO_HOST}:27017/"
self.client = pymongo.MongoClient(url)
self.database = self.client.get_database("pizda-bot")
self.chats_collection.create_index([
("chat_id", 1)
])
self.counter_collection.create_index([
("chat_id", 1),
("username", 1),
])
def __getitem__(self, item):
return self.database.get_collection(item)
@property
def chats_collection(self):
return self["chats"]
@property
def counter_collection(self):
return self["counter"]
def inc(self, username, chat_id):
if self.counter_collection.find_one({"chat_id": chat_id, "username": username}):
self.counter_collection.update_one({"chat_id": chat_id, "username": username}, {"$inc": {"count": 1}})
else:
self.counter_collection.insert_one({"chat_id": chat_id, "username": username, "count": 1})
def inc_points(self, username, chat_id):
if self.counter_collection.find_one({"chat_id": chat_id, "username": username}):
self.counter_collection.update_one({"chat_id": chat_id, "username": username}, {"$inc": {"points": 1}})
else:
self.counter_collection.insert_one({"chat_id": chat_id, "username": username, "points": 1})
mongo = Mongo()

View File

@@ -1,14 +1,8 @@
import json
import os
import requests
import time
from queues import tasks_pb2_grpc
from queues import tasks_pb2
stage = os.getenv("STAGE", 'local')
if stage == 'local':
QUEUES_URL = 'http://localhost:1239'
else:
QUEUES_URL = 'http://queues:1239'
from google.protobuf import json_format
class QueuesException(Exception):
@@ -18,22 +12,21 @@ class QueuesException(Exception):
class TasksHandlerMixin:
def poll(self):
while True:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
task = response.get('task')
response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name))
task = response.task
if not task:
time.sleep(0.2)
continue
try:
self.process(task['payload'])
payload = json_format.MessageToDict(task.payload)
self.process(payload)
except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
print(f'Error processing message id={task.id}, payload={payload}, exc={exc}')
continue
try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202:
raise QueuesException
self.stub.Finish(tasks_pb2.FinishRequest(id=task.id))
except:
print(f'Failed to finish task id={task["id"]}')
print(f'Failed to finish task id={task.id}')
@property
def queue_name(self):
@@ -42,12 +35,12 @@ class TasksHandlerMixin:
def process(self, payload):
raise NotImplemented
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={
'payload': payload,
'seconds_to_execute': seconds_to_execute,
'delay': delay,
})
if resp.status_code != 202:
raise QueuesException
def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
stub.Put(
tasks_pb2.PutRequest(
queue=queue_name,
seconds_to_execute=seconds_to_execute,
delay=delay,
payload=payload
)
)

114
utils/sprint_platform.py Normal file
View File

@@ -0,0 +1,114 @@
import json
import typing
import urllib.parse
from threading import Thread
from time import sleep
from requests import get
class PlatformClient:
def __init__(self, platform_security_token: str, app_name: str, stage: str, configs: typing.List[str], experiments: typing.List[str], need_poll: bool = True):
self.platform_security_token = platform_security_token
self.app_name = app_name
self.stage = stage
self.configs = configs
self.experiments = experiments
self.endpoint = 'http://configurator/'
self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch')
self.config_storage = {}
self.experiment_storage = {}
self.staff_storage = {}
self.poll_data()
if need_poll:
self.poll_data_in_thread()
def poll_data_in_thread(self):
def inner():
while True:
sleep(30)
self.fetch()
Thread(target=inner, daemon=True).start()
def poll_data(self):
self.fetch(with_exception=True)
def request_with_retries(self, url, params, with_exception=False, retries_count=3):
exception_to_throw = None
for _ in range(retries_count):
try:
response = get(
url,
params=params
)
if response.status_code == 200:
return response.json()
print(f'Failed to request {url}, status_code={response.status_code}')
exception_to_throw = Exception('Not 200 status')
except Exception as exc:
print(exc)
exception_to_throw = exc
sleep(1)
print(f'Failed fetching with retries: {url}, {params}')
if with_exception:
raise exception_to_throw
def fetch(self, with_exception=False):
if self.stage == 'local':
local_platform = json.loads(open('local_platform.json', 'r').read())
self.config_storage = local_platform['configs']
self.experiment_storage = local_platform['experiments']
self.staff_storage = {
key: set(value)
for key, value in local_platform['platform_staff'].items()
}
return
response_data = self.request_with_retries(self.fetch_url, {
'project': self.app_name,
'stage': self.stage,
}, with_exception)
self.config_storage = response_data['configs']
self.experiment_storage = response_data['experiments']
self.staff_storage = {
key: set(value)
for key, value in response_data['platform_staff'].items()
}
def fetch_configs(self, with_exception=False):
if self.stage == 'local':
local_platform = json.loads(open('local_platform.json', 'r').read())
self.config_storage = local_platform['configs']
return
for config in self.configs:
response_data = self.request_with_retries(self.configs_url, {
'project': self.app_name,
'stage': self.stage,
'name': config
}, with_exception)
self.config_storage[config] = response_data
def fetch_experiments(self, with_exception=False):
if self.stage == 'local':
local_platform = json.loads(open('local_platform.json', 'r').read())
self.experiment_storage = local_platform['experiments']
return
for experiment in self.experiments:
response_data = self.request_with_retries(self.experiments_url, {
'project': self.app_name,
'stage': self.stage,
'name': experiment
}, with_exception)
self.experiment_storage[experiment] = response_data
def is_staff(self, **kwargs):
for key, value in kwargs.items():
if value in self.staff_storage[key]:
return True
return False
def get_config(self, name):
return self.config_storage[name]
def get_experiment(self, name):
return self.experiment_storage[name]

42
utils/storage.py Normal file
View File

@@ -0,0 +1,42 @@
from cachetools import TTLCache
import os
from utils.mongo import mongo
CACHE_SIZE = int(os.getenv("CACHE_SIZE", 1000))
CACHE_TTL = int(os.getenv("CACHE_TTL", 3600))
cache = TTLCache(CACHE_SIZE, CACHE_TTL)
def get_chat_info(chat_id: int) -> dict:
cached_info = cache.get(chat_id)
if cached_info is not None:
return cached_info
mongo_info = mongo.chats_collection.find_one({"chat_id": chat_id})
if mongo_info is not None:
cache[chat_id] = mongo_info
return mongo_info
chat_info = {"chat_id": chat_id, "state": "default", "probability": 100}
mongo.chats_collection.insert_one(chat_info)
cache[chat_id] = chat_info
return chat_info
def set_values(chat_id: int, **values):
cached_info = cache.get(chat_id)
if cached_info is None:
mongo_info = mongo.chats_collection.find_one({"chat_id": chat_id})
if mongo_info is None:
chat_info = {"chat_id": chat_id, "state": "default", "probability": 100}
chat_info.update(values)
mongo.chats_collection.insert_one(chat_info)
cache[chat_id] = chat_info
else:
mongo.chats_collection.update_one({"chat_id": chat_id}, {"$set": values})
mongo_info = dict(mongo_info)
mongo_info.update(values)
cache[chat_id] = mongo_info
else:
cached_info.update(values)
mongo.chats_collection.update_one({"chat_id": chat_id}, {"$set": values})