cloudwatch.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package cloudwatch
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "github.com/aws/aws-sdk-go/aws/awserr"
  9. "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
  10. "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
  11. "github.com/grafana/grafana/pkg/infra/log"
  12. "github.com/grafana/grafana/pkg/models"
  13. "github.com/grafana/grafana/pkg/tsdb"
  14. "golang.org/x/sync/errgroup"
  15. )
  16. type CloudWatchExecutor struct {
  17. *models.DataSource
  18. ec2Svc ec2iface.EC2API
  19. rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
  20. }
  21. type DatasourceInfo struct {
  22. Profile string
  23. Region string
  24. AuthType string
  25. AssumeRoleArn string
  26. Namespace string
  27. AccessKey string
  28. SecretKey string
  29. }
  30. func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  31. return &CloudWatchExecutor{}, nil
  32. }
  33. var (
  34. plog log.Logger
  35. standardStatistics map[string]bool
  36. aliasFormat *regexp.Regexp
  37. )
  38. func init() {
  39. plog = log.New("tsdb.cloudwatch")
  40. tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
  41. standardStatistics = map[string]bool{
  42. "Average": true,
  43. "Maximum": true,
  44. "Minimum": true,
  45. "Sum": true,
  46. "SampleCount": true,
  47. }
  48. aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
  49. }
  50. func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
  51. var result *tsdb.Response
  52. e.DataSource = dsInfo
  53. queryType := queryContext.Queries[0].Model.Get("type").MustString("")
  54. var err error
  55. switch queryType {
  56. case "metricFindQuery":
  57. result, err = e.executeMetricFindQuery(ctx, queryContext)
  58. case "annotationQuery":
  59. result, err = e.executeAnnotationQuery(ctx, queryContext)
  60. case "timeSeriesQuery":
  61. fallthrough
  62. default:
  63. result, err = e.executeTimeSeriesQuery(ctx, queryContext)
  64. }
  65. return result, err
  66. }
  67. func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
  68. results := &tsdb.Response{
  69. Results: make(map[string]*tsdb.QueryResult),
  70. }
  71. resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
  72. eg, ectx := errgroup.WithContext(ctx)
  73. getMetricDataQueries := make(map[string]map[string]*CloudWatchQuery)
  74. for i, model := range queryContext.Queries {
  75. queryType := model.Model.Get("type").MustString()
  76. if queryType != "timeSeriesQuery" && queryType != "" {
  77. continue
  78. }
  79. RefId := queryContext.Queries[i].RefId
  80. query, err := parseQuery(queryContext.Queries[i].Model)
  81. if err != nil {
  82. results.Results[RefId] = &tsdb.QueryResult{
  83. Error: err,
  84. }
  85. return results, nil
  86. }
  87. query.RefId = RefId
  88. if query.Id != "" {
  89. if _, ok := getMetricDataQueries[query.Region]; !ok {
  90. getMetricDataQueries[query.Region] = make(map[string]*CloudWatchQuery)
  91. }
  92. getMetricDataQueries[query.Region][query.Id] = query
  93. continue
  94. }
  95. if query.Id == "" && query.Expression != "" {
  96. results.Results[query.RefId] = &tsdb.QueryResult{
  97. Error: fmt.Errorf("Invalid query: id should be set if using expression"),
  98. }
  99. return results, nil
  100. }
  101. eg.Go(func() error {
  102. defer func() {
  103. if err := recover(); err != nil {
  104. plog.Error("Execute Query Panic", "error", err, "stack", log.Stack(1))
  105. if theErr, ok := err.(error); ok {
  106. resultChan <- &tsdb.QueryResult{
  107. RefId: query.RefId,
  108. Error: theErr,
  109. }
  110. }
  111. }
  112. }()
  113. queryRes, err := e.executeQuery(ectx, query, queryContext)
  114. if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
  115. return err
  116. }
  117. if err != nil {
  118. resultChan <- &tsdb.QueryResult{
  119. RefId: query.RefId,
  120. Error: err,
  121. }
  122. return nil
  123. }
  124. resultChan <- queryRes
  125. return nil
  126. })
  127. }
  128. if len(getMetricDataQueries) > 0 {
  129. for region, getMetricDataQuery := range getMetricDataQueries {
  130. q := getMetricDataQuery
  131. eg.Go(func() error {
  132. defer func() {
  133. if err := recover(); err != nil {
  134. plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
  135. if theErr, ok := err.(error); ok {
  136. resultChan <- &tsdb.QueryResult{
  137. Error: theErr,
  138. }
  139. }
  140. }
  141. }()
  142. queryResponses, err := e.executeGetMetricDataQuery(ectx, region, q, queryContext)
  143. if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
  144. return err
  145. }
  146. for _, queryRes := range queryResponses {
  147. if err != nil {
  148. queryRes.Error = err
  149. }
  150. resultChan <- queryRes
  151. }
  152. return nil
  153. })
  154. }
  155. }
  156. if err := eg.Wait(); err != nil {
  157. return nil, err
  158. }
  159. close(resultChan)
  160. for result := range resultChan {
  161. results.Results[result.RefId] = result
  162. }
  163. return results, nil
  164. }
  165. func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string, label string) string {
  166. region := query.Region
  167. namespace := query.Namespace
  168. metricName := query.MetricName
  169. period := strconv.Itoa(query.Period)
  170. if len(query.Id) > 0 && len(query.Expression) > 0 {
  171. if strings.Index(query.Expression, "SEARCH(") == 0 {
  172. pIndex := strings.LastIndex(query.Expression, ",")
  173. period = strings.Trim(query.Expression[pIndex+1:], " )")
  174. sIndex := strings.LastIndex(query.Expression[:pIndex], ",")
  175. stat = strings.Trim(query.Expression[sIndex+1:pIndex], " '")
  176. } else if len(query.Alias) > 0 {
  177. // expand by Alias
  178. } else {
  179. return query.Id
  180. }
  181. }
  182. data := map[string]string{}
  183. data["region"] = region
  184. data["namespace"] = namespace
  185. data["metric"] = metricName
  186. data["stat"] = stat
  187. data["period"] = period
  188. if len(label) != 0 {
  189. data["label"] = label
  190. }
  191. for k, v := range dimensions {
  192. data[k] = v
  193. }
  194. result := aliasFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
  195. labelName := strings.Replace(string(in), "{{", "", 1)
  196. labelName = strings.Replace(labelName, "}}", "", 1)
  197. labelName = strings.TrimSpace(labelName)
  198. if val, exists := data[labelName]; exists {
  199. return []byte(val)
  200. }
  201. return in
  202. })
  203. if string(result) == "" {
  204. return metricName + "_" + stat
  205. }
  206. return string(result)
  207. }