|
|
@@ -1,6 +1,7 @@
|
|
|
package plugins
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"os"
|
|
|
@@ -69,11 +70,11 @@ func buildExecutablePath(pluginDir, executable, os, arch string) string {
|
|
|
return path.Join(pluginDir, fmt.Sprintf("%s_%s_%s", executable, strings.ToLower(os), strings.ToLower(arch)))
|
|
|
}
|
|
|
|
|
|
-func (p *DataSourcePlugin) initBackendPlugin(log log.Logger) error {
|
|
|
+func (p *DataSourcePlugin) initBackendPlugin(ctx context.Context, log log.Logger) error {
|
|
|
p.log = log.New("plugin-id", p.Id)
|
|
|
|
|
|
p.spawnSubProcess()
|
|
|
- go p.reattachKilledProcess()
|
|
|
+ go p.reattachKilledProcess(ctx)
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -108,14 +109,17 @@ func (p *DataSourcePlugin) spawnSubProcess() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (p *DataSourcePlugin) reattachKilledProcess() {
|
|
|
+func (p *DataSourcePlugin) reattachKilledProcess(ctx context.Context) error {
|
|
|
ticker := time.NewTicker(time.Second * 1)
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return ctx.Err()
|
|
|
case <-ticker.C:
|
|
|
if p.client.Exited() {
|
|
|
err := p.spawnSubProcess()
|
|
|
+ p.log.Debug("Spawning new sub process", "name", p.Name, "id", p.Id)
|
|
|
if err != nil {
|
|
|
p.log.Error("Failed to spawn subprocess")
|
|
|
}
|
|
|
@@ -126,6 +130,7 @@ func (p *DataSourcePlugin) reattachKilledProcess() {
|
|
|
|
|
|
func (p *DataSourcePlugin) Kill() {
|
|
|
if p.client != nil {
|
|
|
+ p.log.Debug("Killing subprocess ", "name", p.Name)
|
|
|
p.client.Kill()
|
|
|
}
|
|
|
}
|