grpc_server.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package plugin
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials"
  11. "google.golang.org/grpc/health"
  12. "google.golang.org/grpc/health/grpc_health_v1"
  13. )
  14. // GRPCServiceName is the name of the service that the health check should
  15. // return as passing.
  16. const GRPCServiceName = "plugin"
  17. // DefaultGRPCServer can be used with the "GRPCServer" field for Server
  18. // as a default factory method to create a gRPC server with no extra options.
  19. func DefaultGRPCServer(opts []grpc.ServerOption) *grpc.Server {
  20. return grpc.NewServer(opts...)
  21. }
  22. // GRPCServer is a ServerType implementation that serves plugins over
  23. // gRPC. This allows plugins to easily be written for other languages.
  24. //
  25. // The GRPCServer outputs a custom configuration as a base64-encoded
  26. // JSON structure represented by the GRPCServerConfig config structure.
  27. type GRPCServer struct {
  28. // Plugins are the list of plugins to serve.
  29. Plugins map[string]Plugin
  30. // Server is the actual server that will accept connections. This
  31. // will be used for plugin registration as well.
  32. Server func([]grpc.ServerOption) *grpc.Server
  33. // TLS should be the TLS configuration if available. If this is nil,
  34. // the connection will not have transport security.
  35. TLS *tls.Config
  36. // DoneCh is the channel that is closed when this server has exited.
  37. DoneCh chan struct{}
  38. // Stdout/StderrLis are the readers for stdout/stderr that will be copied
  39. // to the stdout/stderr connection that is output.
  40. Stdout io.Reader
  41. Stderr io.Reader
  42. config GRPCServerConfig
  43. server *grpc.Server
  44. broker *GRPCBroker
  45. }
  46. // ServerProtocol impl.
  47. func (s *GRPCServer) Init() error {
  48. // Create our server
  49. var opts []grpc.ServerOption
  50. if s.TLS != nil {
  51. opts = append(opts, grpc.Creds(credentials.NewTLS(s.TLS)))
  52. }
  53. s.server = s.Server(opts)
  54. // Register the health service
  55. healthCheck := health.NewServer()
  56. healthCheck.SetServingStatus(
  57. GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
  58. grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
  59. // Register the broker service
  60. brokerServer := newGRPCBrokerServer()
  61. RegisterGRPCBrokerServer(s.server, brokerServer)
  62. s.broker = newGRPCBroker(brokerServer, s.TLS)
  63. go s.broker.Run()
  64. // Register all our plugins onto the gRPC server.
  65. for k, raw := range s.Plugins {
  66. p, ok := raw.(GRPCPlugin)
  67. if !ok {
  68. return fmt.Errorf("%q is not a GRPC-compatible plugin", k)
  69. }
  70. if err := p.GRPCServer(s.broker, s.server); err != nil {
  71. return fmt.Errorf("error registring %q: %s", k, err)
  72. }
  73. }
  74. return nil
  75. }
  76. // Stop calls Stop on the underlying grpc.Server
  77. func (s *GRPCServer) Stop() {
  78. s.server.Stop()
  79. }
  80. // GracefulStop calls GracefulStop on the underlying grpc.Server
  81. func (s *GRPCServer) GracefulStop() {
  82. s.server.GracefulStop()
  83. }
  84. // Config is the GRPCServerConfig encoded as JSON then base64.
  85. func (s *GRPCServer) Config() string {
  86. // Create a buffer that will contain our final contents
  87. var buf bytes.Buffer
  88. // Wrap the base64 encoding with JSON encoding.
  89. if err := json.NewEncoder(&buf).Encode(s.config); err != nil {
  90. // We panic since ths shouldn't happen under any scenario. We
  91. // carefully control the structure being encoded here and it should
  92. // always be successful.
  93. panic(err)
  94. }
  95. return buf.String()
  96. }
  97. func (s *GRPCServer) Serve(lis net.Listener) {
  98. // Start serving in a goroutine
  99. go s.server.Serve(lis)
  100. // Wait until graceful completion
  101. <-s.DoneCh
  102. }
  103. // GRPCServerConfig is the extra configuration passed along for consumers
  104. // to facilitate using GRPC plugins.
  105. type GRPCServerConfig struct {
  106. StdoutAddr string `json:"stdout_addr"`
  107. StderrAddr string `json:"stderr_addr"`
  108. }