datasource_plugin_wrapper.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package wrapper
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/grafana/grafana/pkg/components/null"
  7. "github.com/grafana/grafana/pkg/log"
  8. "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/tsdb"
  10. "github.com/grafana/grafana_plugin_model/go/datasource"
  11. )
  12. func NewDatasourcePluginWrapper(log log.Logger, plugin datasource.DatasourcePlugin) *DatasourcePluginWrapper {
  13. return &DatasourcePluginWrapper{DatasourcePlugin: plugin, logger: log}
  14. }
  15. type DatasourcePluginWrapper struct {
  16. datasource.DatasourcePlugin
  17. logger log.Logger
  18. }
  19. func (tw *DatasourcePluginWrapper) Query(ctx context.Context, ds *models.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
  20. jsonData, err := ds.JsonData.MarshalJSON()
  21. if err != nil {
  22. return nil, err
  23. }
  24. pbQuery := &datasource.DatasourceRequest{
  25. Datasource: &datasource.DatasourceInfo{
  26. Name: ds.Name,
  27. Type: ds.Type,
  28. Url: ds.Url,
  29. Id: ds.Id,
  30. OrgId: ds.OrgId,
  31. JsonData: string(jsonData),
  32. DecryptedSecureJsonData: ds.SecureJsonData.Decrypt(),
  33. },
  34. TimeRange: &datasource.TimeRange{
  35. FromRaw: query.TimeRange.From,
  36. ToRaw: query.TimeRange.To,
  37. ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
  38. FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
  39. },
  40. Queries: []*datasource.Query{},
  41. }
  42. for _, q := range query.Queries {
  43. modelJson, _ := q.Model.MarshalJSON()
  44. pbQuery.Queries = append(pbQuery.Queries, &datasource.Query{
  45. ModelJson: string(modelJson),
  46. IntervalMs: q.IntervalMs,
  47. RefId: q.RefId,
  48. MaxDataPoints: q.MaxDataPoints,
  49. })
  50. }
  51. pbres, err := tw.DatasourcePlugin.Query(ctx, pbQuery)
  52. if err != nil {
  53. return nil, err
  54. }
  55. res := &tsdb.Response{
  56. Results: map[string]*tsdb.QueryResult{},
  57. }
  58. for _, r := range pbres.Results {
  59. qr := &tsdb.QueryResult{
  60. RefId: r.RefId,
  61. Series: []*tsdb.TimeSeries{},
  62. Tables: []*tsdb.Table{},
  63. }
  64. if r.Error != "" {
  65. qr.Error = errors.New(r.Error)
  66. qr.ErrorString = r.Error
  67. }
  68. for _, s := range r.GetSeries() {
  69. points := tsdb.TimeSeriesPoints{}
  70. for _, p := range s.Points {
  71. po := tsdb.NewTimePoint(null.FloatFrom(p.Value), float64(p.Timestamp))
  72. points = append(points, po)
  73. }
  74. qr.Series = append(qr.Series, &tsdb.TimeSeries{
  75. Name: s.Name,
  76. Tags: s.Tags,
  77. Points: points,
  78. })
  79. }
  80. mappedTables, err := tw.mapTables(r)
  81. if err != nil {
  82. return nil, err
  83. }
  84. qr.Tables = mappedTables
  85. res.Results[r.RefId] = qr
  86. }
  87. return res, nil
  88. }
  89. func (tw *DatasourcePluginWrapper) mapTables(r *datasource.QueryResult) ([]*tsdb.Table, error) {
  90. var tables []*tsdb.Table
  91. for _, t := range r.GetTables() {
  92. mappedTable, err := tw.mapTable(t)
  93. if err != nil {
  94. return nil, err
  95. }
  96. tables = append(tables, mappedTable)
  97. }
  98. return tables, nil
  99. }
  100. func (tw *DatasourcePluginWrapper) mapTable(t *datasource.Table) (*tsdb.Table, error) {
  101. table := &tsdb.Table{}
  102. for _, c := range t.GetColumns() {
  103. table.Columns = append(table.Columns, tsdb.TableColumn{
  104. Text: c.Name,
  105. })
  106. }
  107. table.Rows = make([]tsdb.RowValues, 0)
  108. for _, r := range t.GetRows() {
  109. row := tsdb.RowValues{}
  110. for _, rv := range r.Values {
  111. mappedRw, err := tw.mapRowValue(rv)
  112. if err != nil {
  113. return nil, err
  114. }
  115. row = append(row, mappedRw)
  116. }
  117. table.Rows = append(table.Rows, row)
  118. }
  119. return table, nil
  120. }
  121. func (tw *DatasourcePluginWrapper) mapRowValue(rv *datasource.RowValue) (interface{}, error) {
  122. switch rv.Kind {
  123. case datasource.RowValue_TYPE_NULL:
  124. return nil, nil
  125. case datasource.RowValue_TYPE_INT64:
  126. return rv.Int64Value, nil
  127. case datasource.RowValue_TYPE_BOOL:
  128. return rv.BoolValue, nil
  129. case datasource.RowValue_TYPE_STRING:
  130. return rv.StringValue, nil
  131. case datasource.RowValue_TYPE_DOUBLE:
  132. return rv.DoubleValue, nil
  133. case datasource.RowValue_TYPE_BYTES:
  134. return rv.BytesValue, nil
  135. default:
  136. return nil, fmt.Errorf("Unsupported row value %v from plugin", rv.Kind)
  137. }
  138. }