grpc_client.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package plugin
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/hashicorp/go-plugin/internal/plugin"
  8. "golang.org/x/net/context"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials"
  11. "google.golang.org/grpc/health/grpc_health_v1"
  12. )
  13. func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
  14. // Build dialing options.
  15. opts := make([]grpc.DialOption, 0, 5)
  16. // We use a custom dialer so that we can connect over unix domain sockets.
  17. opts = append(opts, grpc.WithDialer(dialer))
  18. // Fail right away
  19. opts = append(opts, grpc.FailOnNonTempDialError(true))
  20. // If we have no TLS configuration set, we need to explicitly tell grpc
  21. // that we're connecting with an insecure connection.
  22. if tls == nil {
  23. opts = append(opts, grpc.WithInsecure())
  24. } else {
  25. opts = append(opts, grpc.WithTransportCredentials(
  26. credentials.NewTLS(tls)))
  27. }
  28. // Connect. Note the first parameter is unused because we use a custom
  29. // dialer that has the state to see the address.
  30. conn, err := grpc.Dial("unused", opts...)
  31. if err != nil {
  32. return nil, err
  33. }
  34. return conn, nil
  35. }
  36. // newGRPCClient creates a new GRPCClient. The Client argument is expected
  37. // to be successfully started already with a lock held.
  38. func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
  39. conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer)
  40. if err != nil {
  41. return nil, err
  42. }
  43. // Start the broker.
  44. brokerGRPCClient := newGRPCBrokerClient(conn)
  45. broker := newGRPCBroker(brokerGRPCClient, c.config.TLSConfig)
  46. go broker.Run()
  47. go brokerGRPCClient.StartStream()
  48. cl := &GRPCClient{
  49. Conn: conn,
  50. Plugins: c.config.Plugins,
  51. doneCtx: doneCtx,
  52. broker: broker,
  53. controller: plugin.NewGRPCControllerClient(conn),
  54. }
  55. return cl, nil
  56. }
  57. // GRPCClient connects to a GRPCServer over gRPC to dispense plugin types.
  58. type GRPCClient struct {
  59. Conn *grpc.ClientConn
  60. Plugins map[string]Plugin
  61. doneCtx context.Context
  62. broker *GRPCBroker
  63. controller plugin.GRPCControllerClient
  64. }
  65. // ClientProtocol impl.
  66. func (c *GRPCClient) Close() error {
  67. c.broker.Close()
  68. c.controller.Shutdown(c.doneCtx, &plugin.Empty{})
  69. return c.Conn.Close()
  70. }
  71. // ClientProtocol impl.
  72. func (c *GRPCClient) Dispense(name string) (interface{}, error) {
  73. raw, ok := c.Plugins[name]
  74. if !ok {
  75. return nil, fmt.Errorf("unknown plugin type: %s", name)
  76. }
  77. p, ok := raw.(GRPCPlugin)
  78. if !ok {
  79. return nil, fmt.Errorf("plugin %q doesn't support gRPC", name)
  80. }
  81. return p.GRPCClient(c.doneCtx, c.broker, c.Conn)
  82. }
  83. // ClientProtocol impl.
  84. func (c *GRPCClient) Ping() error {
  85. client := grpc_health_v1.NewHealthClient(c.Conn)
  86. _, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{
  87. Service: GRPCServiceName,
  88. })
  89. return err
  90. }