Compare commits

..

9 Commits

Author SHA1 Message Date
e4915c126d Merge pull request 'redis' (#12) from master into dev
Reviewed-on: #12
2024-11-25 00:27:58 +03:00
db1c1a7140 Merge pull request 'master' (#11) from master into dev
Reviewed-on: #11
2024-11-23 22:24:10 +03:00
185bfd93a2 Merge pull request 'master' (#8) from master into dev
Reviewed-on: #8
2024-11-22 01:01:01 +03:00
b198715033 Merge pull request 'fix' (#6) from master into dev
Reviewed-on: #6
2024-11-17 12:09:17 +03:00
e183c184ac Merge pull request 'up' (#5) from master into dev
Reviewed-on: #5
2024-11-17 02:37:03 +03:00
2eba8e91e7 Merge pull request 'up' (#4) from master into dev
Reviewed-on: #4
2024-11-17 02:35:32 +03:00
9adc0d77ce Merge pull request 'master' (#3) from master into dev
Reviewed-on: #3
2024-11-17 02:33:09 +03:00
9e14ce2739 Merge pull request 'up' (#2) from master into dev
Reviewed-on: #2
2024-11-17 02:27:44 +03:00
474f9fd74a Merge pull request 'up' (#1) from master into dev
Reviewed-on: #1
2024-11-17 01:54:47 +03:00
12 changed files with 41 additions and 50 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@@ -7,9 +7,10 @@ services:
networks: networks:
- queues-development - queues-development
environment: environment:
MONGO_HOST: "mongo.dev.chocomarsh.com" MONGO_HOST: "mongo.develop.sprinthub.ru"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV MONGO_PASSWORD: $MONGO_PASSWORD_DEV
STAGE: "development" REDIS_HOST: "redis.develop.sprinthub.ru"
REDIS_PASSWORD: $REDIS_PASSWORD_DEV
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:

View File

@@ -7,9 +7,10 @@ services:
networks: networks:
- queues - queues
environment: environment:
MONGO_HOST: "mongo.chocomarsh.com" MONGO_HOST: "mongo.sprinthub.ru"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD MONGO_PASSWORD: $MONGO_PASSWORD_PROD
STAGE: "production" REDIS_HOST: "redis.sprinthub.ru"
REDIS_PASSWORD: $REDIS_PASSWORD_PROD
deploy: deploy:
mode: replicated mode: replicated
restart_policy: restart_policy:

View File

@@ -40,4 +40,5 @@ jobs:
- name: deploy - name: deploy
env: env:
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
REDIS_PASSWORD_DEV: ${{ secrets.REDIS_PASSWORD_DEV }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra-development run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra-development

View File

@@ -40,4 +40,5 @@ jobs:
- name: deploy - name: deploy
env: env:
MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }}
REDIS_PASSWORD_PROD: ${{ secrets.REDIS_PASSWORD_PROD }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra

View File

@@ -8,6 +8,4 @@ RUN pip install -r requirements.txt
COPY . . COPY . .
ENV PYTHONUNBUFFERED=1
ENTRYPOINT ["python", "main.py"] ENTRYPOINT ["python", "main.py"]

View File

@@ -1,30 +1,24 @@
import asyncio
import collections
import fastapi import fastapi
import pydantic import pydantic
import typing import typing
from app.storage.mongo import tasks from app.storage.mongo import tasks
from app.storage.redis import lock
locks = collections.defaultdict(asyncio.Lock)
router = fastapi.APIRouter() router = fastapi.APIRouter()
class Task(pydantic.BaseModel): class Response(pydantic.BaseModel):
id: str id: str
attempt: int attempt: int
payload: dict payload: dict
class Response(pydantic.BaseModel):
task: Task|None
@router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) @router.get('/api/v1/take', responses={404: {'description': 'Not found'}})
async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response:
async with locks[queue]: async with lock.acquire(queue):
task = await tasks.take_task(queue) task = await tasks.take_task(queue)
if not task: if not task:
return Response(task=None) raise fastapi.HTTPException(404)
return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload)) return Response(id=str(task._id), attempt=task.attempts, payload=task.payload)

View File

@@ -1,6 +1,8 @@
import asyncio
import bson import bson
import datetime import datetime
import pydantic import pydantic
import typing
from app.storage.mongo import database from app.storage.mongo import database
from app.utils import time from app.utils import time
@@ -26,7 +28,7 @@ async def add_task(task: Task) -> str:
return result.inserted_id return result.inserted_id
async def take_task(queue: str) -> Task|None: async def take_task(queue: str) -> typing.Optional[Task]:
now = time.now() now = time.now()
async for raw_task in collection.find({'queue': queue, 'available_from': {'$lte': now}}): async for raw_task in collection.find({'queue': queue, 'available_from': {'$lte': now}}):
task = Task.model_validate(raw_task) task = Task.model_validate(raw_task)

View File

@@ -0,0 +1,15 @@
import os
import redis.asyncio
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
if REDIS_PASSWORD:
URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:6379'
else:
URL = f'redis://{REDIS_HOST}:6379'
pool = redis.asyncio.ConnectionPool.from_url(URL)
database = redis.Redis.from_pool(pool)

View File

@@ -0,0 +1,8 @@
import contextlib
from app.storage import redis
@contextlib.contextmanager
def acquire(lock_name: str):
return redis.database.lock(lock_name)

10
main.py
View File

@@ -8,20 +8,14 @@ from app.routers import finish
from app.storage import mongo from app.storage import mongo
app = fastapi.FastAPI(debug=True) app = fastapi.FastAPI()
app.include_router(take.router) app.include_router(take.router)
app.include_router(put.router) app.include_router(put.router)
app.include_router(finish.router) app.include_router(finish.router)
@app.exception_handler(Exception)
async def unicorn_exception_handler(request: fastapi.Request, exc: Exception):
return fastapi.JSONResponse(
status_code=500,
content={"message": f"Oops! {exc.name} did something. There goes a rainbow... {exc}"},
)
mongo.create_indexes() mongo.create_indexes()
if __name__ == '__main__': if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=1239) uvicorn.run(app, host="0.0.0.0", port=1239)

View File

@@ -1,44 +1,20 @@
annotated-types==0.7.0 annotated-types==0.7.0
anyio==4.6.2.post1 anyio==4.6.2.post1
APScheduler==3.10.4 APScheduler==3.10.4
certifi==2024.12.14
charset-normalizer==3.4.1
click==8.1.7 click==8.1.7
dnspython==2.7.0 dnspython==2.7.0
email_validator==2.2.0
fastapi==0.115.4 fastapi==0.115.4
fastapi-cli==0.0.7
h11==0.14.0 h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
idna==3.10 idna==3.10
Jinja2==3.1.5
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
motor==3.6.0 motor==3.6.0
pydantic==2.9.2 pydantic==2.9.2
pydantic_core==2.23.4 pydantic_core==2.23.4
Pygments==2.18.0
pymongo==4.9.2 pymongo==4.9.2
python-dotenv==1.0.1
python-multipart==0.0.20
pytz==2024.2 pytz==2024.2
PyYAML==6.0.2
redis==5.2.0 redis==5.2.0
requests==2.32.3
rich==13.9.4
rich-toolkit==0.12.0
shellingham==1.5.4
six==1.16.0 six==1.16.0
sniffio==1.3.1 sniffio==1.3.1
starlette==0.41.2 starlette==0.41.2
typer==0.15.1
typing_extensions==4.12.2 typing_extensions==4.12.2
tzlocal==5.2 tzlocal==5.2
urllib3==2.3.0
uvicorn==0.32.0 uvicorn==0.32.0
uvloop==0.21.0
watchfiles==1.0.3
websockets==14.1