grpc_client.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package plugin
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "net"
  6. "time"
  7. "golang.org/x/net/context"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/credentials"
  10. "google.golang.org/grpc/health/grpc_health_v1"
  11. )
  12. func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
  13. // Build dialing options.
  14. opts := make([]grpc.DialOption, 0, 5)
  15. // We use a custom dialer so that we can connect over unix domain sockets
  16. opts = append(opts, grpc.WithDialer(dialer))
  17. // go-plugin expects to block the connection
  18. opts = append(opts, grpc.WithBlock())
  19. // Fail right away
  20. opts = append(opts, grpc.FailOnNonTempDialError(true))
  21. // If we have no TLS configuration set, we need to explicitly tell grpc
  22. // that we're connecting with an insecure connection.
  23. if tls == nil {
  24. opts = append(opts, grpc.WithInsecure())
  25. } else {
  26. opts = append(opts, grpc.WithTransportCredentials(
  27. credentials.NewTLS(tls)))
  28. }
  29. // Connect. Note the first parameter is unused because we use a custom
  30. // dialer that has the state to see the address.
  31. conn, err := grpc.Dial("unused", opts...)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return conn, nil
  36. }
  37. // newGRPCClient creates a new GRPCClient. The Client argument is expected
  38. // to be successfully started already with a lock held.
  39. func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
  40. conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer)
  41. if err != nil {
  42. return nil, err
  43. }
  44. // Start the broker.
  45. brokerGRPCClient := newGRPCBrokerClient(conn)
  46. broker := newGRPCBroker(brokerGRPCClient, c.config.TLSConfig)
  47. go broker.Run()
  48. go brokerGRPCClient.StartStream()
  49. return &GRPCClient{
  50. Conn: conn,
  51. Plugins: c.config.Plugins,
  52. doneCtx: doneCtx,
  53. broker: broker,
  54. }, nil
  55. }
  // GRPCClient connects to a GRPCServer over gRPC to dispense plugin types.
  type GRPCClient struct {
  	// Conn is the gRPC connection used by Dispense and Ping; Close closes it.
  	Conn *grpc.ClientConn
  	// Plugins maps a plugin name to its implementation; Dispense looks
  	// names up here.
  	Plugins map[string]Plugin

  	// doneCtx is the context handed to a plugin's GRPCClient in Dispense.
  	doneCtx context.Context
  	// broker is handed to a plugin's GRPCClient in Dispense and is closed
  	// by Close.
  	broker *GRPCBroker
  }
  63. // ClientProtocol impl.
  64. func (c *GRPCClient) Close() error {
  65. c.broker.Close()
  66. return c.Conn.Close()
  67. }
  68. // ClientProtocol impl.
  69. func (c *GRPCClient) Dispense(name string) (interface{}, error) {
  70. raw, ok := c.Plugins[name]
  71. if !ok {
  72. return nil, fmt.Errorf("unknown plugin type: %s", name)
  73. }
  74. p, ok := raw.(GRPCPlugin)
  75. if !ok {
  76. return nil, fmt.Errorf("plugin %q doesn't support gRPC", name)
  77. }
  78. return p.GRPCClient(c.doneCtx, c.broker, c.Conn)
  79. }
  80. // ClientProtocol impl.
  81. func (c *GRPCClient) Ping() error {
  82. client := grpc_health_v1.NewHealthClient(c.Conn)
  83. _, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{
  84. Service: GRPCServiceName,
  85. })
  86. return err
  87. }