datasource_plugin_wrapper.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package tsdb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/grafana/grafana/pkg/components/null"
  6. "github.com/grafana/grafana/pkg/log"
  7. "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/tsdb"
  9. proto "github.com/grafana/grafana/pkg/tsdb/models"
  10. )
  11. func NewDatasourcePluginWrapper(log log.Logger, plugin TsdbPlugin) *DatasourcePluginWrapper {
  12. return &DatasourcePluginWrapper{TsdbPlugin: plugin, logger: log}
  13. }
  14. type DatasourcePluginWrapper struct {
  15. TsdbPlugin
  16. logger log.Logger
  17. }
  18. func (tw *DatasourcePluginWrapper) Query(ctx context.Context, ds *models.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
  19. jsonData, err := ds.JsonData.MarshalJSON()
  20. if err != nil {
  21. return nil, err
  22. }
  23. pbQuery := &proto.TsdbQuery{
  24. Datasource: &proto.DatasourceInfo{
  25. JsonData: string(jsonData),
  26. Name: ds.Name,
  27. Type: ds.Type,
  28. Url: ds.Url,
  29. Id: ds.Id,
  30. OrgId: ds.OrgId,
  31. },
  32. TimeRange: &proto.TimeRange{
  33. FromRaw: query.TimeRange.From,
  34. ToRaw: query.TimeRange.To,
  35. ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
  36. FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
  37. },
  38. Queries: []*proto.Query{},
  39. }
  40. for _, q := range query.Queries {
  41. modelJson, _ := q.Model.MarshalJSON()
  42. pbQuery.Queries = append(pbQuery.Queries, &proto.Query{
  43. ModelJson: string(modelJson),
  44. IntervalMs: q.IntervalMs,
  45. RefId: q.RefId,
  46. MaxDataPoints: q.MaxDataPoints,
  47. })
  48. }
  49. pbres, err := tw.TsdbPlugin.Query(ctx, pbQuery)
  50. if err != nil {
  51. return nil, err
  52. }
  53. res := &tsdb.Response{
  54. Results: map[string]*tsdb.QueryResult{},
  55. }
  56. for _, r := range pbres.Results {
  57. res.Results[r.RefId] = &tsdb.QueryResult{
  58. RefId: r.RefId,
  59. Series: []*tsdb.TimeSeries{},
  60. }
  61. for _, s := range r.GetSeries() {
  62. points := tsdb.TimeSeriesPoints{}
  63. for _, p := range s.Points {
  64. po := tsdb.NewTimePoint(null.FloatFrom(p.Value), float64(p.Timestamp))
  65. points = append(points, po)
  66. }
  67. res.Results[r.RefId].Series = append(res.Results[r.RefId].Series, &tsdb.TimeSeries{
  68. Name: s.Name,
  69. Tags: s.Tags,
  70. Points: points,
  71. })
  72. }
  73. mappedTables, err := tw.mapTables(r)
  74. if err != nil {
  75. return nil, err
  76. }
  77. res.Results[r.RefId].Tables = mappedTables
  78. }
  79. return res, nil
  80. }
  81. func (tw *DatasourcePluginWrapper) mapTables(r *proto.QueryResult) ([]*tsdb.Table, error) {
  82. var tables []*tsdb.Table
  83. for _, t := range r.GetTables() {
  84. mappedTable, err := tw.mapTable(t)
  85. if err != nil {
  86. return nil, err
  87. }
  88. tables = append(tables, mappedTable)
  89. }
  90. return tables, nil
  91. }
  92. func (tw *DatasourcePluginWrapper) mapTable(t *proto.Table) (*tsdb.Table, error) {
  93. table := &tsdb.Table{}
  94. for _, c := range t.GetColumns() {
  95. table.Columns = append(table.Columns, tsdb.TableColumn{
  96. Text: c.Name,
  97. })
  98. }
  99. for _, r := range t.GetRows() {
  100. row := tsdb.RowValues{}
  101. for _, rv := range r.Values {
  102. mappedRw, err := tw.mapRowValue(rv)
  103. if err != nil {
  104. return nil, err
  105. }
  106. row = append(row, mappedRw)
  107. }
  108. table.Rows = append(table.Rows, row)
  109. }
  110. return table, nil
  111. }
  112. func (tw *DatasourcePluginWrapper) mapRowValue(rv *proto.RowValue) (interface{}, error) {
  113. switch rv.Kind {
  114. case proto.RowValue_TYPE_NULL:
  115. return nil, nil
  116. case proto.RowValue_TYPE_INT64:
  117. return rv.Int64Value, nil
  118. case proto.RowValue_TYPE_BOOL:
  119. return rv.BoolValue, nil
  120. case proto.RowValue_TYPE_STRING:
  121. return rv.StringValue, nil
  122. case proto.RowValue_TYPE_DOUBLE:
  123. return rv.DoubleValue, nil
  124. case proto.RowValue_TYPE_BYTES:
  125. return rv.BytesValue, nil
  126. default:
  127. return nil, fmt.Errorf("Unsupported row value %v from plugin", rv.Kind)
  128. }
  129. }