model_parser.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package elasticsearch
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/grafana/grafana/pkg/components/simplejson"
  7. "github.com/grafana/grafana/pkg/log"
  8. "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/tsdb"
  10. "src/github.com/davecgh/go-spew/spew"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. type ElasticSearchQueryParser struct {
  16. DsInfo *models.DataSource
  17. TimeRange *tsdb.TimeRange
  18. Queries []*tsdb.Query
  19. glog log.Logger
  20. }
  21. func (qp *ElasticSearchQueryParser) Parse() (string, error) {
  22. payload := bytes.Buffer{}
  23. queryHeader := qp.getQueryHeader()
  24. for _, q := range qp.Queries {
  25. timeField, err := q.Model.Get("timeField").String()
  26. if err != nil {
  27. return "", err
  28. }
  29. rawQuery := q.Model.Get("query").MustString("")
  30. bucketAggs := q.Model.Get("bucketAggs").MustArray()
  31. metrics := q.Model.Get("metrics").MustArray()
  32. alias := q.Model.Get("alias").MustString("")
  33. builder := QueryBuilder{timeField, rawQuery, bucketAggs, metrics, alias}
  34. query, err := builder.Build()
  35. if err != nil {
  36. return "", err
  37. }
  38. queryBytes, err := json.Marshal(query)
  39. if err != nil {
  40. return "", err
  41. }
  42. payload.WriteString(queryHeader.String() + "\n")
  43. payload.WriteString(string(queryBytes) + "\n")
  44. }
  45. return qp.payloadReplace(payload.String(), qp.DsInfo.JsonData)
  46. }
  47. func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader {
  48. var header QueryHeader
  49. esVersion := qp.DsInfo.JsonData.Get("esVersion").MustInt()
  50. searchType := "query_then_fetch"
  51. if esVersion < 5 {
  52. searchType = "count"
  53. }
  54. header.SearchType = searchType
  55. header.IgnoreUnavailable = true
  56. header.Index = qp.getIndexList()
  57. if esVersion >= 56 {
  58. header.MaxConcurrentShardRequests = qp.DsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
  59. }
  60. return &header
  61. }
  62. func (qp *ElasticSearchQueryParser) payloadReplace(payload string, model *simplejson.Json) (string, error) {
  63. parsedInterval, err := tsdb.GetIntervalFrom(qp.DsInfo, model, time.Millisecond)
  64. if err != nil {
  65. return "", nil
  66. }
  67. interval := intervalCalculator.Calculate(qp.TimeRange, parsedInterval)
  68. glog.Warn(spew.Sdump(interval))
  69. payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", qp.TimeRange.GetFromAsMsEpoch()), -1)
  70. payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", qp.TimeRange.GetToAsMsEpoch()), -1)
  71. payload = strings.Replace(payload, "$interval", interval.Text, -1)
  72. payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
  73. payload = strings.Replace(payload, "$__interval", interval.Text, -1)
  74. return payload, nil
  75. }
  76. func (qp *ElasticSearchQueryParser) getIndexList() string {
  77. _, err := qp.DsInfo.JsonData.Get("interval").String()
  78. if err != nil {
  79. return qp.DsInfo.Database
  80. }
  81. // todo: support interval
  82. return qp.DsInfo.Database
  83. }