get_metric_statistics.go 6.9 KB


  1. package cloudwatch
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/aws/request"
  13. "github.com/aws/aws-sdk-go/service/cloudwatch"
  14. "github.com/grafana/grafana/pkg/components/null"
  15. "github.com/grafana/grafana/pkg/components/simplejson"
  16. "github.com/grafana/grafana/pkg/infra/metrics"
  17. "github.com/grafana/grafana/pkg/setting"
  18. "github.com/grafana/grafana/pkg/tsdb"
  19. )
  20. func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
  21. client, err := e.getClient(query.Region)
  22. if err != nil {
  23. return nil, err
  24. }
  25. startTime, err := queryContext.TimeRange.ParseFrom()
  26. if err != nil {
  27. return nil, err
  28. }
  29. endTime, err := queryContext.TimeRange.ParseTo()
  30. if err != nil {
  31. return nil, err
  32. }
  33. if !startTime.Before(endTime) {
  34. return nil, fmt.Errorf("Invalid time range: Start time must be before end time")
  35. }
  36. params := &cloudwatch.GetMetricStatisticsInput{
  37. Namespace: aws.String(query.Namespace),
  38. MetricName: aws.String(query.MetricName),
  39. Dimensions: query.Dimensions,
  40. Period: aws.Int64(int64(query.Period)),
  41. }
  42. if len(query.Statistics) > 0 {
  43. params.Statistics = query.Statistics
  44. }
  45. if len(query.ExtendedStatistics) > 0 {
  46. params.ExtendedStatistics = query.ExtendedStatistics
  47. }
  48. // 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
  49. if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
  50. return nil, errors.New("too long query period")
  51. }
  52. var resp *cloudwatch.GetMetricStatisticsOutput
  53. for startTime.Before(endTime) {
  54. params.StartTime = aws.Time(startTime)
  55. if query.HighResolution {
  56. startTime = startTime.Add(time.Duration(1440*query.Period) * time.Second)
  57. } else {
  58. startTime = endTime
  59. }
  60. params.EndTime = aws.Time(startTime)
  61. if setting.Env == setting.DEV {
  62. plog.Debug("CloudWatch query", "raw query", params)
  63. }
  64. partResp, err := client.GetMetricStatisticsWithContext(ctx, params, request.WithResponseReadTimeout(10*time.Second))
  65. if err != nil {
  66. return nil, err
  67. }
  68. if resp != nil {
  69. resp.Datapoints = append(resp.Datapoints, partResp.Datapoints...)
  70. } else {
  71. resp = partResp
  72. }
  73. metrics.MAwsCloudWatchGetMetricStatistics.Inc()
  74. }
  75. queryRes, err := parseResponse(resp, query)
  76. if err != nil {
  77. return nil, err
  78. }
  79. return queryRes, nil
  80. }
  81. func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
  82. region, err := model.Get("region").String()
  83. if err != nil {
  84. return nil, err
  85. }
  86. namespace, err := model.Get("namespace").String()
  87. if err != nil {
  88. return nil, err
  89. }
  90. metricName, err := model.Get("metricName").String()
  91. if err != nil {
  92. return nil, err
  93. }
  94. id := model.Get("id").MustString("")
  95. expression := model.Get("expression").MustString("")
  96. dimensions, err := parseDimensions(model)
  97. if err != nil {
  98. return nil, err
  99. }
  100. statistics, extendedStatistics, err := parseStatistics(model)
  101. if err != nil {
  102. return nil, err
  103. }
  104. p := model.Get("period").MustString("")
  105. if p == "" {
  106. if namespace == "AWS/EC2" {
  107. p = "300"
  108. } else {
  109. p = "60"
  110. }
  111. }
  112. var period int
  113. if regexp.MustCompile(`^\d+$`).Match([]byte(p)) {
  114. period, err = strconv.Atoi(p)
  115. if err != nil {
  116. return nil, err
  117. }
  118. } else {
  119. d, err := time.ParseDuration(p)
  120. if err != nil {
  121. return nil, err
  122. }
  123. period = int(d.Seconds())
  124. }
  125. alias := model.Get("alias").MustString()
  126. returnData := model.Get("returnData").MustBool(false)
  127. highResolution := model.Get("highResolution").MustBool(false)
  128. return &CloudWatchQuery{
  129. Region: region,
  130. Namespace: namespace,
  131. MetricName: metricName,
  132. Dimensions: dimensions,
  133. Statistics: aws.StringSlice(statistics),
  134. ExtendedStatistics: aws.StringSlice(extendedStatistics),
  135. Period: period,
  136. Alias: alias,
  137. Id: id,
  138. Expression: expression,
  139. ReturnData: returnData,
  140. HighResolution: highResolution,
  141. }, nil
  142. }
  143. func parseDimensions(model *simplejson.Json) ([]*cloudwatch.Dimension, error) {
  144. var result []*cloudwatch.Dimension
  145. for k, v := range model.Get("dimensions").MustMap() {
  146. kk := k
  147. if vv, ok := v.(string); ok {
  148. result = append(result, &cloudwatch.Dimension{
  149. Name: &kk,
  150. Value: &vv,
  151. })
  152. } else {
  153. return nil, errors.New("failed to parse")
  154. }
  155. }
  156. sort.Slice(result, func(i, j int) bool {
  157. return *result[i].Name < *result[j].Name
  158. })
  159. return result, nil
  160. }
  161. func parseStatistics(model *simplejson.Json) ([]string, []string, error) {
  162. var statistics []string
  163. var extendedStatistics []string
  164. for _, s := range model.Get("statistics").MustArray() {
  165. if ss, ok := s.(string); ok {
  166. if _, isStandard := standardStatistics[ss]; isStandard {
  167. statistics = append(statistics, ss)
  168. } else {
  169. extendedStatistics = append(extendedStatistics, ss)
  170. }
  171. } else {
  172. return nil, nil, errors.New("failed to parse")
  173. }
  174. }
  175. return statistics, extendedStatistics, nil
  176. }
  177. func parseResponse(resp *cloudwatch.GetMetricStatisticsOutput, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
  178. queryRes := tsdb.NewQueryResult()
  179. queryRes.RefId = query.RefId
  180. var value float64
  181. for _, s := range append(query.Statistics, query.ExtendedStatistics...) {
  182. series := tsdb.TimeSeries{
  183. Tags: map[string]string{},
  184. Points: make([]tsdb.TimePoint, 0),
  185. }
  186. for _, d := range query.Dimensions {
  187. series.Tags[*d.Name] = *d.Value
  188. }
  189. series.Name = formatAlias(query, *s, series.Tags, "")
  190. lastTimestamp := make(map[string]time.Time)
  191. sort.Slice(resp.Datapoints, func(i, j int) bool {
  192. return (*resp.Datapoints[i].Timestamp).Before(*resp.Datapoints[j].Timestamp)
  193. })
  194. for _, v := range resp.Datapoints {
  195. switch *s {
  196. case "Average":
  197. value = *v.Average
  198. case "Maximum":
  199. value = *v.Maximum
  200. case "Minimum":
  201. value = *v.Minimum
  202. case "Sum":
  203. value = *v.Sum
  204. case "SampleCount":
  205. value = *v.SampleCount
  206. default:
  207. if strings.Index(*s, "p") == 0 && v.ExtendedStatistics[*s] != nil {
  208. value = *v.ExtendedStatistics[*s]
  209. }
  210. }
  211. // terminate gap of data points
  212. timestamp := *v.Timestamp
  213. if _, ok := lastTimestamp[*s]; ok {
  214. nextTimestampFromLast := lastTimestamp[*s].Add(time.Duration(query.Period) * time.Second)
  215. for timestamp.After(nextTimestampFromLast) {
  216. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(nextTimestampFromLast.Unix()*1000)))
  217. nextTimestampFromLast = nextTimestampFromLast.Add(time.Duration(query.Period) * time.Second)
  218. }
  219. }
  220. lastTimestamp[*s] = timestamp
  221. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), float64(timestamp.Unix()*1000)))
  222. }
  223. queryRes.Series = append(queryRes.Series, &series)
  224. queryRes.Meta = simplejson.New()
  225. if len(resp.Datapoints) > 0 && resp.Datapoints[0].Unit != nil {
  226. if unit, ok := cloudwatchUnitMappings[*resp.Datapoints[0].Unit]; ok {
  227. queryRes.Meta.Set("unit", unit)
  228. }
  229. }
  230. }
  231. return queryRes, nil
  232. }