model_parser.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/tsdb"
  9. "src/github.com/davecgh/go-spew/spew"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. type ElasticSearchQueryParser struct {
  15. DsInfo *models.DataSource
  16. TimeRange *tsdb.TimeRange
  17. Queries []*tsdb.Query
  18. }
  19. func (qp *ElasticSearchQueryParser) Parse() (string, []*QueryBuilder, error) {
  20. payload := bytes.Buffer{}
  21. queryHeader := qp.getQueryHeader()
  22. targets := make([]*QueryBuilder, 0)
  23. for _, q := range qp.Queries {
  24. timeField, err := q.Model.Get("timeField").String()
  25. if err != nil {
  26. return "", nil, err
  27. }
  28. rawQuery := q.Model.Get("query").MustString("")
  29. bucketAggs := q.Model.Get("bucketAggs").MustArray()
  30. metrics := q.Model.Get("metrics").MustArray()
  31. alias := q.Model.Get("alias").MustString("")
  32. builder := QueryBuilder{timeField, rawQuery, bucketAggs, metrics, alias}
  33. targets = append(targets, &builder)
  34. query, err := builder.Build()
  35. if err != nil {
  36. return "", nil, err
  37. }
  38. queryBytes, err := json.Marshal(query)
  39. if err != nil {
  40. return "", nil, err
  41. }
  42. payload.WriteString(queryHeader.String() + "\n")
  43. payload.WriteString(string(queryBytes) + "\n")
  44. }
  45. p, err := qp.payloadReplace(payload.String(), qp.DsInfo.JsonData)
  46. return p, targets, err
  47. }
  48. func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader {
  49. var header QueryHeader
  50. esVersion := qp.DsInfo.JsonData.Get("esVersion").MustInt()
  51. searchType := "query_then_fetch"
  52. if esVersion < 5 {
  53. searchType = "count"
  54. }
  55. header.SearchType = searchType
  56. header.IgnoreUnavailable = true
  57. header.Index = qp.getIndexList()
  58. if esVersion >= 56 {
  59. header.MaxConcurrentShardRequests = qp.DsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
  60. }
  61. return &header
  62. }
  63. func (qp *ElasticSearchQueryParser) payloadReplace(payload string, model *simplejson.Json) (string, error) {
  64. parsedInterval, err := tsdb.GetIntervalFrom(qp.DsInfo, model, time.Millisecond)
  65. if err != nil {
  66. return "", nil
  67. }
  68. interval := intervalCalculator.Calculate(qp.TimeRange, parsedInterval)
  69. glog.Warn(spew.Sdump(interval))
  70. payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", qp.TimeRange.GetFromAsMsEpoch()), -1)
  71. payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", qp.TimeRange.GetToAsMsEpoch()), -1)
  72. payload = strings.Replace(payload, "$interval", interval.Text, -1)
  73. payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
  74. payload = strings.Replace(payload, "$__interval", interval.Text, -1)
  75. return payload, nil
  76. }
  77. func (qp *ElasticSearchQueryParser) getIndexList() string {
  78. _, err := qp.DsInfo.JsonData.Get("interval").String()
  79. if err != nil {
  80. return qp.DsInfo.Database
  81. }
  82. // todo: support interval
  83. return qp.DsInfo.Database
  84. }