azuremonitor-datasource.go 12 KB


  1. package azuremonitor
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "net/url"
  10. "path"
  11. "strings"
  12. "time"
  13. "github.com/grafana/grafana/pkg/api/pluginproxy"
  14. "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/plugins"
  16. "github.com/grafana/grafana/pkg/setting"
  17. opentracing "github.com/opentracing/opentracing-go"
  18. "golang.org/x/net/context/ctxhttp"
  19. "github.com/grafana/grafana/pkg/components/null"
  20. "github.com/grafana/grafana/pkg/components/simplejson"
  21. "github.com/grafana/grafana/pkg/tsdb"
  22. )
  23. // AzureMonitorDatasource calls the Azure Monitor API - one of the four API's supported
  24. type AzureMonitorDatasource struct {
  25. httpClient *http.Client
  26. dsInfo *models.DataSource
  27. }
  // defaultAllowedIntervalsMS is the default list of allowed time grains,
  // expressed in milliseconds and ordered from shortest to longest.
  var defaultAllowedIntervalsMS = []int64{
  	60000,    // 1m
  	300000,   // 5m
  	900000,   // 15m
  	1800000,  // 30m
  	3600000,  // 1h
  	21600000, // 6h
  	43200000, // 12h
  	86400000, // 1d
  }
  32. // executeTimeSeriesQuery does the following:
  33. // 1. build the AzureMonitor url and querystring for each query
  34. // 2. executes each query by calling the Azure Monitor API
  35. // 3. parses the responses for each query into the timeseries format
  36. func (e *AzureMonitorDatasource) executeTimeSeriesQuery(ctx context.Context, originalQueries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.Response, error) {
  37. result := &tsdb.Response{
  38. Results: map[string]*tsdb.QueryResult{},
  39. }
  40. queries, err := e.buildQueries(originalQueries, timeRange)
  41. if err != nil {
  42. return nil, err
  43. }
  44. for _, query := range queries {
  45. queryRes, resp, err := e.executeQuery(ctx, query, originalQueries, timeRange)
  46. if err != nil {
  47. return nil, err
  48. }
  49. // azlog.Debug("AzureMonitor", "Response", resp)
  50. err = e.parseResponse(queryRes, resp, query)
  51. if err != nil {
  52. queryRes.Error = err
  53. }
  54. result.Results[query.RefID] = queryRes
  55. }
  56. return result, nil
  57. }
  58. func (e *AzureMonitorDatasource) buildQueries(queries []*tsdb.Query, timeRange *tsdb.TimeRange) ([]*AzureMonitorQuery, error) {
  59. azureMonitorQueries := []*AzureMonitorQuery{}
  60. startTime, err := timeRange.ParseFrom()
  61. if err != nil {
  62. return nil, err
  63. }
  64. endTime, err := timeRange.ParseTo()
  65. if err != nil {
  66. return nil, err
  67. }
  68. for _, query := range queries {
  69. var target string
  70. azureMonitorTarget := query.Model.Get("azureMonitor").MustMap()
  71. azlog.Debug("AzureMonitor", "target", azureMonitorTarget)
  72. urlComponents := map[string]string{}
  73. urlComponents["subscription"] = fmt.Sprintf("%v", query.Model.Get("subscription").MustString())
  74. urlComponents["resourceGroup"] = fmt.Sprintf("%v", azureMonitorTarget["resourceGroup"])
  75. urlComponents["metricDefinition"] = fmt.Sprintf("%v", azureMonitorTarget["metricDefinition"])
  76. urlComponents["resourceName"] = fmt.Sprintf("%v", azureMonitorTarget["resourceName"])
  77. ub := urlBuilder{
  78. DefaultSubscription: query.DataSource.JsonData.Get("subscriptionId").MustString(),
  79. Subscription: urlComponents["subscription"],
  80. ResourceGroup: urlComponents["resourceGroup"],
  81. MetricDefinition: urlComponents["metricDefinition"],
  82. ResourceName: urlComponents["resourceName"],
  83. }
  84. azureURL := ub.Build()
  85. alias := ""
  86. if val, ok := azureMonitorTarget["alias"]; ok {
  87. alias = fmt.Sprintf("%v", val)
  88. }
  89. timeGrain := fmt.Sprintf("%v", azureMonitorTarget["timeGrain"])
  90. timeGrains := azureMonitorTarget["allowedTimeGrainsMs"]
  91. if timeGrain == "auto" {
  92. timeGrain, err = e.setAutoTimeGrain(query.IntervalMs, timeGrains)
  93. if err != nil {
  94. return nil, err
  95. }
  96. }
  97. params := url.Values{}
  98. params.Add("api-version", "2018-01-01")
  99. params.Add("timespan", fmt.Sprintf("%v/%v", startTime.UTC().Format(time.RFC3339), endTime.UTC().Format(time.RFC3339)))
  100. params.Add("interval", timeGrain)
  101. params.Add("aggregation", fmt.Sprintf("%v", azureMonitorTarget["aggregation"]))
  102. params.Add("metricnames", fmt.Sprintf("%v", azureMonitorTarget["metricName"]))
  103. dimension := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorTarget["dimension"]))
  104. dimensionFilter := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorTarget["dimensionFilter"]))
  105. if azureMonitorTarget["dimension"] != nil && azureMonitorTarget["dimensionFilter"] != nil && len(dimension) > 0 && len(dimensionFilter) > 0 && dimension != "None" {
  106. params.Add("$filter", fmt.Sprintf("%s eq '%s'", dimension, dimensionFilter))
  107. }
  108. target = params.Encode()
  109. if setting.Env == setting.DEV {
  110. azlog.Debug("Azuremonitor request", "params", params)
  111. }
  112. azureMonitorQueries = append(azureMonitorQueries, &AzureMonitorQuery{
  113. URL: azureURL,
  114. UrlComponents: urlComponents,
  115. Target: target,
  116. Params: params,
  117. RefID: query.RefId,
  118. Alias: alias,
  119. })
  120. }
  121. return azureMonitorQueries, nil
  122. }
  123. // setAutoTimeGrain tries to find the closest interval to the query's intervalMs value
  124. // if the metric has a limited set of possible intervals/time grains then use those
  125. // instead of the default list of intervals
  126. func (e *AzureMonitorDatasource) setAutoTimeGrain(intervalMs int64, timeGrains interface{}) (string, error) {
  127. // parses array of numbers from the timeGrains json field
  128. allowedTimeGrains := []int64{}
  129. tgs, ok := timeGrains.([]interface{})
  130. if ok {
  131. for _, v := range tgs {
  132. jsonNumber, ok := v.(json.Number)
  133. if ok {
  134. tg, err := jsonNumber.Int64()
  135. if err == nil {
  136. allowedTimeGrains = append(allowedTimeGrains, tg)
  137. }
  138. }
  139. }
  140. }
  141. autoInterval := e.findClosestAllowedIntervalMS(intervalMs, allowedTimeGrains)
  142. tg := &TimeGrain{}
  143. autoTimeGrain, err := tg.createISO8601DurationFromIntervalMS(autoInterval)
  144. if err != nil {
  145. return "", err
  146. }
  147. return autoTimeGrain, nil
  148. }
  149. func (e *AzureMonitorDatasource) executeQuery(ctx context.Context, query *AzureMonitorQuery, queries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.QueryResult, AzureMonitorResponse, error) {
  150. queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefID}
  151. req, err := e.createRequest(ctx, e.dsInfo)
  152. if err != nil {
  153. queryResult.Error = err
  154. return queryResult, AzureMonitorResponse{}, nil
  155. }
  156. req.URL.Path = path.Join(req.URL.Path, query.URL)
  157. req.URL.RawQuery = query.Params.Encode()
  158. queryResult.Meta.Set("rawQuery", req.URL.RawQuery)
  159. span, ctx := opentracing.StartSpanFromContext(ctx, "azuremonitor query")
  160. span.SetTag("target", query.Target)
  161. span.SetTag("from", timeRange.From)
  162. span.SetTag("until", timeRange.To)
  163. span.SetTag("datasource_id", e.dsInfo.Id)
  164. span.SetTag("org_id", e.dsInfo.OrgId)
  165. defer span.Finish()
  166. opentracing.GlobalTracer().Inject(
  167. span.Context(),
  168. opentracing.HTTPHeaders,
  169. opentracing.HTTPHeadersCarrier(req.Header))
  170. azlog.Debug("AzureMonitor", "Request URL", req.URL.String())
  171. res, err := ctxhttp.Do(ctx, e.httpClient, req)
  172. if err != nil {
  173. queryResult.Error = err
  174. return queryResult, AzureMonitorResponse{}, nil
  175. }
  176. data, err := e.unmarshalResponse(res)
  177. if err != nil {
  178. queryResult.Error = err
  179. return queryResult, AzureMonitorResponse{}, nil
  180. }
  181. return queryResult, data, nil
  182. }
  183. func (e *AzureMonitorDatasource) createRequest(ctx context.Context, dsInfo *models.DataSource) (*http.Request, error) {
  184. // find plugin
  185. plugin, ok := plugins.DataSources[dsInfo.Type]
  186. if !ok {
  187. return nil, errors.New("Unable to find datasource plugin Azure Monitor")
  188. }
  189. var azureMonitorRoute *plugins.AppPluginRoute
  190. for _, route := range plugin.Routes {
  191. if route.Path == "azuremonitor" {
  192. azureMonitorRoute = route
  193. break
  194. }
  195. }
  196. cloudName := dsInfo.JsonData.Get("cloudName").MustString("azuremonitor")
  197. proxyPass := fmt.Sprintf("%s/subscriptions", cloudName)
  198. u, _ := url.Parse(dsInfo.Url)
  199. u.Path = path.Join(u.Path, "render")
  200. req, err := http.NewRequest(http.MethodGet, u.String(), nil)
  201. if err != nil {
  202. azlog.Error("Failed to create request", "error", err)
  203. return nil, fmt.Errorf("Failed to create request. error: %v", err)
  204. }
  205. req.Header.Set("Content-Type", "application/json")
  206. req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
  207. pluginproxy.ApplyRoute(ctx, req, proxyPass, azureMonitorRoute, dsInfo)
  208. return req, nil
  209. }
  210. func (e *AzureMonitorDatasource) unmarshalResponse(res *http.Response) (AzureMonitorResponse, error) {
  211. body, err := ioutil.ReadAll(res.Body)
  212. defer res.Body.Close()
  213. if err != nil {
  214. return AzureMonitorResponse{}, err
  215. }
  216. if res.StatusCode/100 != 2 {
  217. azlog.Error("Request failed", "status", res.Status, "body", string(body))
  218. return AzureMonitorResponse{}, fmt.Errorf(string(body))
  219. }
  220. var data AzureMonitorResponse
  221. err = json.Unmarshal(body, &data)
  222. if err != nil {
  223. azlog.Error("Failed to unmarshal AzureMonitor response", "error", err, "status", res.Status, "body", string(body))
  224. return AzureMonitorResponse{}, err
  225. }
  226. return data, nil
  227. }
  228. func (e *AzureMonitorDatasource) parseResponse(queryRes *tsdb.QueryResult, data AzureMonitorResponse, query *AzureMonitorQuery) error {
  229. if len(data.Value) == 0 {
  230. return nil
  231. }
  232. for _, series := range data.Value[0].Timeseries {
  233. points := []tsdb.TimePoint{}
  234. metadataName := ""
  235. metadataValue := ""
  236. if len(series.Metadatavalues) > 0 {
  237. metadataName = series.Metadatavalues[0].Name.LocalizedValue
  238. metadataValue = series.Metadatavalues[0].Value
  239. }
  240. metricName := formatLegendKey(query.Alias, query.UrlComponents["resourceName"], data.Value[0].Name.LocalizedValue, metadataName, metadataValue, data.Namespace, data.Value[0].ID)
  241. for _, point := range series.Data {
  242. var value float64
  243. switch query.Params.Get("aggregation") {
  244. case "Average":
  245. value = point.Average
  246. case "Total":
  247. value = point.Total
  248. case "Maximum":
  249. value = point.Maximum
  250. case "Minimum":
  251. value = point.Minimum
  252. case "Count":
  253. value = point.Count
  254. default:
  255. value = point.Count
  256. }
  257. points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.TimeStamp).Unix())*1000))
  258. }
  259. queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{
  260. Name: metricName,
  261. Points: points,
  262. })
  263. }
  264. queryRes.Meta.Set("unit", data.Value[0].Unit)
  265. return nil
  266. }
  267. // findClosestAllowedIntervalMs is used for the auto time grain setting.
  268. // It finds the closest time grain from the list of allowed time grains for Azure Monitor
  269. // using the Grafana interval in milliseconds
  270. // Some metrics only allow a limited list of time grains. The allowedTimeGrains parameter
  271. // allows overriding the default list of allowed time grains.
  272. func (e *AzureMonitorDatasource) findClosestAllowedIntervalMS(intervalMs int64, allowedTimeGrains []int64) int64 {
  273. allowedIntervals := defaultAllowedIntervalsMS
  274. if len(allowedTimeGrains) > 0 {
  275. allowedIntervals = allowedTimeGrains
  276. }
  277. closest := allowedIntervals[0]
  278. for i, allowed := range allowedIntervals {
  279. if intervalMs > allowed {
  280. if i+1 < len(allowedIntervals) {
  281. closest = allowedIntervals[i+1]
  282. } else {
  283. closest = allowed
  284. }
  285. }
  286. }
  287. return closest
  288. }
  289. // formatLegendKey builds the legend key or timeseries name
  290. // Alias patterns like {{resourcename}} are replaced with the appropriate data values.
  291. func formatLegendKey(alias string, resourceName string, metricName string, metadataName string, metadataValue string, namespace string, seriesID string) string {
  292. if alias == "" {
  293. if len(metadataName) > 0 {
  294. return fmt.Sprintf("%s{%s=%s}.%s", resourceName, metadataName, metadataValue, metricName)
  295. }
  296. return fmt.Sprintf("%s.%s", resourceName, metricName)
  297. }
  298. startIndex := strings.Index(seriesID, "/resourceGroups/") + 16
  299. endIndex := strings.Index(seriesID, "/providers")
  300. resourceGroup := seriesID[startIndex:endIndex]
  301. result := legendKeyFormat.ReplaceAllFunc([]byte(alias), func(in []byte) []byte {
  302. metaPartName := strings.Replace(string(in), "{{", "", 1)
  303. metaPartName = strings.Replace(metaPartName, "}}", "", 1)
  304. metaPartName = strings.ToLower(strings.TrimSpace(metaPartName))
  305. if metaPartName == "resourcegroup" {
  306. return []byte(resourceGroup)
  307. }
  308. if metaPartName == "namespace" {
  309. return []byte(namespace)
  310. }
  311. if metaPartName == "resourcename" {
  312. return []byte(resourceName)
  313. }
  314. if metaPartName == "metric" {
  315. return []byte(metricName)
  316. }
  317. if metaPartName == "dimensionname" {
  318. return []byte(metadataName)
  319. }
  320. if metaPartName == "dimensionvalue" {
  321. return []byte(metadataValue)
  322. }
  323. return in
  324. })
  325. return string(result)
  326. }