commit c2659fb49c8bbb9bac8ce726b89693c1351fb1a9 Author: Egor Matveev Date: Tue Dec 31 00:53:40 2024 +0300 initial diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml new file mode 100644 index 0000000..302ba1d --- /dev/null +++ b/.deploy/deploy-dev.yaml @@ -0,0 +1,23 @@ +version: "3.4" + + +services: + queues: + image: mathwave/sprint-repo:queues + networks: + - queues-development + environment: + MONGO_HOST: "mongo.develop.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_DEV + STAGE: "development" + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + queues-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml new file mode 100644 index 0000000..187e3db --- /dev/null +++ b/.deploy/deploy-prod.yaml @@ -0,0 +1,23 @@ +version: "3.4" + + +services: + queues: + image: mathwave/sprint-repo:queues + networks: + - queues + environment: + MONGO_HOST: "mongo.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_PROD + STAGE: "production" + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + queues: + external: true diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..3efe3ac --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,43 @@ +name: Deploy Dev + +on: + pull_request: + branches: + - dev + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: build + run: docker build -t mathwave/sprint-repo:queues . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues + deploy-dev: + name: Deploy dev + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: deploy + env: + MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra-development diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml new file mode 100644 index 0000000..9a6c82f --- /dev/null +++ b/.gitea/workflows/deploy-prod.yaml @@ -0,0 +1,43 @@ +name: Deploy Prod + +on: + pull_request: + branches: + - prod + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: build + run: docker build -t mathwave/sprint-repo:queues . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues + deploy-prod: + name: Deploy prod + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: deploy + env: + MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..90334fd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM golang:alpine +RUN mkdir /usr/src/app +WORKDIR /usr/src/app +COPY . . +RUN go build +ENTRYPOINT ["./queues-go"] \ No newline at end of file diff --git a/app/routers/finish.go b/app/routers/finish.go new file mode 100644 index 0000000..b471b8b --- /dev/null +++ b/app/routers/finish.go @@ -0,0 +1,29 @@ +package routers + +import ( + "encoding/json" + "net/http" + tasks "queues-go/app/storage/mongo/collections" +) + +type FinishRequestBody struct { + Id string `json:"id"` +} + +func Finish(w http.ResponseWriter, r *http.Request) { + d := json.NewDecoder(r.Body) + body := FinishRequestBody{} + err := d.Decode(&body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + err = tasks.Finish(body.Id) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusAccepted) +} diff --git a/app/routers/put.go b/app/routers/put.go new file mode 100644 index 0000000..7a2759b --- /dev/null +++ b/app/routers/put.go @@ -0,0 +1,46 @@ +package routers + +import ( + "encoding/json" + "net/http" + tasks "queues-go/app/storage/mongo/collections" + "time" +) + +type PutRequestBody struct { + Payload json.RawMessage `json:"payload"` + SecondsToExecute int `json:"seconds_to_execute"` + Delay *int `json:"delay"` +} + +func Put(w http.ResponseWriter, r *http.Request) { + d := json.NewDecoder(r.Body) + body := PutRequestBody{} + err := d.Decode(&body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + queue := r.Header.Get("queue") + var availableFrom time.Time + if body.Delay == nil { + availableFrom = time.Now() + } else { + availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay)) + } + task := tasks.InsertedTask{ + Queue: queue, + Payload: body.Payload, + PutAt: time.Now(), + AvailableFrom: availableFrom, + SecondsToExecute: body.SecondsToExecute, + TakenAt: nil, + Attempts: 0, + } + err = tasks.Add(task) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) +} diff --git a/app/routers/take.go b/app/routers/take.go new file mode 100644 index 0000000..8fa92ea --- /dev/null +++ b/app/routers/take.go @@ -0,0 +1,42 @@ +package routers + +import ( + "encoding/json" + "net/http" + tasks "queues-go/app/storage/mongo/collections" +) + +type TaskResponse struct { + Id string `json:"id"` + Attempt int `json:"attempt"` + Payload interface{} `json:"payload"` +} + +type TakeResponse struct { + Task *TaskResponse `json:"task"` +} + +func Take(w http.ResponseWriter, r *http.Request) { + queue := r.Header.Get("queue") + task, err := tasks.Take(queue) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + var response TakeResponse + if task == nil { + response.Task = nil + } else { + response.Task = &TaskResponse{ + Id: task.Id.Hex(), + Attempt: task.Attempts, + Payload: task.Payload, + } + } + data, err := json.Marshal(response) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(data) +} diff --git a/app/storage/mongo/client.go b/app/storage/mongo/client.go new file mode 100644 index 0000000..9c47fe6 --- /dev/null +++ b/app/storage/mongo/client.go @@ -0,0 +1,25 @@ +package client + +import ( + "context" + "fmt" + "queues-go/app/utils" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +var Database mongo.Database + +func Connect() { + connectionString := fmt.Sprintf("mongodb://mongo:%s@%s:27017/", utils.GetEnv("MONGO_PASSWORD", "password"), utils.GetEnv("MONGO_HOST", "localhost")) + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString)) + if err != nil { + panic(err) + } + db := client.Database("queues") + if db == nil { + panic(fmt.Errorf("DATABASE DOES NOT EXIST")) + } + Database = *db +} diff --git a/app/storage/mongo/collections/tasks.go b/app/storage/mongo/collections/tasks.go new file mode 100644 index 0000000..a479d4c --- /dev/null +++ b/app/storage/mongo/collections/tasks.go @@ -0,0 +1,105 @@ +package tasks + +import ( + "context" + client "queues-go/app/storage/mongo" + errors "queues-go/app/utils" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +type Task struct { + Queue string `bson:"queue"` + Payload bson.M `bson:"payload"` + PutAt time.Time `bson:"put_at"` + AvailableFrom time.Time `bson:"available_from"` + SecondsToExecute int `bson:"seconds_to_execute"` + Id primitive.ObjectID `bson:"_id"` + TakenAt *time.Time `bson:"taken_at"` + Attempts int `bson:"attempts"` +} + +type InsertedTask struct { + Queue string `bson:"queue"` + Payload interface{} `bson:"payload"` + PutAt time.Time `bson:"put_at"` + AvailableFrom time.Time `bson:"available_from"` + SecondsToExecute int `bson:"seconds_to_execute"` + TakenAt *time.Time `bson:"taken_at"` + Attempts int `bson:"attempts"` +} + +func Add(task InsertedTask) error { + _, err := collection().InsertOne(context.TODO(), task) + if err != nil { + return errors.ErrInternalError + } + return nil +} + +func Finish(id string) error { + objectId, err := primitive.ObjectIDFromHex(id) + if err != nil { + return errors.ErrIncorrectFormat + } + + _, err = collection().DeleteOne(context.TODO(), bson.M{"_id": objectId}) + if err != nil { + return errors.ErrInternalError + } + return nil +} + +func Take(queue string) (*Task, error) { + now := time.Now() + task, err := findTask(queue, now) + if err != nil { + return nil, errors.ErrInternalError + } + if task == nil { + return nil, nil + } + _, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}}) + if err != nil { + return nil, errors.ErrInternalError + } + return task, nil +} + +func findTask(queue string, now time.Time) (*Task, error) { + cursor, err := collection().Find( + context.TODO(), + bson.M{ + "queue": queue, + "available_from": bson.M{"$lte": now}, + }, + ) + if err != nil { + return nil, errors.ErrInternalError + } + + var results []Task + err = cursor.All(context.TODO(), &results) + if err != nil { + return nil, errors.ErrInternalError + } + + for _, task := range results { + if task.TakenAt == nil { + return &task, nil + } + takenAt := *task.TakenAt + + if takenAt.Add(time.Second * time.Duration(task.SecondsToExecute)).Before(now) { + return &task, nil + } + } + return nil, nil +} + +func collection() *mongo.Collection { + return client.Database.Collection("tasks") +} diff --git a/app/utils/env.go b/app/utils/env.go new file mode 100644 index 0000000..828aa41 --- /dev/null +++ b/app/utils/env.go @@ -0,0 +1,11 @@ +package utils + +import "os" + +func GetEnv(env string, defaultValue string) string { + value := os.Getenv(env) + if len(value) == 0 { + return defaultValue + } + return value +} diff --git a/app/utils/errors.go b/app/utils/errors.go new file mode 100644 index 0000000..3fb5fdf --- /dev/null +++ b/app/utils/errors.go @@ -0,0 +1,6 @@ +package utils + +import "errors" + +var ErrIncorrectFormat = errors.New("incorrect_format") +var ErrInternalError = errors.New("internal_error") diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cdc9282 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module queues-go + +go 1.23.4 + +require ( + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + go.mongodb.org/mongo-driver v1.17.1 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9ea4ff2 --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go new file mode 100644 index 0000000..8d7344a --- /dev/null +++ b/main.go @@ -0,0 +1,15 @@ +package main + +import ( + "net/http" + "queues-go/app/routers" + client "queues-go/app/storage/mongo" +) + +func main() { + client.Connect() + http.HandleFunc("/api/v1/take", routers.Take) + http.HandleFunc("/api/v1/finish", routers.Finish) + http.HandleFunc("/api/v1/put", routers.Put) + http.ListenAndServe(":1239", nil) +} diff --git a/queues-go b/queues-go new file mode 100755 index 0000000..4fd67a1 Binary files /dev/null and b/queues-go differ