initial
This commit is contained in:
29
app/routers/finish.go
Normal file
29
app/routers/finish.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package routers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
tasks "queues-go/app/storage/mongo/collections"
|
||||
)
|
||||
|
||||
type FinishRequestBody struct {
|
||||
Id string `json:"id"`
|
||||
}
|
||||
|
||||
func Finish(w http.ResponseWriter, r *http.Request) {
|
||||
d := json.NewDecoder(r.Body)
|
||||
body := FinishRequestBody{}
|
||||
err := d.Decode(&body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
err = tasks.Finish(body.Id)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
46
app/routers/put.go
Normal file
46
app/routers/put.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package routers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
tasks "queues-go/app/storage/mongo/collections"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PutRequestBody struct {
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
SecondsToExecute int `json:"seconds_to_execute"`
|
||||
Delay *int `json:"delay"`
|
||||
}
|
||||
|
||||
func Put(w http.ResponseWriter, r *http.Request) {
|
||||
d := json.NewDecoder(r.Body)
|
||||
body := PutRequestBody{}
|
||||
err := d.Decode(&body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
queue := r.Header.Get("queue")
|
||||
var availableFrom time.Time
|
||||
if body.Delay == nil {
|
||||
availableFrom = time.Now()
|
||||
} else {
|
||||
availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay))
|
||||
}
|
||||
task := tasks.InsertedTask{
|
||||
Queue: queue,
|
||||
Payload: body.Payload,
|
||||
PutAt: time.Now(),
|
||||
AvailableFrom: availableFrom,
|
||||
SecondsToExecute: body.SecondsToExecute,
|
||||
TakenAt: nil,
|
||||
Attempts: 0,
|
||||
}
|
||||
err = tasks.Add(task)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
42
app/routers/take.go
Normal file
42
app/routers/take.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package routers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
tasks "queues-go/app/storage/mongo/collections"
|
||||
)
|
||||
|
||||
type TaskResponse struct {
|
||||
Id string `json:"id"`
|
||||
Attempt int `json:"attempt"`
|
||||
Payload interface{} `json:"payload"`
|
||||
}
|
||||
|
||||
type TakeResponse struct {
|
||||
Task *TaskResponse `json:"task"`
|
||||
}
|
||||
|
||||
func Take(w http.ResponseWriter, r *http.Request) {
|
||||
queue := r.Header.Get("queue")
|
||||
task, err := tasks.Take(queue)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var response TakeResponse
|
||||
if task == nil {
|
||||
response.Task = nil
|
||||
} else {
|
||||
response.Task = &TaskResponse{
|
||||
Id: task.Id.Hex(),
|
||||
Attempt: task.Attempts,
|
||||
Payload: task.Payload,
|
||||
}
|
||||
}
|
||||
data, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Write(data)
|
||||
}
|
||||
25
app/storage/mongo/client.go
Normal file
25
app/storage/mongo/client.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"queues-go/app/utils"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
var Database mongo.Database
|
||||
|
||||
func Connect() {
|
||||
connectionString := fmt.Sprintf("mongodb://mongo:%s@%s:27017/", utils.GetEnv("MONGO_PASSWORD", "password"), utils.GetEnv("MONGO_HOST", "localhost"))
|
||||
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
db := client.Database("queues")
|
||||
if db == nil {
|
||||
panic(fmt.Errorf("DATABASE DOES NOT EXIST"))
|
||||
}
|
||||
Database = *db
|
||||
}
|
||||
105
app/storage/mongo/collections/tasks.go
Normal file
105
app/storage/mongo/collections/tasks.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
client "queues-go/app/storage/mongo"
|
||||
errors "queues-go/app/utils"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
Queue string `bson:"queue"`
|
||||
Payload bson.M `bson:"payload"`
|
||||
PutAt time.Time `bson:"put_at"`
|
||||
AvailableFrom time.Time `bson:"available_from"`
|
||||
SecondsToExecute int `bson:"seconds_to_execute"`
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
TakenAt *time.Time `bson:"taken_at"`
|
||||
Attempts int `bson:"attempts"`
|
||||
}
|
||||
|
||||
type InsertedTask struct {
|
||||
Queue string `bson:"queue"`
|
||||
Payload interface{} `bson:"payload"`
|
||||
PutAt time.Time `bson:"put_at"`
|
||||
AvailableFrom time.Time `bson:"available_from"`
|
||||
SecondsToExecute int `bson:"seconds_to_execute"`
|
||||
TakenAt *time.Time `bson:"taken_at"`
|
||||
Attempts int `bson:"attempts"`
|
||||
}
|
||||
|
||||
func Add(task InsertedTask) error {
|
||||
_, err := collection().InsertOne(context.TODO(), task)
|
||||
if err != nil {
|
||||
return errors.ErrInternalError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Finish(id string) error {
|
||||
objectId, err := primitive.ObjectIDFromHex(id)
|
||||
if err != nil {
|
||||
return errors.ErrIncorrectFormat
|
||||
}
|
||||
|
||||
_, err = collection().DeleteOne(context.TODO(), bson.M{"_id": objectId})
|
||||
if err != nil {
|
||||
return errors.ErrInternalError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Take(queue string) (*Task, error) {
|
||||
now := time.Now()
|
||||
task, err := findTask(queue, now)
|
||||
if err != nil {
|
||||
return nil, errors.ErrInternalError
|
||||
}
|
||||
if task == nil {
|
||||
return nil, nil
|
||||
}
|
||||
_, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
|
||||
if err != nil {
|
||||
return nil, errors.ErrInternalError
|
||||
}
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func findTask(queue string, now time.Time) (*Task, error) {
|
||||
cursor, err := collection().Find(
|
||||
context.TODO(),
|
||||
bson.M{
|
||||
"queue": queue,
|
||||
"available_from": bson.M{"$lte": now},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.ErrInternalError
|
||||
}
|
||||
|
||||
var results []Task
|
||||
err = cursor.All(context.TODO(), &results)
|
||||
if err != nil {
|
||||
return nil, errors.ErrInternalError
|
||||
}
|
||||
|
||||
for _, task := range results {
|
||||
if task.TakenAt == nil {
|
||||
return &task, nil
|
||||
}
|
||||
takenAt := *task.TakenAt
|
||||
|
||||
if takenAt.Add(time.Second * time.Duration(task.SecondsToExecute)).Before(now) {
|
||||
return &task, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func collection() *mongo.Collection {
|
||||
return client.Database.Collection("tasks")
|
||||
}
|
||||
11
app/utils/env.go
Normal file
11
app/utils/env.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package utils
|
||||
|
||||
import "os"
|
||||
|
||||
func GetEnv(env string, defaultValue string) string {
|
||||
value := os.Getenv(env)
|
||||
if len(value) == 0 {
|
||||
return defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
6
app/utils/errors.go
Normal file
6
app/utils/errors.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package utils
|
||||
|
||||
import "errors"
|
||||
|
||||
var ErrIncorrectFormat = errors.New("incorrect_format")
|
||||
var ErrInternalError = errors.New("internal_error")
|
||||
Reference in New Issue
Block a user