azuremonitor-datasource.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package azuremonitor
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "path"
  11. "strings"
  12. "time"
  13. "github.com/grafana/grafana/pkg/api/pluginproxy"
  14. "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/plugins"
  16. "github.com/grafana/grafana/pkg/setting"
  17. opentracing "github.com/opentracing/opentracing-go"
  18. "golang.org/x/net/context/ctxhttp"
  19. "github.com/grafana/grafana/pkg/components/null"
  20. "github.com/grafana/grafana/pkg/components/simplejson"
  21. "github.com/grafana/grafana/pkg/tsdb"
  22. )
  23. // AzureMonitorDatasource calls the Azure Monitor API - one of the four API's supported
  24. type AzureMonitorDatasource struct {
  25. httpClient *http.Client
  26. dsInfo *models.DataSource
  27. }
  28. // executeTimeSeriesQuery does the following:
  29. // 1. build the AzureMonitor url and querystring for each query
  30. // 2. executes each query by calling the Azure Monitor API
  31. // 3. parses the responses for each query into the timeseries format
  32. func (e *AzureMonitorDatasource) executeTimeSeriesQuery(ctx context.Context, originalQueries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.Response, error) {
  33. result := &tsdb.Response{
  34. Results: make(map[string]*tsdb.QueryResult),
  35. }
  36. queries, err := e.buildQueries(originalQueries, timeRange)
  37. if err != nil {
  38. return nil, err
  39. }
  40. for _, query := range queries {
  41. queryRes, resp, err := e.executeQuery(ctx, query, originalQueries, timeRange)
  42. if err != nil {
  43. return nil, err
  44. }
  45. azlog.Debug("AzureMonitor", "Response", resp)
  46. err = e.parseResponse(queryRes, resp, query)
  47. if err != nil {
  48. queryRes.Error = err
  49. }
  50. result.Results[query.RefID] = queryRes
  51. }
  52. return result, nil
  53. }
  54. func (e *AzureMonitorDatasource) buildQueries(queries []*tsdb.Query, timeRange *tsdb.TimeRange) ([]*AzureMonitorQuery, error) {
  55. azureMonitorQueries := []*AzureMonitorQuery{}
  56. startTime, err := timeRange.ParseFrom()
  57. if err != nil {
  58. return nil, err
  59. }
  60. endTime, err := timeRange.ParseTo()
  61. if err != nil {
  62. return nil, err
  63. }
  64. for _, query := range queries {
  65. var target string
  66. azureMonitorTarget := query.Model.Get("azureMonitor").MustMap()
  67. azlog.Debug("AzureMonitor", "target", azureMonitorTarget)
  68. urlComponents := make(map[string]string)
  69. urlComponents["resourceGroup"] = fmt.Sprintf("%v", azureMonitorTarget["resourceGroup"])
  70. urlComponents["metricDefinition"] = fmt.Sprintf("%v", azureMonitorTarget["metricDefinition"])
  71. urlComponents["resourceName"] = fmt.Sprintf("%v", azureMonitorTarget["resourceName"])
  72. ub := URLBuilder{
  73. ResourceGroup: urlComponents["resourceGroup"],
  74. MetricDefinition: urlComponents["metricDefinition"],
  75. ResourceName: urlComponents["resourceName"],
  76. }
  77. azureURL := ub.Build()
  78. alias := fmt.Sprintf("%v", azureMonitorTarget["alias"])
  79. params := url.Values{}
  80. params.Add("api-version", "2018-01-01")
  81. params.Add("timespan", fmt.Sprintf("%v/%v", startTime.UTC().Format(time.RFC3339), endTime.UTC().Format(time.RFC3339)))
  82. params.Add("interval", fmt.Sprintf("%v", azureMonitorTarget["timeGrain"]))
  83. params.Add("aggregation", fmt.Sprintf("%v", azureMonitorTarget["aggregation"]))
  84. params.Add("metricnames", fmt.Sprintf("%v", azureMonitorTarget["metricName"]))
  85. dimension := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorTarget["dimension"]))
  86. dimensionFilter := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorTarget["dimensionFilter"]))
  87. if azureMonitorTarget["dimension"] != nil && azureMonitorTarget["dimensionFilter"] != nil && len(dimension) > 0 && len(dimensionFilter) > 0 {
  88. params.Add("$filter", fmt.Sprintf("%s eq '%s'", dimension, dimensionFilter))
  89. }
  90. target = params.Encode()
  91. if setting.Env == setting.DEV {
  92. azlog.Debug("Azuremonitor request", "params", params)
  93. }
  94. azureMonitorQueries = append(azureMonitorQueries, &AzureMonitorQuery{
  95. URL: azureURL,
  96. UrlComponents: urlComponents,
  97. Target: target,
  98. Params: params,
  99. RefID: query.RefId,
  100. Alias: alias,
  101. })
  102. }
  103. return azureMonitorQueries, nil
  104. }
  105. func (e *AzureMonitorDatasource) executeQuery(ctx context.Context, query *AzureMonitorQuery, queries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.QueryResult, AzureMonitorResponse, error) {
  106. queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefID}
  107. req, err := e.createRequest(ctx, e.dsInfo)
  108. if err != nil {
  109. queryResult.Error = err
  110. return queryResult, AzureMonitorResponse{}, nil
  111. }
  112. req.URL.Path = path.Join(req.URL.Path, query.URL)
  113. req.URL.RawQuery = query.Params.Encode()
  114. queryResult.Meta.Set("rawQuery", req.URL.RawQuery)
  115. span, ctx := opentracing.StartSpanFromContext(ctx, "azuremonitor query")
  116. span.SetTag("target", query.Target)
  117. span.SetTag("from", timeRange.From)
  118. span.SetTag("until", timeRange.To)
  119. span.SetTag("datasource_id", e.dsInfo.Id)
  120. span.SetTag("org_id", e.dsInfo.OrgId)
  121. defer span.Finish()
  122. opentracing.GlobalTracer().Inject(
  123. span.Context(),
  124. opentracing.HTTPHeaders,
  125. opentracing.HTTPHeadersCarrier(req.Header))
  126. azlog.Debug("AzureMonitor", "Request URL", req.URL.String())
  127. res, err := ctxhttp.Do(ctx, e.httpClient, req)
  128. if err != nil {
  129. queryResult.Error = err
  130. return queryResult, AzureMonitorResponse{}, nil
  131. }
  132. data, err := e.unmarshalResponse(res)
  133. if err != nil {
  134. queryResult.Error = err
  135. return queryResult, AzureMonitorResponse{}, nil
  136. }
  137. return queryResult, data, nil
  138. }
  139. func (e *AzureMonitorDatasource) createRequest(ctx context.Context, dsInfo *models.DataSource) (*http.Request, error) {
  140. // find plugin
  141. plugin, ok := plugins.DataSources[dsInfo.Type]
  142. if !ok {
  143. return nil, errors.New("Unable to find datasource plugin Azure Monitor")
  144. }
  145. var azureMonitorRoute *plugins.AppPluginRoute
  146. for _, route := range plugin.Routes {
  147. if route.Path == "azuremonitor" {
  148. azureMonitorRoute = route
  149. break
  150. }
  151. }
  152. cloudName := dsInfo.JsonData.Get("cloudName").MustString("azuremonitor")
  153. subscriptionID := dsInfo.JsonData.Get("subscriptionId").MustString()
  154. proxyPass := fmt.Sprintf("%s/subscriptions/%s", cloudName, subscriptionID)
  155. u, _ := url.Parse(dsInfo.Url)
  156. u.Path = path.Join(u.Path, "render")
  157. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  158. if err != nil {
  159. azlog.Error("Failed to create request", "error", err)
  160. return nil, fmt.Errorf("Failed to create request. error: %v", err)
  161. }
  162. req.Header.Set("Content-Type", "application/json")
  163. req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
  164. pluginproxy.ApplyRoute(ctx, req, proxyPass, azureMonitorRoute, dsInfo)
  165. return req, nil
  166. }
  167. func (e *AzureMonitorDatasource) unmarshalResponse(res *http.Response) (AzureMonitorResponse, error) {
  168. body, err := ioutil.ReadAll(res.Body)
  169. defer res.Body.Close()
  170. if err != nil {
  171. return AzureMonitorResponse{}, err
  172. }
  173. if res.StatusCode/100 != 2 {
  174. azlog.Error("Request failed", "status", res.Status, "body", string(body))
  175. return AzureMonitorResponse{}, fmt.Errorf(string(body))
  176. }
  177. var data AzureMonitorResponse
  178. err = json.Unmarshal(body, &data)
  179. if err != nil {
  180. azlog.Error("Failed to unmarshal AzureMonitor response", "error", err, "status", res.Status, "body", string(body))
  181. return AzureMonitorResponse{}, err
  182. }
  183. return data, nil
  184. }
  185. func (e *AzureMonitorDatasource) parseResponse(queryRes *tsdb.QueryResult, data AzureMonitorResponse, query *AzureMonitorQuery) error {
  186. if len(data.Value) == 0 {
  187. return nil
  188. }
  189. for _, series := range data.Value[0].Timeseries {
  190. points := make([]tsdb.TimePoint, 0)
  191. metadataName := ""
  192. metadataValue := ""
  193. if len(series.Metadatavalues) > 0 {
  194. metadataName = series.Metadatavalues[0].Name.LocalizedValue
  195. metadataValue = series.Metadatavalues[0].Value
  196. }
  197. defaultMetricName := formatLegendKey(query.UrlComponents["resourceName"], data.Value[0].Name.LocalizedValue, metadataName, metadataValue)
  198. for _, point := range series.Data {
  199. var value float64
  200. switch query.Params.Get("aggregation") {
  201. case "Average":
  202. value = point.Average
  203. case "Total":
  204. value = point.Total
  205. case "Maximum":
  206. value = point.Maximum
  207. case "Minimum":
  208. value = point.Minimum
  209. case "Count":
  210. value = point.Count
  211. default:
  212. value = point.Count
  213. }
  214. points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.TimeStamp).Unix())*1000))
  215. }
  216. queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{
  217. Name: defaultMetricName,
  218. Points: points,
  219. })
  220. }
  221. return nil
  222. }