rpc_client.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package plugin
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/rpc"
  8. "github.com/hashicorp/yamux"
  9. )
  10. // RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
  11. type RPCClient struct {
  12. broker *MuxBroker
  13. control *rpc.Client
  14. plugins map[string]Plugin
  15. // These are the streams used for the various stdout/err overrides
  16. stdout, stderr net.Conn
  17. }
  18. // newRPCClient creates a new RPCClient. The Client argument is expected
  19. // to be successfully started already with a lock held.
  20. func newRPCClient(c *Client) (*RPCClient, error) {
  21. // Connect to the client
  22. conn, err := net.Dial(c.address.Network(), c.address.String())
  23. if err != nil {
  24. return nil, err
  25. }
  26. if tcpConn, ok := conn.(*net.TCPConn); ok {
  27. // Make sure to set keep alive so that the connection doesn't die
  28. tcpConn.SetKeepAlive(true)
  29. }
  30. if c.config.TLSConfig != nil {
  31. conn = tls.Client(conn, c.config.TLSConfig)
  32. }
  33. // Create the actual RPC client
  34. result, err := NewRPCClient(conn, c.config.Plugins)
  35. if err != nil {
  36. conn.Close()
  37. return nil, err
  38. }
  39. // Begin the stream syncing so that stdin, out, err work properly
  40. err = result.SyncStreams(
  41. c.config.SyncStdout,
  42. c.config.SyncStderr)
  43. if err != nil {
  44. result.Close()
  45. return nil, err
  46. }
  47. return result, nil
  48. }
  49. // NewRPCClient creates a client from an already-open connection-like value.
  50. // Dial is typically used instead.
  51. func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
  52. // Create the yamux client so we can multiplex
  53. mux, err := yamux.Client(conn, nil)
  54. if err != nil {
  55. conn.Close()
  56. return nil, err
  57. }
  58. // Connect to the control stream.
  59. control, err := mux.Open()
  60. if err != nil {
  61. mux.Close()
  62. return nil, err
  63. }
  64. // Connect stdout, stderr streams
  65. stdstream := make([]net.Conn, 2)
  66. for i, _ := range stdstream {
  67. stdstream[i], err = mux.Open()
  68. if err != nil {
  69. mux.Close()
  70. return nil, err
  71. }
  72. }
  73. // Create the broker and start it up
  74. broker := newMuxBroker(mux)
  75. go broker.Run()
  76. // Build the client using our broker and control channel.
  77. return &RPCClient{
  78. broker: broker,
  79. control: rpc.NewClient(control),
  80. plugins: plugins,
  81. stdout: stdstream[0],
  82. stderr: stdstream[1],
  83. }, nil
  84. }
  85. // SyncStreams should be called to enable syncing of stdout,
  86. // stderr with the plugin.
  87. //
  88. // This will return immediately and the syncing will continue to happen
  89. // in the background. You do not need to launch this in a goroutine itself.
  90. //
  91. // This should never be called multiple times.
  92. func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
  93. go copyStream("stdout", stdout, c.stdout)
  94. go copyStream("stderr", stderr, c.stderr)
  95. return nil
  96. }
  97. // Close closes the connection. The client is no longer usable after this
  98. // is called.
  99. func (c *RPCClient) Close() error {
  100. // Call the control channel and ask it to gracefully exit. If this
  101. // errors, then we save it so that we always return an error but we
  102. // want to try to close the other channels anyways.
  103. var empty struct{}
  104. returnErr := c.control.Call("Control.Quit", true, &empty)
  105. // Close the other streams we have
  106. if err := c.control.Close(); err != nil {
  107. return err
  108. }
  109. if err := c.stdout.Close(); err != nil {
  110. return err
  111. }
  112. if err := c.stderr.Close(); err != nil {
  113. return err
  114. }
  115. if err := c.broker.Close(); err != nil {
  116. return err
  117. }
  118. // Return back the error we got from Control.Quit. This is very important
  119. // since we MUST return non-nil error if this fails so that Client.Kill
  120. // will properly try a process.Kill.
  121. return returnErr
  122. }
  123. func (c *RPCClient) Dispense(name string) (interface{}, error) {
  124. p, ok := c.plugins[name]
  125. if !ok {
  126. return nil, fmt.Errorf("unknown plugin type: %s", name)
  127. }
  128. var id uint32
  129. if err := c.control.Call(
  130. "Dispenser.Dispense", name, &id); err != nil {
  131. return nil, err
  132. }
  133. conn, err := c.broker.Dial(id)
  134. if err != nil {
  135. return nil, err
  136. }
  137. return p.Client(c.broker, rpc.NewClient(conn))
  138. }
  139. // Ping pings the connection to ensure it is still alive.
  140. //
  141. // The error from the RPC call is returned exactly if you want to inspect
  142. // it for further error analysis. Any error returned from here would indicate
  143. // that the connection to the plugin is not healthy.
  144. func (c *RPCClient) Ping() error {
  145. var empty struct{}
  146. return c.control.Call("Control.Ping", true, &empty)
  147. }