diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 54393d7..1d372aa 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -7,8 +7,8 @@ services: networks: - queues-development - monitoring + - mongo-development environment: - MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV STAGE: "development" deploy: @@ -26,3 +26,5 @@ networks: external: true monitoring: external: true + mongo-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 3426f08..644aaca 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -7,6 +7,7 @@ services: networks: - queues - monitoring + - mongo environment: MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD @@ -26,3 +27,5 @@ networks: external: true monitoring: external: true + mongo: + external: true diff --git a/app/storage/mongo/client.go b/app/storage/mongo/client.go index 9c47fe6..9eec809 100644 --- a/app/storage/mongo/client.go +++ b/app/storage/mongo/client.go @@ -12,7 +12,7 @@ import ( var Database mongo.Database func Connect() { - connectionString := fmt.Sprintf("mongodb://mongo:%s@%s:27017/", utils.GetEnv("MONGO_PASSWORD", "password"), utils.GetEnv("MONGO_HOST", "localhost")) + connectionString := fmt.Sprintf("mongodb://mongo:%s@mongo:27017/", utils.GetEnv("MONGO_PASSWORD", "password")) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString)) if err != nil { panic(err) diff --git a/app/storage/mongo/collections/tasks.go b/app/storage/mongo/collections/tasks.go index 05c521c..0943924 100644 --- a/app/storage/mongo/collections/tasks.go +++ b/app/storage/mongo/collections/tasks.go @@ -33,6 +33,14 @@ type InsertedTask struct { Attempts int `bson:"attempts"` } +func Count() (*int64, error) { + count, err := collection().CountDocuments(context.TODO(), bson.M{}) + if err != nil { + return nil, err + } + return &count, nil +} + func Add(task InsertedTask) error { _, err := collection().InsertOne(context.TODO(), task) if err != nil { diff --git a/main.go b/main.go index e8fb004..27ea094 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "os" "queues-go/app/routers" client "queues-go/app/storage/mongo" + tasks "queues-go/app/storage/mongo/collections" "strconv" "sync" "time" @@ -58,6 +59,30 @@ func metricProxy(w http.ResponseWriter, r *http.Request) { w.WriteHeader(202) } +func writeCount() { + for { + count, err := tasks.Count() + if err != nil { + log.Printf("Failed getting docs count: %s", err.Error()) + } else { + loc, _ := time.LoadLocation("Europe/Moscow") + s := fmt.Sprintf( + `{"timestamp":"%s","service":"queues","environment":"%s","name":"tasks","count":%s}`, + time.Now().In(loc).Format("2006-01-02T15:04:05Z"), + os.Getenv("STAGE"), + strconv.Itoa(int(*count)), + ) + data := []byte(s) + r := bytes.NewReader(data) + _, err := http.Post("http://monitoring:1237/api/v1/metrics/increment", "application/json", r) + if err != nil { + log.Printf("ERROR %s", err.Error()) + } + } + time.Sleep(time.Second) + } +} + func main() { client.Connect() routers.MutexMap = make(map[string]*sync.Mutex) @@ -66,5 +91,6 @@ func main() { http.HandleFunc("/api/v1/put", handlerWrapper(routers.Put)) http.HandleFunc("/api/v1/metric", metricProxy) log.Printf("Server started") + go writeCount() http.ListenAndServe(":1239", nil) }