grpc_server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package plugin
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials"
  11. "google.golang.org/grpc/health"
  12. "google.golang.org/grpc/health/grpc_health_v1"
  13. )
  // GRPCServiceName is the service name under which the gRPC health service
  // is told to report a serving (passing) status.
  const GRPCServiceName = "plugin"
  17. // DefaultGRPCServer can be used with the "GRPCServer" field for Server
  18. // as a default factory method to create a gRPC server with no extra options.
  19. func DefaultGRPCServer(opts []grpc.ServerOption) *grpc.Server {
  20. return grpc.NewServer(opts...)
  21. }
  22. // GRPCServer is a ServerType implementation that serves plugins over
  23. // gRPC. This allows plugins to easily be written for other languages.
  24. //
  25. // The GRPCServer outputs a custom configuration as a base64-encoded
  26. // JSON structure represented by the GRPCServerConfig config structure.
  27. type GRPCServer struct {
  28. // Plugins are the list of plugins to serve.
  29. Plugins map[string]Plugin
  30. // Server is the actual server that will accept connections. This
  31. // will be used for plugin registration as well.
  32. Server func([]grpc.ServerOption) *grpc.Server
  33. // TLS should be the TLS configuration if available. If this is nil,
  34. // the connection will not have transport security.
  35. TLS *tls.Config
  36. // DoneCh is the channel that is closed when this server has exited.
  37. DoneCh chan struct{}
  38. // Stdout/StderrLis are the readers for stdout/stderr that will be copied
  39. // to the stdout/stderr connection that is output.
  40. Stdout io.Reader
  41. Stderr io.Reader
  42. config GRPCServerConfig
  43. server *grpc.Server
  44. }
  45. // ServerProtocol impl.
  46. func (s *GRPCServer) Init() error {
  47. // Create our server
  48. var opts []grpc.ServerOption
  49. if s.TLS != nil {
  50. opts = append(opts, grpc.Creds(credentials.NewTLS(s.TLS)))
  51. }
  52. s.server = s.Server(opts)
  53. // Register the health service
  54. healthCheck := health.NewServer()
  55. healthCheck.SetServingStatus(
  56. GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
  57. grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
  58. // Register all our plugins onto the gRPC server.
  59. for k, raw := range s.Plugins {
  60. p, ok := raw.(GRPCPlugin)
  61. if !ok {
  62. return fmt.Errorf("%q is not a GRPC-compatibile plugin", k)
  63. }
  64. if err := p.GRPCServer(s.server); err != nil {
  65. return fmt.Errorf("error registring %q: %s", k, err)
  66. }
  67. }
  68. return nil
  69. }
  70. // Config is the GRPCServerConfig encoded as JSON then base64.
  71. func (s *GRPCServer) Config() string {
  72. // Create a buffer that will contain our final contents
  73. var buf bytes.Buffer
  74. // Wrap the base64 encoding with JSON encoding.
  75. if err := json.NewEncoder(&buf).Encode(s.config); err != nil {
  76. // We panic since ths shouldn't happen under any scenario. We
  77. // carefully control the structure being encoded here and it should
  78. // always be successful.
  79. panic(err)
  80. }
  81. return buf.String()
  82. }
  83. func (s *GRPCServer) Serve(lis net.Listener) {
  84. // Start serving in a goroutine
  85. go s.server.Serve(lis)
  86. // Wait until graceful completion
  87. <-s.DoneCh
  88. }
  // GRPCServerConfig carries the extra configuration handed to consumers so
  // they can use gRPC plugins. It serializes to JSON with the keys
  // "stdout_addr" and "stderr_addr".
  type GRPCServerConfig struct {
  	// StdoutAddr is written under the JSON key "stdout_addr".
  	StdoutAddr string `json:"stdout_addr"`

  	// StderrAddr is written under the JSON key "stderr_addr".
  	StderrAddr string `json:"stderr_addr"`
  }