
Merge branch 'alerting_opentsdb'

bergquist 9 years ago
parent
commit
3ac38dfe70

+ 1 - 0
pkg/cmd/grafana-server/main.go

@@ -21,6 +21,7 @@ import (
 	_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
 	_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
 	_ "github.com/grafana/grafana/pkg/tsdb/graphite"
 	_ "github.com/grafana/grafana/pkg/tsdb/graphite"
 	_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
 	_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
+	_ "github.com/grafana/grafana/pkg/tsdb/opentsdb"
 	_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
 	_ "github.com/grafana/grafana/pkg/tsdb/prometheus"
 	_ "github.com/grafana/grafana/pkg/tsdb/testdata"
 	_ "github.com/grafana/grafana/pkg/tsdb/testdata"
 )
 )

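The single added line in main.go is what wires the new backend in: the blank import exists only for its side effect of running the package's init(), which registers the OpenTSDB executor with the tsdb package (see RegisterExecutor in pkg/tsdb/opentsdb/opentsdb.go below). A minimal sketch of that registry pattern, assuming a plain name-to-constructor map; the names mirror the calls used in this commit (DataSourceInfo, Executor, RegisterExecutor) but this is not Grafana's actual implementation:

package tsdb

// Illustrative sketch only: a registry keyed by datasource type.
// DataSourceInfo and Executor are simplified stand-ins for the real types.

type DataSourceInfo struct {
	Type string
	Url  string
}

type Executor interface{} // simplified; the real interface exposes an Execute method

type ExecutorConstructor func(dsInfo *DataSourceInfo) Executor

var registry = map[string]ExecutorConstructor{}

// RegisterExecutor is called from each backend's init(); the blank import in
// main.go is what causes that init() to run at startup.
func RegisterExecutor(pluginType string, constructor ExecutorConstructor) {
	registry[pluginType] = constructor
}

// getExecutorFor resolves the registered constructor when a query comes in.
func getExecutorFor(dsInfo *DataSourceInfo) Executor {
	if constructor, ok := registry[dsInfo.Type]; ok {
		return constructor(dsInfo)
	}
	return nil
}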
+ 212 - 0
pkg/tsdb/opentsdb/opentsdb.go

@@ -0,0 +1,212 @@
+package opentsdb
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+	"strconv"
+	"strings"
+	"time"
+
+	"golang.org/x/net/context/ctxhttp"
+	"gopkg.in/guregu/null.v3"
+
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/setting"
+	"github.com/grafana/grafana/pkg/tsdb"
+)
+
+type OpenTsdbExecutor struct {
+	*tsdb.DataSourceInfo
+}
+
+func NewOpenTsdbExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
+	return &OpenTsdbExecutor{dsInfo}
+}
+
+var (
+	plog       log.Logger
+	HttpClient *http.Client
+)
+
+func init() {
+	plog = log.New("tsdb.opentsdb")
+	tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutor)
+
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+	}
+
+	HttpClient = &http.Client{
+		Timeout:   15 * time.Second,
+		Transport: tr,
+	}
+}
+
+func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
+	result := &tsdb.BatchResult{}
+
+	var tsdbQuery OpenTsdbQuery
+
+	tsdbQuery.Start = queryContext.TimeRange.GetFromAsMsEpoch()
+	tsdbQuery.End = queryContext.TimeRange.GetToAsMsEpoch()
+
+	for _, query := range queries {
+		metric := e.buildMetric(query)
+		tsdbQuery.Queries = append(tsdbQuery.Queries, metric)
+	}
+
+	if setting.Env == setting.DEV {
+		plog.Debug("OpenTsdb request", "params", tsdbQuery)
+	}
+
+	req, err := e.createRequest(tsdbQuery)
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	res, err := ctxhttp.Do(ctx, HttpClient, req)
+	if err != nil {
+		result.Error = err
+		return result
+	}
+
+	queryResult, err := e.parseResponse(tsdbQuery, res)
+	if err != nil {
+		return result.WithError(err)
+	}
+
+	result.QueryResults = queryResult
+	return result
+}
+
+func (e *OpenTsdbExecutor) createRequest(data OpenTsdbQuery) (*http.Request, error) {
+	u, _ := url.Parse(e.Url)
+	u.Path = path.Join(u.Path, "api/query")
+
+	postData, err := json.Marshal(data)
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(postData)))
+	if err != nil {
+		plog.Info("Failed to create request", "error", err)
+		return nil, fmt.Errorf("Failed to create request. error: %v", err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	if e.BasicAuth {
+		req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
+	}
+
+	return req, err
+}
+
+func (e *OpenTsdbExecutor) parseResponse(query OpenTsdbQuery, res *http.Response) (map[string]*tsdb.QueryResult, error) {
+
+	queryResults := make(map[string]*tsdb.QueryResult)
+	queryRes := tsdb.NewQueryResult()
+
+	body, err := ioutil.ReadAll(res.Body)
+	defer res.Body.Close()
+	if err != nil {
+		return nil, err
+	}
+
+	if res.StatusCode/100 != 2 {
+		plog.Info("Request failed", "status", res.Status, "body", string(body))
+		return nil, fmt.Errorf("Request failed status: %v", res.Status)
+	}
+
+	var data []OpenTsdbResponse
+	err = json.Unmarshal(body, &data)
+	if err != nil {
+		plog.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body))
+		return nil, err
+	}
+
+	for _, val := range data {
+		series := tsdb.TimeSeries{
+			Name: val.Metric,
+		}
+
+		for timeString, value := range val.DataPoints {
+			timestamp, err := strconv.ParseFloat(timeString, 64)
+			if err != nil {
+				plog.Info("Failed to unmarshal opentsdb timestamp", "timestamp", timeString)
+				return nil, err
+			}
+			series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), timestamp))
+		}
+
+		queryRes.Series = append(queryRes.Series, &series)
+	}
+
+	queryResults["A"] = queryRes
+	return queryResults, nil
+}
+
+func (e *OpenTsdbExecutor) buildMetric(query *tsdb.Query) map[string]interface{} {
+
+	metric := make(map[string]interface{})
+
+	// Setting metric and aggregator
+	metric["metric"] = query.Model.Get("metric").MustString()
+	metric["aggregator"] = query.Model.Get("aggregator").MustString()
+
+	// Setting downsampling options
+	disableDownsampling := query.Model.Get("disableDownsampling").MustBool()
+	if !disableDownsampling {
+		downsampleInterval := query.Model.Get("downsampleInterval").MustString()
+		if downsampleInterval == "" {
+			downsampleInterval = "1m" // default value for blank
+		}
+		downsample := downsampleInterval + "-" + query.Model.Get("downsampleAggregator").MustString()
+		if query.Model.Get("downsampleFillPolicy").MustString() != "none" {
+			metric["downsample"] = downsample + "-" + query.Model.Get("downsampleFillPolicy").MustString()
+		} else {
+			metric["downsample"] = downsample
+		}
+	}
+
+	// Setting rate options
+	if query.Model.Get("shouldComputeRate").MustBool() {
+		metric["rate"] = true
+		rateOptions := make(map[string]interface{})
+		rateOptions["counter"] = query.Model.Get("isCounter").MustBool()
+
+		counterMax, counterMaxCheck := query.Model.CheckGet("counterMax")
+		if counterMaxCheck {
+			rateOptions["counterMax"] = counterMax.MustFloat64()
+		}
+
+		resetValue, resetValueCheck := query.Model.CheckGet("counterResetValue")
+		if resetValueCheck {
+			rateOptions["resetValue"] = resetValue.MustFloat64()
+		}
+
+		metric["rateOptions"] = rateOptions
+	}
+
+	// Setting tags
+	tags, tagsCheck := query.Model.CheckGet("tags")
+	if tagsCheck && len(tags.MustMap()) > 0 {
+		metric["tags"] = tags.MustMap()
+	}
+
+	// Setting filters
+	filters, filtersCheck := query.Model.CheckGet("filters")
+	if filtersCheck && len(filters.MustArray()) > 0 {
+		metric["filters"] = filters.MustArray()
+	}
+
+	return metric
+}

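For reference, the body that createRequest posts to OpenTSDB's /api/query endpoint is just the marshalled OpenTsdbQuery. A self-contained sketch (illustrative values, not part of the commit) that prints one such payload:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors pkg/tsdb/opentsdb/types.go from this commit.
type OpenTsdbQuery struct {
	Start   int64                    `json:"start"`
	End     int64                    `json:"end"`
	Queries []map[string]interface{} `json:"queries"`
}

func main() {
	// One metric shaped the way buildMetric builds it for an "avg" aggregation
	// with the default 1m downsample interval (values are made up).
	metric := map[string]interface{}{
		"metric":     "cpu.average.percent",
		"aggregator": "avg",
		"downsample": "1m-avg",
	}

	query := OpenTsdbQuery{
		Start:   1479496740000, // from/to as millisecond epochs, per GetFromAsMsEpoch/GetToAsMsEpoch
		End:     1479500340000,
		Queries: []map[string]interface{}{metric},
	}

	body, err := json.Marshal(query)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// {"start":1479496740000,"end":1479500340000,"queries":[{"aggregator":"avg","downsample":"1m-avg","metric":"cpu.average.percent"}]}
}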
+ 176 - 0
pkg/tsdb/opentsdb/opentsdb_test.go

@@ -0,0 +1,176 @@
+package opentsdb
+
+import (
+	"testing"
+
+	"github.com/grafana/grafana/pkg/tsdb"
+	. "github.com/smartystreets/goconvey/convey"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+)
+
+func TestOpenTsdbExecutor(t *testing.T) {
+	Convey("OpenTsdb query testing", t, func() {
+
+		exec := &OpenTsdbExecutor{}
+
+		Convey("Build metric with downsampling enabled", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", false)
+			query.Model.Set("downsampleInterval", "")
+			query.Model.Set("downsampleAggregator","avg")
+			query.Model.Set("downsampleFillPolicy","none")
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 3)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+			So(metric["downsample"], ShouldEqual, "1m-avg")
+
+		})
+
+		Convey("Build metric with downsampling diabled", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", true)
+			query.Model.Set("downsampleInterval", "")
+			query.Model.Set("downsampleAggregator","avg")
+			query.Model.Set("downsampleFillPolicy","none")
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 2)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+
+		})
+
+		Convey("Build metric with downsampling enabled with params", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", false)
+			query.Model.Set("downsampleInterval", "5m")
+			query.Model.Set("downsampleAggregator","sum")
+			query.Model.Set("downsampleFillPolicy","null")
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 3)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+			So(metric["downsample"], ShouldEqual, "5m-sum-null")
+		})
+
+		Convey("Build metric with tags with downsampling disabled", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", true)
+			query.Model.Set("downsampleInterval", "5m")
+			query.Model.Set("downsampleAggregator","sum")
+			query.Model.Set("downsampleFillPolicy","null")
+
+			tags := simplejson.New()
+			tags.Set("env", "prod")
+			tags.Set("app", "grafana")
+			query.Model.Set("tags", tags.MustMap())
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 3)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+			So(metric["downsample"], ShouldEqual, nil)
+			So(len(metric["tags"].(map[string]interface{})), ShouldEqual, 2)
+			So(metric["tags"].(map[string]interface{})["env"], ShouldEqual, "prod")
+			So(metric["tags"].(map[string]interface{})["app"], ShouldEqual, "grafana")
+			So(metric["tags"].(map[string]interface{})["ip"], ShouldEqual, nil)
+		})
+
+		Convey("Build metric with rate enabled but counter disabled", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", true)
+			query.Model.Set("shouldComputeRate", true)
+			query.Model.Set("isCounter",false)
+			
+			tags := simplejson.New()
+			tags.Set("env", "prod")
+			tags.Set("app", "grafana")
+			query.Model.Set("tags", tags.MustMap())
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 5)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+			So(len(metric["tags"].(map[string]interface{})), ShouldEqual, 2)
+			So(metric["tags"].(map[string]interface{})["env"], ShouldEqual, "prod")
+			So(metric["tags"].(map[string]interface{})["app"], ShouldEqual, "grafana")
+			So(metric["tags"].(map[string]interface{})["ip"], ShouldEqual, nil)
+			So(metric["rate"], ShouldEqual, true)
+			So(metric["rateOptions"].(map[string]interface{})["counter"], ShouldEqual, false)
+		})
+
+		Convey("Build metric with rate and counter enabled", func() {
+
+			query := &tsdb.Query{
+				Model: simplejson.New(),
+			}
+
+			query.Model.Set("metric", "cpu.average.percent")
+			query.Model.Set("aggregator", "avg")
+			query.Model.Set("disableDownsampling", true)
+			query.Model.Set("shouldComputeRate", true)
+			query.Model.Set("isCounter",true)
+			query.Model.Set("counterMax",45)
+			query.Model.Set("counterResetValue",60)
+
+			tags := simplejson.New()
+			tags.Set("env", "prod")
+			tags.Set("app", "grafana")
+			query.Model.Set("tags", tags.MustMap())
+
+			metric := exec.buildMetric(query)
+
+			So(len(metric), ShouldEqual, 5)
+			So(metric["metric"], ShouldEqual, "cpu.average.percent")
+			So(metric["aggregator"], ShouldEqual, "avg")
+			So(len(metric["tags"].(map[string]interface{})), ShouldEqual, 2)
+			So(metric["tags"].(map[string]interface{})["env"], ShouldEqual, "prod")
+			So(metric["tags"].(map[string]interface{})["app"], ShouldEqual, "grafana")
+			So(metric["tags"].(map[string]interface{})["ip"], ShouldEqual, nil)
+			So(metric["rate"], ShouldEqual, true)
+			So(len(metric["rateOptions"].(map[string]interface{})), ShouldEqual, 3)
+			So(metric["rateOptions"].(map[string]interface{})["counter"], ShouldEqual, true)
+			So(metric["rateOptions"].(map[string]interface{})["counterMax"], ShouldEqual, 45)
+			So(metric["rateOptions"].(map[string]interface{})["resetValue"], ShouldEqual, 60)
+		})
+
+	})
+}

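The suite above exercises buildMetric only. A hedged sketch of how parseResponse could be tested in the same GoConvey style, feeding it a canned *http.Response; this Convey block is not part of the commit and would additionally need net/http, io/ioutil and strings in the test imports:

		Convey("Parse response with one metric and two datapoints", func() {

			// Canned OpenTSDB /api/query reply matching OpenTsdbResponse in types.go.
			body := `[{"metric":"cpu.average.percent","dps":{"1479496740":10.5,"1479496800":11}}]`
			res := &http.Response{
				StatusCode: 200,
				Status:     "200 OK",
				Body:       ioutil.NopCloser(strings.NewReader(body)),
			}

			results, err := exec.parseResponse(OpenTsdbQuery{}, res)

			So(err, ShouldBeNil)
			So(len(results), ShouldEqual, 1)
			So(len(results["A"].Series), ShouldEqual, 1)
			So(results["A"].Series[0].Name, ShouldEqual, "cpu.average.percent")
			So(len(results["A"].Series[0].Points), ShouldEqual, 2)
		})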
+ 12 - 0
pkg/tsdb/opentsdb/types.go

@@ -0,0 +1,12 @@
+package opentsdb
+
+type OpenTsdbQuery struct {
+	Start   int64                    `json:"start"`
+	End     int64                    `json:"end"`
+	Queries []map[string]interface{} `json:"queries"`
+}
+
+type OpenTsdbResponse struct {
+	Metric     string             `json:"metric"`
+	DataPoints map[string]float64 `json:"dps"`
+}

+ 20 - 0
public/app/plugins/datasource/opentsdb/datasource.js

@@ -102,6 +102,26 @@ function (angular, _, dateMath) {
       }.bind(this));
     };
 
+    this.targetContainsTemplate = function(target) {
+      if (target.filters && target.filters.length > 0) {
+        for (var i = 0; i < target.filters.length; i++) {
+          if (templateSrv.variableExists(target.filters[i].filter)) {
+            return true;
+          }
+        }
+      }
+
+      if (target.tags && Object.keys(target.tags).length > 0) {
+        for (var tagKey in target.tags) {
+          if (templateSrv.variableExists(target.tags[tagKey])) {
+            return true;
+          }
+        }
+      }
+
+      return false;
+    };
+
     this.performTimeSeriesQuery = function(queries, start, end) {
       var msResolution = false;
       if (this.tsdbResolution === 2) {

+ 1 - 0
public/app/plugins/datasource/opentsdb/plugin.json

@@ -6,6 +6,7 @@
   "metrics": true,
   "metrics": true,
   "defaultMatchFormat": "pipe",
   "defaultMatchFormat": "pipe",
   "annotations": true,
   "annotations": true,
+  "alerting": true,
 
 
   "info": {
   "info": {
     "author": {
     "author": {