get_metric_data.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package cloudwatch
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/cloudwatch"
	"github.com/grafana/grafana/pkg/components/null"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/infra/metrics"
	"github.com/grafana/grafana/pkg/tsdb"
)
  14. func (e *CloudWatchExecutor) executeGetMetricDataQuery(ctx context.Context, region string, queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) ([]*tsdb.QueryResult, error) {
  15. queryResponses := make([]*tsdb.QueryResult, 0)
  16. client, err := e.getClient(region)
  17. if err != nil {
  18. return queryResponses, err
  19. }
  20. params, err := parseGetMetricDataQuery(queries, queryContext)
  21. if err != nil {
  22. return queryResponses, err
  23. }
  24. nextToken := ""
  25. mdr := make(map[string]map[string]*cloudwatch.MetricDataResult)
  26. for {
  27. if nextToken != "" {
  28. params.NextToken = aws.String(nextToken)
  29. }
  30. resp, err := client.GetMetricDataWithContext(ctx, params)
  31. if err != nil {
  32. return queryResponses, err
  33. }
  34. metrics.MAwsCloudWatchGetMetricData.Add(float64(len(params.MetricDataQueries)))
  35. for _, r := range resp.MetricDataResults {
  36. if _, ok := mdr[*r.Id]; !ok {
  37. mdr[*r.Id] = make(map[string]*cloudwatch.MetricDataResult)
  38. mdr[*r.Id][*r.Label] = r
  39. } else if _, ok := mdr[*r.Id][*r.Label]; !ok {
  40. mdr[*r.Id][*r.Label] = r
  41. } else {
  42. mdr[*r.Id][*r.Label].Timestamps = append(mdr[*r.Id][*r.Label].Timestamps, r.Timestamps...)
  43. mdr[*r.Id][*r.Label].Values = append(mdr[*r.Id][*r.Label].Values, r.Values...)
  44. }
  45. }
  46. if resp.NextToken == nil || *resp.NextToken == "" {
  47. break
  48. }
  49. nextToken = *resp.NextToken
  50. }
  51. for id, lr := range mdr {
  52. queryRes, err := parseGetMetricDataResponse(lr, queries[id])
  53. if err != nil {
  54. return queryResponses, err
  55. }
  56. queryResponses = append(queryResponses, queryRes)
  57. }
  58. return queryResponses, nil
  59. }
  60. func parseGetMetricDataQuery(queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*cloudwatch.GetMetricDataInput, error) {
  61. // validate query
  62. for _, query := range queries {
  63. if !(len(query.Statistics) == 1 && len(query.ExtendedStatistics) == 0) &&
  64. !(len(query.Statistics) == 0 && len(query.ExtendedStatistics) == 1) {
  65. return nil, errors.New("Statistics count should be 1")
  66. }
  67. }
  68. startTime, err := queryContext.TimeRange.ParseFrom()
  69. if err != nil {
  70. return nil, err
  71. }
  72. endTime, err := queryContext.TimeRange.ParseTo()
  73. if err != nil {
  74. return nil, err
  75. }
  76. params := &cloudwatch.GetMetricDataInput{
  77. StartTime: aws.Time(startTime),
  78. EndTime: aws.Time(endTime),
  79. ScanBy: aws.String("TimestampAscending"),
  80. }
  81. for _, query := range queries {
  82. // 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
  83. if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
  84. return nil, errors.New("too long query period")
  85. }
  86. mdq := &cloudwatch.MetricDataQuery{
  87. Id: aws.String(query.Id),
  88. ReturnData: aws.Bool(query.ReturnData),
  89. }
  90. if query.Expression != "" {
  91. mdq.Expression = aws.String(query.Expression)
  92. } else {
  93. mdq.MetricStat = &cloudwatch.MetricStat{
  94. Metric: &cloudwatch.Metric{
  95. Namespace: aws.String(query.Namespace),
  96. MetricName: aws.String(query.MetricName),
  97. },
  98. Period: aws.Int64(int64(query.Period)),
  99. }
  100. for _, d := range query.Dimensions {
  101. mdq.MetricStat.Metric.Dimensions = append(mdq.MetricStat.Metric.Dimensions,
  102. &cloudwatch.Dimension{
  103. Name: d.Name,
  104. Value: d.Value,
  105. })
  106. }
  107. if len(query.Statistics) == 1 {
  108. mdq.MetricStat.Stat = query.Statistics[0]
  109. } else {
  110. mdq.MetricStat.Stat = query.ExtendedStatistics[0]
  111. }
  112. }
  113. params.MetricDataQueries = append(params.MetricDataQueries, mdq)
  114. }
  115. return params, nil
  116. }
  117. func parseGetMetricDataResponse(lr map[string]*cloudwatch.MetricDataResult, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
  118. queryRes := tsdb.NewQueryResult()
  119. queryRes.RefId = query.RefId
  120. for label, r := range lr {
  121. if *r.StatusCode != "Complete" {
  122. return queryRes, fmt.Errorf("Part of query is failed: %s", *r.StatusCode)
  123. }
  124. series := tsdb.TimeSeries{
  125. Tags: map[string]string{},
  126. Points: make([]tsdb.TimePoint, 0),
  127. }
  128. for _, d := range query.Dimensions {
  129. series.Tags[*d.Name] = *d.Value
  130. }
  131. s := ""
  132. if len(query.Statistics) == 1 {
  133. s = *query.Statistics[0]
  134. } else {
  135. s = *query.ExtendedStatistics[0]
  136. }
  137. series.Name = formatAlias(query, s, series.Tags, label)
  138. for j, t := range r.Timestamps {
  139. if j > 0 {
  140. expectedTimestamp := r.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second)
  141. if expectedTimestamp.Before(*t) {
  142. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
  143. }
  144. }
  145. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*r.Values[j]), float64((*t).Unix())*1000))
  146. }
  147. queryRes.Series = append(queryRes.Series, &series)
  148. queryRes.Meta = simplejson.New()
  149. }
  150. return queryRes, nil
  151. }