From 5b1094c2584d27219676584de1f6cf4f4939a445 Mon Sep 17 00:00:00 2001 From: "Stanislav N. aka pztrn" Date: Sun, 29 Nov 2020 05:44:21 +0500 Subject: [PATCH] Linter config tuning and first version of Prometheus metrics parser. --- .golangci.yml | 2 + internal/application/application.go | 9 +++ internal/application/config.go | 6 +- internal/application/fetcher.go | 84 +++++++++++++++++++++- internal/application/parser.go | 105 ++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 4 deletions(-) create mode 100644 internal/application/parser.go diff --git a/.golangci.yml b/.golangci.yml index cc70430..cba426f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -21,3 +21,5 @@ linters-settings: line-length: 120 gocyclo: min-complexity: 40 + gocognit: + min-complexity: 40 diff --git a/internal/application/application.go b/internal/application/application.go index f44a6d0..dad17b2 100644 --- a/internal/application/application.go +++ b/internal/application/application.go @@ -3,6 +3,8 @@ package application import ( "context" "log" + "net/http" + "sync" "go.dev.pztrn.name/metricator/internal/storage" "go.dev.pztrn.name/metricator/internal/storage/memory" @@ -18,6 +20,11 @@ type Application struct { storage storage.Metrics storageDone chan struct{} + + fetchIsRunning bool + fetchIsRunningMutex sync.RWMutex + + httpClient *http.Client } // NewApplication creates new application. @@ -50,6 +57,8 @@ func (a *Application) initialize() { func (a *Application) Start() { a.storage.Start() + go a.startFetcher() + // The Context Listening Goroutine. go func() { <-a.ctx.Done() diff --git a/internal/application/config.go b/internal/application/config.go index 304182f..ef3f5d4 100644 --- a/internal/application/config.go +++ b/internal/application/config.go @@ -6,10 +6,10 @@ import "time" type Config struct { // Endpoint is a remote application endpoint which should give us metrics // in Prometheus format. - Endpoint string + Endpoint string `yaml:"endpoint"` // Headers is a list of headers that should be added to metrics request. - Headers map[string]string + Headers map[string]string `yaml:"headers"` // TimeBetweenRequests is a minimal amount of time which should pass // between requests. - TimeBetweenRequests time.Duration + TimeBetweenRequests time.Duration `yaml:"time_between_requests"` } diff --git a/internal/application/fetcher.go b/internal/application/fetcher.go index 65574aa..0da1922 100644 --- a/internal/application/fetcher.go +++ b/internal/application/fetcher.go @@ -1,4 +1,86 @@ package application +import ( + "io/ioutil" + "log" + "net/http" + "time" +) + +// Fetches data from remote endpoint, parses it and updates in storage. +func (a *Application) fetch() { + // Do not do anything if fetching is running. + // ToDo: maybe another approach? + a.fetchIsRunningMutex.RLock() + isFetching := a.fetchIsRunning + a.fetchIsRunningMutex.RUnlock() + + if isFetching { + return + } + + log.Println("Fetching data for", a.name) + + req, err := http.NewRequestWithContext(a.ctx, "GET", a.config.Endpoint, nil) + if err != nil { + log.Println("Failed to create request for", a.name, "metrics:", err.Error()) + + return + } + + for header, value := range a.config.Headers { + req.Header.Add(header, value) + } + + resp, err := a.httpClient.Do(req) + if err != nil { + log.Println("Failed to execute request for", a.name, "metrics:", err.Error()) + + return + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Println("Failed to read response body for", a.name, "metrics:", err.Error()) + + return + } + + data := a.parse(string(body)) + + a.storage.Put(data) + + a.fetchIsRunningMutex.Lock() + a.fetchIsRunning = true + a.fetchIsRunningMutex.Unlock() + + a.fetchIsRunningMutex.Lock() + a.fetchIsRunning = false + a.fetchIsRunningMutex.Unlock() +} + // Configures and starts Prometheus data fetching goroutine. -func (a *Application) startFetcher() {} +func (a *Application) startFetcher() { + fetchTicker := time.NewTicker(a.config.TimeBetweenRequests) + + // nolint:exaustivestruct + a.httpClient = &http.Client{ + Timeout: time.Second * 5, + } + + defer log.Println("Fetcher for", a.name, "completed") + + // First fetch should be executed ASAP. + a.fetch() + + for { + select { + case <-a.ctx.Done(): + return + case <-fetchTicker.C: + a.fetch() + } + } +} diff --git a/internal/application/parser.go b/internal/application/parser.go new file mode 100644 index 0000000..9210cdb --- /dev/null +++ b/internal/application/parser.go @@ -0,0 +1,105 @@ +package application + +import ( + "log" + "strings" +) + +// Parses passed body and returns a map suitable for pushing into storage. +func (a *Application) parse(body string) map[string]string { + data := make(map[string]string) + + // ToDo: switch to bytes buffer and maybe do not read body in caller? + splittedBody := strings.Split(body, "\n") + + for _, line := range splittedBody { + // Prometheus line contains metric name and metric parameters defined + // in "{}". + var ( + name, value string + params []string + ) + + // Skip empty lines. + if line == "" { + continue + } + + // Check that line isn't commented. We should skip comments for now. + if strings.HasPrefix(line, "#") { + continue + } + + log.Println("Analyzing line:", line) + + // Check if we have parametrized metric. If no - push it to data map. + if !strings.Contains(line, "{") { + name = strings.Split(line, " ")[0] + value = strings.Split(line, " ")[1] + } else { + value = strings.Split(line, " ")[1] + name = strings.Split(line, "{")[0] + + // Parse params into "name:value" string. + valuesString := strings.Split(strings.Split(line, "{")[1], "}")[0] + + var ( + paramName, paramValue string + paramNameFinished, paramValueStarted, paramValueFinished bool + ) + + for _, r := range valuesString { + if paramValueFinished && string(r) == "," { + params = append(params, paramName+":"+paramValue) + paramName, paramValue = "", "" + paramNameFinished, paramValueStarted, paramValueFinished = false, false, false + + continue + } + if !paramNameFinished { + if string(r) != "=" { + paramName += string(r) + + continue + } else { + paramNameFinished = true + + continue + } + } else { + if string(r) == "\"" && !paramValueStarted { + paramValueStarted = true + + continue + } + + if paramValueStarted && string(r) != "\"" { + paramValue += string(r) + + continue + } + + if paramValueStarted && string(r) == "\"" { + paramValueFinished = true + + continue + } + } + } + + if paramName != "" && paramValue != "" { + params = append(params, paramName+":"+paramValue) + } + + for _, param := range params { + name += "/" + param + } + } + + data[name] = value + } + + log.Printf("Data parsed: %+v\n", data) + + return data +}