grpc_server.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package plugin
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. hclog "github.com/hashicorp/go-hclog"
  10. "github.com/hashicorp/go-plugin/internal/plugin"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/credentials"
  13. "google.golang.org/grpc/health"
  14. "google.golang.org/grpc/health/grpc_health_v1"
  15. )
  16. // GRPCServiceName is the name of the service that the health check should
  17. // return as passing.
  18. const GRPCServiceName = "plugin"
  19. // DefaultGRPCServer can be used with the "GRPCServer" field for Server
  20. // as a default factory method to create a gRPC server with no extra options.
  21. func DefaultGRPCServer(opts []grpc.ServerOption) *grpc.Server {
  22. return grpc.NewServer(opts...)
  23. }
  24. // GRPCServer is a ServerType implementation that serves plugins over
  25. // gRPC. This allows plugins to easily be written for other languages.
  26. //
  27. // The GRPCServer outputs a custom configuration as a base64-encoded
  28. // JSON structure represented by the GRPCServerConfig config structure.
  29. type GRPCServer struct {
  30. // Plugins are the list of plugins to serve.
  31. Plugins map[string]Plugin
  32. // Server is the actual server that will accept connections. This
  33. // will be used for plugin registration as well.
  34. Server func([]grpc.ServerOption) *grpc.Server
  35. // TLS should be the TLS configuration if available. If this is nil,
  36. // the connection will not have transport security.
  37. TLS *tls.Config
  38. // DoneCh is the channel that is closed when this server has exited.
  39. DoneCh chan struct{}
  40. // Stdout/StderrLis are the readers for stdout/stderr that will be copied
  41. // to the stdout/stderr connection that is output.
  42. Stdout io.Reader
  43. Stderr io.Reader
  44. config GRPCServerConfig
  45. server *grpc.Server
  46. broker *GRPCBroker
  47. logger hclog.Logger
  48. }
  49. // ServerProtocol impl.
  50. func (s *GRPCServer) Init() error {
  51. // Create our server
  52. var opts []grpc.ServerOption
  53. if s.TLS != nil {
  54. opts = append(opts, grpc.Creds(credentials.NewTLS(s.TLS)))
  55. }
  56. s.server = s.Server(opts)
  57. // Register the health service
  58. healthCheck := health.NewServer()
  59. healthCheck.SetServingStatus(
  60. GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
  61. grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
  62. // Register the broker service
  63. brokerServer := newGRPCBrokerServer()
  64. plugin.RegisterGRPCBrokerServer(s.server, brokerServer)
  65. s.broker = newGRPCBroker(brokerServer, s.TLS)
  66. go s.broker.Run()
  67. // Register the controller
  68. controllerServer := &grpcControllerServer{
  69. server: s,
  70. }
  71. plugin.RegisterGRPCControllerServer(s.server, controllerServer)
  72. // Register all our plugins onto the gRPC server.
  73. for k, raw := range s.Plugins {
  74. p, ok := raw.(GRPCPlugin)
  75. if !ok {
  76. return fmt.Errorf("%q is not a GRPC-compatible plugin", k)
  77. }
  78. if err := p.GRPCServer(s.broker, s.server); err != nil {
  79. return fmt.Errorf("error registering %q: %s", k, err)
  80. }
  81. }
  82. return nil
  83. }
  84. // Stop calls Stop on the underlying grpc.Server
  85. func (s *GRPCServer) Stop() {
  86. s.server.Stop()
  87. }
  88. // GracefulStop calls GracefulStop on the underlying grpc.Server
  89. func (s *GRPCServer) GracefulStop() {
  90. s.server.GracefulStop()
  91. }
  92. // Config is the GRPCServerConfig encoded as JSON then base64.
  93. func (s *GRPCServer) Config() string {
  94. // Create a buffer that will contain our final contents
  95. var buf bytes.Buffer
  96. // Wrap the base64 encoding with JSON encoding.
  97. if err := json.NewEncoder(&buf).Encode(s.config); err != nil {
  98. // We panic since ths shouldn't happen under any scenario. We
  99. // carefully control the structure being encoded here and it should
  100. // always be successful.
  101. panic(err)
  102. }
  103. return buf.String()
  104. }
  105. func (s *GRPCServer) Serve(lis net.Listener) {
  106. defer close(s.DoneCh)
  107. err := s.server.Serve(lis)
  108. if err != nil {
  109. s.logger.Error("grpc server", "error", err)
  110. }
  111. }
  112. // GRPCServerConfig is the extra configuration passed along for consumers
  113. // to facilitate using GRPC plugins.
  114. type GRPCServerConfig struct {
  115. StdoutAddr string `json:"stdout_addr"`
  116. StderrAddr string `json:"stderr_addr"`
  117. }