datasource_plugin_wrapper.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package wrapper
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/grafana/grafana-plugin-model/go/datasource"
  7. "github.com/grafana/grafana/pkg/components/null"
  8. "github.com/grafana/grafana/pkg/components/simplejson"
  9. "github.com/grafana/grafana/pkg/infra/log"
  10. "github.com/grafana/grafana/pkg/models"
  11. "github.com/grafana/grafana/pkg/tsdb"
  12. )
  13. func NewDatasourcePluginWrapper(log log.Logger, plugin datasource.DatasourcePlugin) *DatasourcePluginWrapper {
  14. return &DatasourcePluginWrapper{DatasourcePlugin: plugin, logger: log}
  15. }
  16. type DatasourcePluginWrapper struct {
  17. datasource.DatasourcePlugin
  18. logger log.Logger
  19. }
  20. func (tw *DatasourcePluginWrapper) Query(ctx context.Context, ds *models.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
  21. jsonData, err := ds.JsonData.MarshalJSON()
  22. if err != nil {
  23. return nil, err
  24. }
  25. pbQuery := &datasource.DatasourceRequest{
  26. Datasource: &datasource.DatasourceInfo{
  27. Name: ds.Name,
  28. Type: ds.Type,
  29. Url: ds.Url,
  30. Id: ds.Id,
  31. OrgId: ds.OrgId,
  32. JsonData: string(jsonData),
  33. DecryptedSecureJsonData: ds.SecureJsonData.Decrypt(),
  34. },
  35. TimeRange: &datasource.TimeRange{
  36. FromRaw: query.TimeRange.From,
  37. ToRaw: query.TimeRange.To,
  38. ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
  39. FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
  40. },
  41. Queries: []*datasource.Query{},
  42. }
  43. for _, q := range query.Queries {
  44. modelJson, _ := q.Model.MarshalJSON()
  45. pbQuery.Queries = append(pbQuery.Queries, &datasource.Query{
  46. ModelJson: string(modelJson),
  47. IntervalMs: q.IntervalMs,
  48. RefId: q.RefId,
  49. MaxDataPoints: q.MaxDataPoints,
  50. })
  51. }
  52. pbres, err := tw.DatasourcePlugin.Query(ctx, pbQuery)
  53. if err != nil {
  54. return nil, err
  55. }
  56. res := &tsdb.Response{
  57. Results: map[string]*tsdb.QueryResult{},
  58. }
  59. for _, r := range pbres.Results {
  60. qr := &tsdb.QueryResult{
  61. RefId: r.RefId,
  62. Series: []*tsdb.TimeSeries{},
  63. Tables: []*tsdb.Table{},
  64. }
  65. if r.Error != "" {
  66. qr.Error = errors.New(r.Error)
  67. qr.ErrorString = r.Error
  68. }
  69. if r.MetaJson != "" {
  70. metaJson, err := simplejson.NewJson([]byte(r.MetaJson))
  71. if err != nil {
  72. tw.logger.Error("Error parsing JSON Meta field: " + err.Error())
  73. }
  74. qr.Meta = metaJson
  75. }
  76. for _, s := range r.GetSeries() {
  77. points := tsdb.TimeSeriesPoints{}
  78. for _, p := range s.Points {
  79. po := tsdb.NewTimePoint(null.FloatFrom(p.Value), float64(p.Timestamp))
  80. points = append(points, po)
  81. }
  82. qr.Series = append(qr.Series, &tsdb.TimeSeries{
  83. Name: s.Name,
  84. Tags: s.Tags,
  85. Points: points,
  86. })
  87. }
  88. mappedTables, err := tw.mapTables(r)
  89. if err != nil {
  90. return nil, err
  91. }
  92. qr.Tables = mappedTables
  93. res.Results[r.RefId] = qr
  94. }
  95. return res, nil
  96. }
  97. func (tw *DatasourcePluginWrapper) mapTables(r *datasource.QueryResult) ([]*tsdb.Table, error) {
  98. var tables []*tsdb.Table
  99. for _, t := range r.GetTables() {
  100. mappedTable, err := tw.mapTable(t)
  101. if err != nil {
  102. return nil, err
  103. }
  104. tables = append(tables, mappedTable)
  105. }
  106. return tables, nil
  107. }
  108. func (tw *DatasourcePluginWrapper) mapTable(t *datasource.Table) (*tsdb.Table, error) {
  109. table := &tsdb.Table{}
  110. for _, c := range t.GetColumns() {
  111. table.Columns = append(table.Columns, tsdb.TableColumn{
  112. Text: c.Name,
  113. })
  114. }
  115. table.Rows = make([]tsdb.RowValues, 0)
  116. for _, r := range t.GetRows() {
  117. row := tsdb.RowValues{}
  118. for _, rv := range r.Values {
  119. mappedRw, err := tw.mapRowValue(rv)
  120. if err != nil {
  121. return nil, err
  122. }
  123. row = append(row, mappedRw)
  124. }
  125. table.Rows = append(table.Rows, row)
  126. }
  127. return table, nil
  128. }
  129. func (tw *DatasourcePluginWrapper) mapRowValue(rv *datasource.RowValue) (interface{}, error) {
  130. switch rv.Kind {
  131. case datasource.RowValue_TYPE_NULL:
  132. return nil, nil
  133. case datasource.RowValue_TYPE_INT64:
  134. return rv.Int64Value, nil
  135. case datasource.RowValue_TYPE_BOOL:
  136. return rv.BoolValue, nil
  137. case datasource.RowValue_TYPE_STRING:
  138. return rv.StringValue, nil
  139. case datasource.RowValue_TYPE_DOUBLE:
  140. return rv.DoubleValue, nil
  141. case datasource.RowValue_TYPE_BYTES:
  142. return rv.BytesValue, nil
  143. default:
  144. return nil, fmt.Errorf("Unsupported row value %v from plugin", rv.Kind)
  145. }
  146. }