azuremonitor-datasource.go

package azuremonitor

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"strings"
	"time"

	"github.com/grafana/grafana/pkg/api/pluginproxy"
	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/plugins"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/tsdb"
	opentracing "github.com/opentracing/opentracing-go"
	"golang.org/x/net/context/ctxhttp"
)

// AzureMonitorDatasource calls the Azure Monitor API - one of the four APIs supported
type AzureMonitorDatasource struct {
	httpClient *http.Client
	dsInfo     *models.DataSource
}

var (
	// 1m, 5m, 15m, 30m, 1h, 6h, 12h, 1d in milliseconds
	defaultAllowedIntervalsMS = []int64{60000, 300000, 900000, 1800000, 3600000, 21600000, 43200000, 86400000}
)

// executeTimeSeriesQuery does the following:
// 1. builds the AzureMonitor url and querystring for each query
// 2. executes each query by calling the Azure Monitor API
// 3. parses the responses for each query into the timeseries format
func (e *AzureMonitorDatasource) executeTimeSeriesQuery(ctx context.Context, originalQueries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.Response, error) {
	result := &tsdb.Response{
		Results: map[string]*tsdb.QueryResult{},
	}

	queries, err := e.buildQueries(originalQueries, timeRange)
	if err != nil {
		return nil, err
	}

	for _, query := range queries {
		queryRes, resp, err := e.executeQuery(ctx, query, originalQueries, timeRange)
		if err != nil {
			return nil, err
		}
		// azlog.Debug("AzureMonitor", "Response", resp)

		err = e.parseResponse(queryRes, resp, query)
		if err != nil {
			queryRes.Error = err
		}

		if val, ok := result.Results[query.RefID]; ok {
			val.Series = append(result.Results[query.RefID].Series, queryRes.Series...)
		} else {
			result.Results[query.RefID] = queryRes
		}
	}

	return result, nil
}

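// buildQueries converts the Grafana queries into AzureMonitorQuery structs,
// resolving the resource URL, time grain and query string parameters for each one.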
func (e *AzureMonitorDatasource) buildQueries(queries []*tsdb.Query, timeRange *tsdb.TimeRange) ([]*AzureMonitorQuery, error) {
	azureMonitorQueries := []*AzureMonitorQuery{}
	startTime, err := timeRange.ParseFrom()
	if err != nil {
		return nil, err
	}

	endTime, err := timeRange.ParseTo()
	if err != nil {
		return nil, err
	}

	for _, query := range queries {
		var target string

		azureMonitorTarget := query.Model.Get("azureMonitor").MustMap()
		azlog.Debug("AzureMonitor", "target", azureMonitorTarget)

		queryMode := fmt.Sprintf("%v", azureMonitorTarget["queryMode"])
		if queryMode == "crossResource" {
			return nil, fmt.Errorf("Alerting not supported for multiple resource queries")
		}

		var azureMonitorData map[string]interface{}
		if queryMode == "singleResource" {
			azureMonitorData = azureMonitorTarget["data"].(map[string]interface{})[queryMode].(map[string]interface{})
		} else {
			azureMonitorData = azureMonitorTarget
		}

		urlComponents := map[string]string{}
		urlComponents["subscription"] = fmt.Sprintf("%v", query.Model.Get("subscription").MustString())
		urlComponents["resourceGroup"] = fmt.Sprintf("%v", azureMonitorData["resourceGroup"])
		urlComponents["metricDefinition"] = fmt.Sprintf("%v", azureMonitorData["metricDefinition"])
		urlComponents["resourceName"] = fmt.Sprintf("%v", azureMonitorData["resourceName"])

		ub := urlBuilder{
			DefaultSubscription: query.DataSource.JsonData.Get("subscriptionId").MustString(),
			Subscription:        urlComponents["subscription"],
			ResourceGroup:       urlComponents["resourceGroup"],
			MetricDefinition:    urlComponents["metricDefinition"],
			ResourceName:        urlComponents["resourceName"],
		}
		azureURL := ub.Build()

		alias := ""
		if val, ok := azureMonitorData["alias"]; ok {
			alias = fmt.Sprintf("%v", val)
		}

		timeGrain := fmt.Sprintf("%v", azureMonitorData["timeGrain"])
		timeGrains := azureMonitorData["allowedTimeGrainsMs"]
		if timeGrain == "auto" {
			timeGrain, err = e.setAutoTimeGrain(query.IntervalMs, timeGrains)
			if err != nil {
				return nil, err
			}
		}

		params := url.Values{}
		params.Add("api-version", "2018-01-01")
		params.Add("timespan", fmt.Sprintf("%v/%v", startTime.UTC().Format(time.RFC3339), endTime.UTC().Format(time.RFC3339)))
		params.Add("interval", timeGrain)
		params.Add("aggregation", fmt.Sprintf("%v", azureMonitorData["aggregation"]))
		params.Add("metricnames", fmt.Sprintf("%v", azureMonitorData["metricName"]))

		if val, ok := azureMonitorData["metricNamespace"]; ok {
			params.Add("metricnamespace", fmt.Sprintf("%v", val))
		}

		dimension := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorData["dimension"]))
		dimensionFilter := strings.TrimSpace(fmt.Sprintf("%v", azureMonitorData["dimensionFilter"]))
		if azureMonitorData["dimension"] != nil && azureMonitorData["dimensionFilter"] != nil && len(dimension) > 0 && len(dimensionFilter) > 0 && dimension != "None" {
			params.Add("$filter", fmt.Sprintf("%s eq '%s'", dimension, dimensionFilter))
		}

		target = params.Encode()

		if setting.Env == setting.DEV {
			azlog.Debug("Azuremonitor request", "params", params)
		}

		azureMonitorQueries = append(azureMonitorQueries, &AzureMonitorQuery{
			URL:           azureURL,
			UrlComponents: urlComponents,
			Target:        target,
			Params:        params,
			RefID:         query.RefId,
			Alias:         alias,
		})
	}

	return azureMonitorQueries, nil
}

// setAutoTimeGrain tries to find the closest interval to the query's intervalMs value.
// If the metric has a limited set of possible intervals/time grains, those are used
// instead of the default list of intervals.
func (e *AzureMonitorDatasource) setAutoTimeGrain(intervalMs int64, timeGrains interface{}) (string, error) {
	// parses array of numbers from the timeGrains json field
	allowedTimeGrains := []int64{}
	tgs, ok := timeGrains.([]interface{})
	if ok {
		for _, v := range tgs {
			jsonNumber, ok := v.(json.Number)
			if ok {
				tg, err := jsonNumber.Int64()
				if err == nil {
					allowedTimeGrains = append(allowedTimeGrains, tg)
				}
			}
		}
	}

	autoInterval := e.findClosestAllowedIntervalMS(intervalMs, allowedTimeGrains)
	tg := &TimeGrain{}
	autoTimeGrain, err := tg.createISO8601DurationFromIntervalMS(autoInterval)
	if err != nil {
		return "", err
	}

	return autoTimeGrain, nil
}

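// executeQuery issues a single request to the Azure Monitor API via the
// datasource proxy route and returns the query result together with the
// unmarshalled response body.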
func (e *AzureMonitorDatasource) executeQuery(ctx context.Context, query *AzureMonitorQuery, queries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.QueryResult, AzureMonitorResponse, error) {
	queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefID}

	req, err := e.createRequest(ctx, e.dsInfo)
	if err != nil {
		queryResult.Error = err
		return queryResult, AzureMonitorResponse{}, nil
	}

	req.URL.Path = path.Join(req.URL.Path, query.URL)
	req.URL.RawQuery = query.Params.Encode()
	queryResult.Meta.Set("rawQuery", req.URL.RawQuery)

	span, ctx := opentracing.StartSpanFromContext(ctx, "azuremonitor query")
	span.SetTag("target", query.Target)
	span.SetTag("from", timeRange.From)
	span.SetTag("until", timeRange.To)
	span.SetTag("datasource_id", e.dsInfo.Id)
	span.SetTag("org_id", e.dsInfo.OrgId)

	defer span.Finish()

	opentracing.GlobalTracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.HTTPHeadersCarrier(req.Header))

	azlog.Debug("AzureMonitor", "Request URL", req.URL.String())
	res, err := ctxhttp.Do(ctx, e.httpClient, req)
	if err != nil {
		queryResult.Error = err
		return queryResult, AzureMonitorResponse{}, nil
	}

	data, err := e.unmarshalResponse(res)
	if err != nil {
		queryResult.Error = err
		return queryResult, AzureMonitorResponse{}, nil
	}

	return queryResult, data, nil
}

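// createRequest builds the proxied HTTP request for the Azure Monitor API,
// applying the plugin's azuremonitor route so the correct authentication
// headers are added by the datasource proxy.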
func (e *AzureMonitorDatasource) createRequest(ctx context.Context, dsInfo *models.DataSource) (*http.Request, error) {
	// find plugin
	plugin, ok := plugins.DataSources[dsInfo.Type]
	if !ok {
		return nil, errors.New("Unable to find datasource plugin Azure Monitor")
	}

	var azureMonitorRoute *plugins.AppPluginRoute
	for _, route := range plugin.Routes {
		if route.Path == "azuremonitor" {
			azureMonitorRoute = route
			break
		}
	}

	cloudName := dsInfo.JsonData.Get("cloudName").MustString("azuremonitor")
	proxyPass := fmt.Sprintf("%s/subscriptions", cloudName)

	u, _ := url.Parse(dsInfo.Url)
	u.Path = path.Join(u.Path, "render")

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		azlog.Error("Failed to create request", "error", err)
		return nil, fmt.Errorf("Failed to create request. error: %v", err)
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))

	pluginproxy.ApplyRoute(ctx, req, proxyPass, azureMonitorRoute, dsInfo)

	return req, nil
}

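// unmarshalResponse reads the HTTP response body and decodes it into an
// AzureMonitorResponse, returning an error for non-2xx status codes.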
func (e *AzureMonitorDatasource) unmarshalResponse(res *http.Response) (AzureMonitorResponse, error) {
	body, err := ioutil.ReadAll(res.Body)
	defer res.Body.Close()
	if err != nil {
		return AzureMonitorResponse{}, err
	}

	if res.StatusCode/100 != 2 {
		azlog.Error("Request failed", "status", res.Status, "body", string(body))
		// use errors.New so a "%" in the response body is not treated as a format verb
		return AzureMonitorResponse{}, errors.New(string(body))
	}

	var data AzureMonitorResponse
	err = json.Unmarshal(body, &data)
	if err != nil {
		azlog.Error("Failed to unmarshal AzureMonitor response", "error", err, "status", res.Status, "body", string(body))
		return AzureMonitorResponse{}, err
	}

	return data, nil
}

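// parseResponse converts an AzureMonitorResponse into Grafana time series,
// picking the data point field that matches the requested aggregation.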
func (e *AzureMonitorDatasource) parseResponse(queryRes *tsdb.QueryResult, data AzureMonitorResponse, query *AzureMonitorQuery) error {
	if len(data.Value) == 0 {
		return nil
	}

	for _, series := range data.Value[0].Timeseries {
		points := []tsdb.TimePoint{}

		metadataName := ""
		metadataValue := ""
		if len(series.Metadatavalues) > 0 {
			metadataName = series.Metadatavalues[0].Name.LocalizedValue
			metadataValue = series.Metadatavalues[0].Value
		}
		metricName := formatLegendKey(query.Alias, query.UrlComponents["resourceName"], data.Value[0].Name.LocalizedValue, metadataName, metadataValue, data.Namespace, data.Value[0].ID)

		for _, point := range series.Data {
			var value float64

			switch query.Params.Get("aggregation") {
			case "Average":
				value = point.Average
			case "Total":
				value = point.Total
			case "Maximum":
				value = point.Maximum
			case "Minimum":
				value = point.Minimum
			case "Count":
				value = point.Count
			default:
				value = point.Count
			}

			points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.TimeStamp).Unix())*1000))
		}

		queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{
			Name:   metricName,
			Points: points,
		})
	}

	queryRes.Meta.Set("unit", data.Value[0].Unit)

	return nil
}

// findClosestAllowedIntervalMS is used for the auto time grain setting.
// It finds the closest time grain from the list of allowed time grains for Azure Monitor
// using the Grafana interval in milliseconds.
// Some metrics only allow a limited list of time grains. The allowedTimeGrains parameter
// allows overriding the default list of allowed time grains.
func (e *AzureMonitorDatasource) findClosestAllowedIntervalMS(intervalMs int64, allowedTimeGrains []int64) int64 {
	allowedIntervals := defaultAllowedIntervalsMS

	if len(allowedTimeGrains) > 0 {
		allowedIntervals = allowedTimeGrains
	}

	closest := allowedIntervals[0]

	for i, allowed := range allowedIntervals {
		if intervalMs > allowed {
			if i+1 < len(allowedIntervals) {
				closest = allowedIntervals[i+1]
			} else {
				closest = allowed
			}
		}
	}

	return closest
}

// formatLegendKey builds the legend key or timeseries name.
// Alias patterns like {{resourcename}} are replaced with the appropriate data values.
func formatLegendKey(alias string, resourceName string, metricName string, metadataName string, metadataValue string, namespace string, seriesID string) string {
	if alias == "" {
		if len(metadataName) > 0 {
			return fmt.Sprintf("%s{%s=%s}.%s", resourceName, metadataName, metadataValue, metricName)
		}
		return fmt.Sprintf("%s.%s", resourceName, metricName)
	}

	// the resource group name sits between "/resourceGroups/" and "/providers" in the series ID
	startIndex := strings.Index(seriesID, "/resourceGroups/") + 16
	endIndex := strings.Index(seriesID, "/providers")
	resourceGroup := seriesID[startIndex:endIndex]

	result := legendKeyFormat.ReplaceAllFunc([]byte(alias), func(in []byte) []byte {
		metaPartName := strings.Replace(string(in), "{{", "", 1)
		metaPartName = strings.Replace(metaPartName, "}}", "", 1)
		metaPartName = strings.ToLower(strings.TrimSpace(metaPartName))

		if metaPartName == "resourcegroup" {
			return []byte(resourceGroup)
		}

		if metaPartName == "namespace" {
			return []byte(namespace)
		}

		if metaPartName == "resourcename" {
			return []byte(resourceName)
		}

		if metaPartName == "metric" {
			return []byte(metricName)
		}

		if metaPartName == "dimensionname" {
			return []byte(metadataName)
		}

		if metaPartName == "dimensionvalue" {
			return []byte(metadataValue)
		}

		return in
	})

	return string(result)
}