Merge pull request 'master' (#37) from master into dev
Reviewed-on: #37
This commit is contained in:
		@@ -7,8 +7,8 @@ services:
 | 
			
		||||
    networks:
 | 
			
		||||
      - queues-development
 | 
			
		||||
      - monitoring
 | 
			
		||||
      - mongo-development
 | 
			
		||||
    environment:
 | 
			
		||||
      MONGO_HOST: "mongo.develop.sprinthub.ru"
 | 
			
		||||
      MONGO_PASSWORD: $MONGO_PASSWORD_DEV
 | 
			
		||||
      STAGE: "development"
 | 
			
		||||
    deploy:
 | 
			
		||||
@@ -26,3 +26,5 @@ networks:
 | 
			
		||||
    external: true
 | 
			
		||||
  monitoring:
 | 
			
		||||
    external: true
 | 
			
		||||
  mongo-development:
 | 
			
		||||
    external: true
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ services:
 | 
			
		||||
    networks:
 | 
			
		||||
      - queues
 | 
			
		||||
      - monitoring
 | 
			
		||||
      - mongo
 | 
			
		||||
    environment:
 | 
			
		||||
      MONGO_HOST: "mongo.sprinthub.ru"
 | 
			
		||||
      MONGO_PASSWORD: $MONGO_PASSWORD_PROD
 | 
			
		||||
@@ -26,3 +27,5 @@ networks:
 | 
			
		||||
    external: true
 | 
			
		||||
  monitoring:
 | 
			
		||||
    external: true
 | 
			
		||||
  mongo:
 | 
			
		||||
    external: true
 | 
			
		||||
 
 | 
			
		||||
@@ -12,7 +12,7 @@ import (
 | 
			
		||||
var Database mongo.Database
 | 
			
		||||
 | 
			
		||||
func Connect() {
 | 
			
		||||
	connectionString := fmt.Sprintf("mongodb://mongo:%s@%s:27017/", utils.GetEnv("MONGO_PASSWORD", "password"), utils.GetEnv("MONGO_HOST", "localhost"))
 | 
			
		||||
	connectionString := fmt.Sprintf("mongodb://mongo:%s@mongo:27017/", utils.GetEnv("MONGO_PASSWORD", "password"))
 | 
			
		||||
	client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connectionString))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
 
 | 
			
		||||
@@ -33,6 +33,14 @@ type InsertedTask struct {
 | 
			
		||||
	Attempts         int        `bson:"attempts"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Count() (*int64, error) {
 | 
			
		||||
	count, err := collection().CountDocuments(context.TODO(), bson.M{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return &count, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Add(task InsertedTask) error {
 | 
			
		||||
	_, err := collection().InsertOne(context.TODO(), task)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										26
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								main.go
									
									
									
									
									
								
							@@ -9,6 +9,7 @@ import (
 | 
			
		||||
	"os"
 | 
			
		||||
	"queues-go/app/routers"
 | 
			
		||||
	client "queues-go/app/storage/mongo"
 | 
			
		||||
	tasks "queues-go/app/storage/mongo/collections"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
@@ -58,6 +59,30 @@ func metricProxy(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	w.WriteHeader(202)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func writeCount() {
 | 
			
		||||
	for {
 | 
			
		||||
		count, err := tasks.Count()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Printf("Failed getting docs count: %s", err.Error())
 | 
			
		||||
		} else {
 | 
			
		||||
			loc, _ := time.LoadLocation("Europe/Moscow")
 | 
			
		||||
			s := fmt.Sprintf(
 | 
			
		||||
				`{"timestamp":"%s","service":"queues","environment":"%s","name":"tasks","count":%s}`,
 | 
			
		||||
				time.Now().In(loc).Format("2006-01-02T15:04:05Z"),
 | 
			
		||||
				os.Getenv("STAGE"),
 | 
			
		||||
				strconv.Itoa(int(*count)),
 | 
			
		||||
			)
 | 
			
		||||
			data := []byte(s)
 | 
			
		||||
			r := bytes.NewReader(data)
 | 
			
		||||
			_, err := http.Post("http://monitoring:1237/api/v1/metrics/increment", "application/json", r)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Printf("ERROR %s", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	client.Connect()
 | 
			
		||||
	routers.MutexMap = make(map[string]*sync.Mutex)
 | 
			
		||||
@@ -66,5 +91,6 @@ func main() {
 | 
			
		||||
	http.HandleFunc("/api/v1/put", handlerWrapper(routers.Put))
 | 
			
		||||
	http.HandleFunc("/api/v1/metric", metricProxy)
 | 
			
		||||
	log.Printf("Server started")
 | 
			
		||||
	go writeCount()
 | 
			
		||||
	http.ListenAndServe(":1239", nil)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user