Linter config tuning and first version of Prometheus metrics parser.
This commit is contained in:
		| @@ -21,3 +21,5 @@ linters-settings: | ||||
|     line-length: 120 | ||||
|   gocyclo: | ||||
|     min-complexity: 40 | ||||
|   gocognit: | ||||
|     min-complexity: 40 | ||||
|   | ||||
| @@ -3,6 +3,8 @@ package application | ||||
| import ( | ||||
| 	"context" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
|  | ||||
| 	"go.dev.pztrn.name/metricator/internal/storage" | ||||
| 	"go.dev.pztrn.name/metricator/internal/storage/memory" | ||||
| @@ -18,6 +20,11 @@ type Application struct { | ||||
|  | ||||
| 	storage     storage.Metrics | ||||
| 	storageDone chan struct{} | ||||
|  | ||||
| 	fetchIsRunning      bool | ||||
| 	fetchIsRunningMutex sync.RWMutex | ||||
|  | ||||
| 	httpClient *http.Client | ||||
| } | ||||
|  | ||||
| // NewApplication creates new application. | ||||
| @@ -50,6 +57,8 @@ func (a *Application) initialize() { | ||||
| func (a *Application) Start() { | ||||
| 	a.storage.Start() | ||||
|  | ||||
| 	go a.startFetcher() | ||||
|  | ||||
| 	// The Context Listening Goroutine. | ||||
| 	go func() { | ||||
| 		<-a.ctx.Done() | ||||
|   | ||||
| @@ -6,10 +6,10 @@ import "time" | ||||
| type Config struct { | ||||
| 	// Endpoint is a remote application endpoint which should give us metrics | ||||
| 	// in Prometheus format. | ||||
| 	Endpoint string | ||||
| 	Endpoint string `yaml:"endpoint"` | ||||
| 	// Headers is a list of headers that should be added to metrics request. | ||||
| 	Headers map[string]string | ||||
| 	Headers map[string]string `yaml:"headers"` | ||||
| 	// TimeBetweenRequests is a minimal amount of time which should pass | ||||
| 	// between requests. | ||||
| 	TimeBetweenRequests time.Duration | ||||
| 	TimeBetweenRequests time.Duration `yaml:"time_between_requests"` | ||||
| } | ||||
|   | ||||
| @@ -1,4 +1,86 @@ | ||||
| package application | ||||
|  | ||||
| import ( | ||||
| 	"io/ioutil" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| // Fetches data from remote endpoint, parses it and updates in storage. | ||||
| func (a *Application) fetch() { | ||||
| 	// Do not do anything if fetching is running. | ||||
| 	// ToDo: maybe another approach? | ||||
| 	a.fetchIsRunningMutex.RLock() | ||||
| 	isFetching := a.fetchIsRunning | ||||
| 	a.fetchIsRunningMutex.RUnlock() | ||||
|  | ||||
| 	if isFetching { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	log.Println("Fetching data for", a.name) | ||||
|  | ||||
| 	req, err := http.NewRequestWithContext(a.ctx, "GET", a.config.Endpoint, nil) | ||||
| 	if err != nil { | ||||
| 		log.Println("Failed to create request for", a.name, "metrics:", err.Error()) | ||||
|  | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	for header, value := range a.config.Headers { | ||||
| 		req.Header.Add(header, value) | ||||
| 	} | ||||
|  | ||||
| 	resp, err := a.httpClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		log.Println("Failed to execute request for", a.name, "metrics:", err.Error()) | ||||
|  | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	defer resp.Body.Close() | ||||
|  | ||||
| 	body, err := ioutil.ReadAll(resp.Body) | ||||
| 	if err != nil { | ||||
| 		log.Println("Failed to read response body for", a.name, "metrics:", err.Error()) | ||||
|  | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	data := a.parse(string(body)) | ||||
|  | ||||
| 	a.storage.Put(data) | ||||
|  | ||||
| 	a.fetchIsRunningMutex.Lock() | ||||
| 	a.fetchIsRunning = true | ||||
| 	a.fetchIsRunningMutex.Unlock() | ||||
|  | ||||
| 	a.fetchIsRunningMutex.Lock() | ||||
| 	a.fetchIsRunning = false | ||||
| 	a.fetchIsRunningMutex.Unlock() | ||||
| } | ||||
|  | ||||
| // Configures and starts Prometheus data fetching goroutine. | ||||
| func (a *Application) startFetcher() {} | ||||
| func (a *Application) startFetcher() { | ||||
| 	fetchTicker := time.NewTicker(a.config.TimeBetweenRequests) | ||||
|  | ||||
| 	// nolint:exaustivestruct | ||||
| 	a.httpClient = &http.Client{ | ||||
| 		Timeout: time.Second * 5, | ||||
| 	} | ||||
|  | ||||
| 	defer log.Println("Fetcher for", a.name, "completed") | ||||
|  | ||||
| 	// First fetch should be executed ASAP. | ||||
| 	a.fetch() | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-a.ctx.Done(): | ||||
| 			return | ||||
| 		case <-fetchTicker.C: | ||||
| 			a.fetch() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										105
									
								
								internal/application/parser.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								internal/application/parser.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | ||||
| package application | ||||
|  | ||||
| import ( | ||||
| 	"log" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // Parses passed body and returns a map suitable for pushing into storage. | ||||
| func (a *Application) parse(body string) map[string]string { | ||||
| 	data := make(map[string]string) | ||||
|  | ||||
| 	// ToDo: switch to bytes buffer and maybe do not read body in caller? | ||||
| 	splittedBody := strings.Split(body, "\n") | ||||
|  | ||||
| 	for _, line := range splittedBody { | ||||
| 		// Prometheus line contains metric name and metric parameters defined | ||||
| 		// in "{}". | ||||
| 		var ( | ||||
| 			name, value string | ||||
| 			params      []string | ||||
| 		) | ||||
|  | ||||
| 		// Skip empty lines. | ||||
| 		if line == "" { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// Check that line isn't commented. We should skip comments for now. | ||||
| 		if strings.HasPrefix(line, "#") { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		log.Println("Analyzing line:", line) | ||||
|  | ||||
| 		// Check if we have parametrized metric. If no - push it to data map. | ||||
| 		if !strings.Contains(line, "{") { | ||||
| 			name = strings.Split(line, " ")[0] | ||||
| 			value = strings.Split(line, " ")[1] | ||||
| 		} else { | ||||
| 			value = strings.Split(line, " ")[1] | ||||
| 			name = strings.Split(line, "{")[0] | ||||
|  | ||||
| 			// Parse params into "name:value" string. | ||||
| 			valuesString := strings.Split(strings.Split(line, "{")[1], "}")[0] | ||||
|  | ||||
| 			var ( | ||||
| 				paramName, paramValue                                    string | ||||
| 				paramNameFinished, paramValueStarted, paramValueFinished bool | ||||
| 			) | ||||
|  | ||||
| 			for _, r := range valuesString { | ||||
| 				if paramValueFinished && string(r) == "," { | ||||
| 					params = append(params, paramName+":"+paramValue) | ||||
| 					paramName, paramValue = "", "" | ||||
| 					paramNameFinished, paramValueStarted, paramValueFinished = false, false, false | ||||
|  | ||||
| 					continue | ||||
| 				} | ||||
| 				if !paramNameFinished { | ||||
| 					if string(r) != "=" { | ||||
| 						paramName += string(r) | ||||
|  | ||||
| 						continue | ||||
| 					} else { | ||||
| 						paramNameFinished = true | ||||
|  | ||||
| 						continue | ||||
| 					} | ||||
| 				} else { | ||||
| 					if string(r) == "\"" && !paramValueStarted { | ||||
| 						paramValueStarted = true | ||||
|  | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					if paramValueStarted && string(r) != "\"" { | ||||
| 						paramValue += string(r) | ||||
|  | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| 					if paramValueStarted && string(r) == "\"" { | ||||
| 						paramValueFinished = true | ||||
|  | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if paramName != "" && paramValue != "" { | ||||
| 				params = append(params, paramName+":"+paramValue) | ||||
| 			} | ||||
|  | ||||
| 			for _, param := range params { | ||||
| 				name += "/" + param | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		data[name] = value | ||||
| 	} | ||||
|  | ||||
| 	log.Printf("Data parsed: %+v\n", data) | ||||
|  | ||||
| 	return data | ||||
| } | ||||
		Reference in New Issue
	
	Block a user