rpc_server.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package plugin
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "net/rpc"
  9. "sync"
  10. "github.com/hashicorp/yamux"
  11. )
  12. // RPCServer listens for network connections and then dispenses interface
  13. // implementations over net/rpc.
  14. //
  15. // After setting the fields below, they shouldn't be read again directly
  16. // from the structure which may be reading/writing them concurrently.
  17. type RPCServer struct {
  18. Plugins map[string]Plugin
  19. // Stdout, Stderr are what this server will use instead of the
  20. // normal stdin/out/err. This is because due to the multi-process nature
  21. // of our plugin system, we can't use the normal process values so we
  22. // make our own custom one we pipe across.
  23. Stdout io.Reader
  24. Stderr io.Reader
  25. // DoneCh should be set to a non-nil channel that will be closed
  26. // when the control requests the RPC server to end.
  27. DoneCh chan<- struct{}
  28. lock sync.Mutex
  29. }
  30. // ServerProtocol impl.
  31. func (s *RPCServer) Init() error { return nil }
  32. // ServerProtocol impl.
  33. func (s *RPCServer) Config() string { return "" }
  34. // ServerProtocol impl.
  35. func (s *RPCServer) Serve(lis net.Listener) {
  36. for {
  37. conn, err := lis.Accept()
  38. if err != nil {
  39. log.Printf("[ERR] plugin: plugin server: %s", err)
  40. return
  41. }
  42. go s.ServeConn(conn)
  43. }
  44. }
  45. // ServeConn runs a single connection.
  46. //
  47. // ServeConn blocks, serving the connection until the client hangs up.
  48. func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
  49. // First create the yamux server to wrap this connection
  50. mux, err := yamux.Server(conn, nil)
  51. if err != nil {
  52. conn.Close()
  53. log.Printf("[ERR] plugin: error creating yamux server: %s", err)
  54. return
  55. }
  56. // Accept the control connection
  57. control, err := mux.Accept()
  58. if err != nil {
  59. mux.Close()
  60. if err != io.EOF {
  61. log.Printf("[ERR] plugin: error accepting control connection: %s", err)
  62. }
  63. return
  64. }
  65. // Connect the stdstreams (in, out, err)
  66. stdstream := make([]net.Conn, 2)
  67. for i, _ := range stdstream {
  68. stdstream[i], err = mux.Accept()
  69. if err != nil {
  70. mux.Close()
  71. log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
  72. return
  73. }
  74. }
  75. // Copy std streams out to the proper place
  76. go copyStream("stdout", stdstream[0], s.Stdout)
  77. go copyStream("stderr", stdstream[1], s.Stderr)
  78. // Create the broker and start it up
  79. broker := newMuxBroker(mux)
  80. go broker.Run()
  81. // Use the control connection to build the dispenser and serve the
  82. // connection.
  83. server := rpc.NewServer()
  84. server.RegisterName("Control", &controlServer{
  85. server: s,
  86. })
  87. server.RegisterName("Dispenser", &dispenseServer{
  88. broker: broker,
  89. plugins: s.Plugins,
  90. })
  91. server.ServeConn(control)
  92. }
  93. // done is called internally by the control server to trigger the
  94. // doneCh to close which is listened to by the main process to cleanly
  95. // exit.
  96. func (s *RPCServer) done() {
  97. s.lock.Lock()
  98. defer s.lock.Unlock()
  99. if s.DoneCh != nil {
  100. close(s.DoneCh)
  101. s.DoneCh = nil
  102. }
  103. }
  104. // dispenseServer dispenses variousinterface implementations for Terraform.
  105. type controlServer struct {
  106. server *RPCServer
  107. }
  108. // Ping can be called to verify the connection (and likely the binary)
  109. // is still alive to a plugin.
  110. func (c *controlServer) Ping(
  111. null bool, response *struct{}) error {
  112. *response = struct{}{}
  113. return nil
  114. }
  115. func (c *controlServer) Quit(
  116. null bool, response *struct{}) error {
  117. // End the server
  118. c.server.done()
  119. // Always return true
  120. *response = struct{}{}
  121. return nil
  122. }
  123. // dispenseServer dispenses variousinterface implementations for Terraform.
  124. type dispenseServer struct {
  125. broker *MuxBroker
  126. plugins map[string]Plugin
  127. }
  128. func (d *dispenseServer) Dispense(
  129. name string, response *uint32) error {
  130. // Find the function to create this implementation
  131. p, ok := d.plugins[name]
  132. if !ok {
  133. return fmt.Errorf("unknown plugin type: %s", name)
  134. }
  135. // Create the implementation first so we know if there is an error.
  136. impl, err := p.Server(d.broker)
  137. if err != nil {
  138. // We turn the error into an errors error so that it works across RPC
  139. return errors.New(err.Error())
  140. }
  141. // Reserve an ID for our implementation
  142. id := d.broker.NextId()
  143. *response = id
  144. // Run the rest in a goroutine since it can only happen once this RPC
  145. // call returns. We wait for a connection for the plugin implementation
  146. // and serve it.
  147. go func() {
  148. conn, err := d.broker.Accept(id)
  149. if err != nil {
  150. log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
  151. return
  152. }
  153. serve(conn, "Plugin", impl)
  154. }()
  155. return nil
  156. }
  // serve registers v under name on a fresh net/rpc server and serves conn
  // with it, blocking until the connection is done.
  //
  // If v cannot be registered (for example, it has no suitable exported RPC
  // methods), the error is logged and conn is closed so the peer is not left
  // waiting on a stream that will never be served.
  func serve(conn io.ReadWriteCloser, name string, v interface{}) {
  	server := rpc.NewServer()
  	if err := server.RegisterName(name, v); err != nil {
  		// ServeConn closes conn on the success path; do the same here so
  		// the connection is not leaked when registration fails.
  		conn.Close()
  		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
  		return
  	}

  	server.ServeConn(conn)
  }