datasource_plugin.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package plugins
  2. import (
  3. "context"
  4. "encoding/json"
  5. "os/exec"
  6. "path"
  7. "time"
  8. "github.com/grafana/grafana-plugin-model/go/datasource"
  9. "github.com/grafana/grafana/pkg/infra/log"
  10. "github.com/grafana/grafana/pkg/models"
  11. "github.com/grafana/grafana/pkg/plugins/datasource/wrapper"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. plugin "github.com/hashicorp/go-plugin"
  14. )
  15. // DataSourcePlugin contains all metadata about a datasource plugin
  16. type DataSourcePlugin struct {
  17. FrontendPluginBase
  18. Annotations bool `json:"annotations"`
  19. Metrics bool `json:"metrics"`
  20. Alerting bool `json:"alerting"`
  21. Explore bool `json:"explore"`
  22. Table bool `json:"tables"`
  23. Logs bool `json:"logs"`
  24. QueryOptions map[string]bool `json:"queryOptions,omitempty"`
  25. BuiltIn bool `json:"builtIn,omitempty"`
  26. Mixed bool `json:"mixed,omitempty"`
  27. Routes []*AppPluginRoute `json:"routes"`
  28. Backend bool `json:"backend,omitempty"`
  29. Executable string `json:"executable,omitempty"`
  30. log log.Logger
  31. client *plugin.Client
  32. }
  33. func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string) error {
  34. if err := decoder.Decode(&p); err != nil {
  35. return err
  36. }
  37. if err := p.registerPlugin(pluginDir); err != nil {
  38. return err
  39. }
  40. DataSources[p.Id] = p
  41. return nil
  42. }
  43. var handshakeConfig = plugin.HandshakeConfig{
  44. ProtocolVersion: 1,
  45. MagicCookieKey: "grafana_plugin_type",
  46. MagicCookieValue: "datasource",
  47. }
  48. func (p *DataSourcePlugin) startBackendPlugin(ctx context.Context, log log.Logger) error {
  49. p.log = log.New("plugin-id", p.Id)
  50. err := p.spawnSubProcess()
  51. if err == nil {
  52. go p.restartKilledProcess(ctx)
  53. }
  54. return err
  55. }
  56. func (p *DataSourcePlugin) spawnSubProcess() error {
  57. cmd := ComposePluginStartCommmand(p.Executable)
  58. fullpath := path.Join(p.PluginDir, cmd)
  59. p.client = plugin.NewClient(&plugin.ClientConfig{
  60. HandshakeConfig: handshakeConfig,
  61. Plugins: map[string]plugin.Plugin{p.Id: &datasource.DatasourcePluginImpl{}},
  62. Cmd: exec.Command(fullpath),
  63. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  64. Logger: LogWrapper{Logger: p.log},
  65. })
  66. rpcClient, err := p.client.Client()
  67. if err != nil {
  68. return err
  69. }
  70. raw, err := rpcClient.Dispense(p.Id)
  71. if err != nil {
  72. return err
  73. }
  74. plugin := raw.(datasource.DatasourcePlugin)
  75. tsdb.RegisterTsdbQueryEndpoint(p.Id, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
  76. return wrapper.NewDatasourcePluginWrapper(p.log, plugin), nil
  77. })
  78. return nil
  79. }
  80. func (p *DataSourcePlugin) restartKilledProcess(ctx context.Context) error {
  81. ticker := time.NewTicker(time.Second * 1)
  82. for {
  83. select {
  84. case <-ctx.Done():
  85. return ctx.Err()
  86. case <-ticker.C:
  87. if p.client.Exited() {
  88. err := p.spawnSubProcess()
  89. p.log.Debug("Spawning new sub process", "name", p.Name, "id", p.Id)
  90. if err != nil {
  91. p.log.Error("Failed to spawn subprocess")
  92. }
  93. }
  94. }
  95. }
  96. }
  97. func (p *DataSourcePlugin) Kill() {
  98. if p.client != nil {
  99. p.log.Debug("Killing subprocess ", "name", p.Name)
  100. p.client.Kill()
  101. }
  102. }