Compare commits
19 Commits
56add999c6
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| d136a71f09 | |||
|
|
e94ca96c42 | ||
|
|
a3182947a1 | ||
| 02d5631c4d | |||
|
|
357418c66c | ||
| e791890ea5 | |||
|
|
3cdc41f864 | ||
| 710a81139c | |||
|
|
35dcc8390c | ||
| 28e83ee592 | |||
|
|
7a99b36b00 | ||
| 7f60dc0f59 | |||
|
|
db88db8b20 | ||
| 900cb70f37 | |||
|
|
cc2875563f | ||
| 740cac21d3 | |||
|
|
4538137ce4 | ||
| 15563aa75d | |||
|
|
6567a1dc35 |
@@ -7,8 +7,8 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- queues-development
|
- queues-development
|
||||||
- monitoring
|
- monitoring
|
||||||
|
- mongo-development
|
||||||
environment:
|
environment:
|
||||||
MONGO_HOST: "mongo.develop.sprinthub.ru"
|
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
||||||
STAGE: "development"
|
STAGE: "development"
|
||||||
deploy:
|
deploy:
|
||||||
@@ -26,3 +26,5 @@ networks:
|
|||||||
external: true
|
external: true
|
||||||
monitoring:
|
monitoring:
|
||||||
external: true
|
external: true
|
||||||
|
mongo-development:
|
||||||
|
external: true
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- queues
|
- queues
|
||||||
- monitoring
|
- monitoring
|
||||||
|
- mongo
|
||||||
environment:
|
environment:
|
||||||
MONGO_HOST: "mongo.sprinthub.ru"
|
MONGO_HOST: "mongo.sprinthub.ru"
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
||||||
@@ -26,3 +27,5 @@ networks:
|
|||||||
external: true
|
external: true
|
||||||
monitoring:
|
monitoring:
|
||||||
external: true
|
external: true
|
||||||
|
mongo:
|
||||||
|
external: true
|
||||||
|
|||||||
@@ -1,9 +1,15 @@
|
|||||||
package routers
|
package routers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
tasks "queues-go/app/storage/mongo/collections"
|
tasks "queues-go/app/storage/mongo/collections"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TaskResponse struct {
|
type TaskResponse struct {
|
||||||
@@ -18,6 +24,24 @@ type TakeResponse struct {
|
|||||||
|
|
||||||
var MutexMap map[string]*sync.Mutex
|
var MutexMap map[string]*sync.Mutex
|
||||||
|
|
||||||
|
func sendLatency(timestamp time.Time, latency int) error {
|
||||||
|
loc, _ := time.LoadLocation("Europe/Moscow")
|
||||||
|
|
||||||
|
s := fmt.Sprintf(
|
||||||
|
`{"timestamp":"%s","service":"queues","environment":"%s","name":"latency","count":%s}`,
|
||||||
|
timestamp.In(loc).Format("2006-01-02T15:04:05Z"),
|
||||||
|
os.Getenv("STAGE"),
|
||||||
|
strconv.Itoa(latency),
|
||||||
|
)
|
||||||
|
data := []byte(s)
|
||||||
|
r := bytes.NewReader(data)
|
||||||
|
_, err := http.Post("http://monitoring:1237/api/v1/metrics/increment", "application/json", r)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR %s", err.Error())
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func Take(r *http.Request) (interface{}, int) {
|
func Take(r *http.Request) (interface{}, int) {
|
||||||
queue := r.Header.Get("queue")
|
queue := r.Header.Get("queue")
|
||||||
mutex, ok := MutexMap[queue]
|
mutex, ok := MutexMap[queue]
|
||||||
@@ -41,6 +65,8 @@ func Take(r *http.Request) (interface{}, int) {
|
|||||||
Attempt: task.Attempts,
|
Attempt: task.Attempts,
|
||||||
Payload: task.Payload,
|
Payload: task.Payload,
|
||||||
}
|
}
|
||||||
|
now := time.Now()
|
||||||
|
go sendLatency(now, int(now.Sub(task.PutAt).Milliseconds()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return response, http.StatusOK
|
return response, http.StatusOK
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import (
|
|||||||
var Database mongo.Database
|
var Database mongo.Database
|
||||||
|
|
||||||
func Connect() {
|
func Connect() {
|
||||||
connectionString := fmt.Sprintf("mongodb://mongo:%s@%s:27017/", utils.GetEnv("MONGO_PASSWORD", "password"), utils.GetEnv("MONGO_HOST", "localhost"))
|
connectionString := fmt.Sprintf("mongodb://mongo:%s@mongo:27017/", utils.GetEnv("MONGO_PASSWORD", "password"))
|
||||||
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString))
|
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|||||||
@@ -33,6 +33,14 @@ type InsertedTask struct {
|
|||||||
Attempts int `bson:"attempts"`
|
Attempts int `bson:"attempts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Count() (*int64, error) {
|
||||||
|
count, err := collection().CountDocuments(context.TODO(), bson.M{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &count, nil
|
||||||
|
}
|
||||||
|
|
||||||
func Add(task InsertedTask) error {
|
func Add(task InsertedTask) error {
|
||||||
_, err := collection().InsertOne(context.TODO(), task)
|
_, err := collection().InsertOne(context.TODO(), task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
35
main.go
35
main.go
@@ -9,15 +9,17 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"queues-go/app/routers"
|
"queues-go/app/routers"
|
||||||
client "queues-go/app/storage/mongo"
|
client "queues-go/app/storage/mongo"
|
||||||
|
tasks "queues-go/app/storage/mongo/collections"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) {
|
func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) {
|
||||||
|
loc, _ := time.LoadLocation("Europe/Moscow")
|
||||||
s := fmt.Sprintf(
|
s := fmt.Sprintf(
|
||||||
`{"timestamp":"%s","service":"queues","environment":"%s","endpoint":"%s","status_code":%s,"response_time":%s,"method":"%s"}`,
|
`{"timestamp":"%s","service":"queues","environment":"%s","endpoint":"%s","status_code":%s,"response_time":%s,"method":"%s"}`,
|
||||||
timestamp.Format("2006-01-02T15:04:05Z"),
|
timestamp.In(loc).Format("2006-01-02T15:04:05Z"),
|
||||||
os.Getenv("STAGE"),
|
os.Getenv("STAGE"),
|
||||||
endpoint,
|
endpoint,
|
||||||
strconv.Itoa(statusCode),
|
strconv.Itoa(statusCode),
|
||||||
@@ -52,12 +54,43 @@ func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.Response
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func metricProxy(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.Post("http://monitoring:1237/api/v1/metrics/task", "application/json", r.Body)
|
||||||
|
w.WriteHeader(202)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeCount() {
|
||||||
|
for {
|
||||||
|
count, err := tasks.Count()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed getting docs count: %s", err.Error())
|
||||||
|
} else {
|
||||||
|
loc, _ := time.LoadLocation("Europe/Moscow")
|
||||||
|
s := fmt.Sprintf(
|
||||||
|
`{"timestamp":"%s","service":"queues","environment":"%s","name":"tasks","count":%s}`,
|
||||||
|
time.Now().In(loc).Format("2006-01-02T15:04:05Z"),
|
||||||
|
os.Getenv("STAGE"),
|
||||||
|
strconv.Itoa(int(*count)),
|
||||||
|
)
|
||||||
|
data := []byte(s)
|
||||||
|
r := bytes.NewReader(data)
|
||||||
|
_, err := http.Post("http://monitoring:1237/api/v1/metrics/increment", "application/json", r)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
client.Connect()
|
client.Connect()
|
||||||
routers.MutexMap = make(map[string]*sync.Mutex)
|
routers.MutexMap = make(map[string]*sync.Mutex)
|
||||||
http.HandleFunc("/api/v1/take", handlerWrapper(routers.Take))
|
http.HandleFunc("/api/v1/take", handlerWrapper(routers.Take))
|
||||||
http.HandleFunc("/api/v1/finish", handlerWrapper(routers.Finish))
|
http.HandleFunc("/api/v1/finish", handlerWrapper(routers.Finish))
|
||||||
http.HandleFunc("/api/v1/put", handlerWrapper(routers.Put))
|
http.HandleFunc("/api/v1/put", handlerWrapper(routers.Put))
|
||||||
|
http.HandleFunc("/api/v1/metric", metricProxy)
|
||||||
log.Printf("Server started")
|
log.Printf("Server started")
|
||||||
|
go writeCount()
|
||||||
http.ListenAndServe(":1239", nil)
|
http.ListenAndServe(":1239", nil)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user