mux_broker.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package plugin
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "log"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/hashicorp/yamux"
  11. )
  12. // MuxBroker is responsible for brokering multiplexed connections by unique ID.
  13. //
  14. // It is used by plugins to multiplex multiple RPC connections and data
  15. // streams on top of a single connection between the plugin process and the
  16. // host process.
  17. //
  18. // This allows a plugin to request a channel with a specific ID to connect to
  19. // or accept a connection from, and the broker handles the details of
  20. // holding these channels open while they're being negotiated.
  21. //
  22. // The Plugin interface has access to these for both Server and Client.
  23. // The broker can be used by either (optionally) to reserve and connect to
  24. // new multiplexed streams. This is useful for complex args and return values,
  25. // or anything else you might need a data stream for.
  26. type MuxBroker struct {
  27. nextId uint32
  28. session *yamux.Session
  29. streams map[uint32]*muxBrokerPending
  30. sync.Mutex
  31. }
  32. type muxBrokerPending struct {
  33. ch chan net.Conn
  34. doneCh chan struct{}
  35. }
  36. func newMuxBroker(s *yamux.Session) *MuxBroker {
  37. return &MuxBroker{
  38. session: s,
  39. streams: make(map[uint32]*muxBrokerPending),
  40. }
  41. }
  42. // Accept accepts a connection by ID.
  43. //
  44. // This should not be called multiple times with the same ID at one time.
  45. func (m *MuxBroker) Accept(id uint32) (net.Conn, error) {
  46. var c net.Conn
  47. p := m.getStream(id)
  48. select {
  49. case c = <-p.ch:
  50. close(p.doneCh)
  51. case <-time.After(5 * time.Second):
  52. m.Lock()
  53. defer m.Unlock()
  54. delete(m.streams, id)
  55. return nil, fmt.Errorf("timeout waiting for accept")
  56. }
  57. // Ack our connection
  58. if err := binary.Write(c, binary.LittleEndian, id); err != nil {
  59. c.Close()
  60. return nil, err
  61. }
  62. return c, nil
  63. }
  64. // AcceptAndServe is used to accept a specific stream ID and immediately
  65. // serve an RPC server on that stream ID. This is used to easily serve
  66. // complex arguments.
  67. //
  68. // The served interface is always registered to the "Plugin" name.
  69. func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) {
  70. conn, err := m.Accept(id)
  71. if err != nil {
  72. log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
  73. return
  74. }
  75. serve(conn, "Plugin", v)
  76. }
  77. // Close closes the connection and all sub-connections.
  78. func (m *MuxBroker) Close() error {
  79. return m.session.Close()
  80. }
  81. // Dial opens a connection by ID.
  82. func (m *MuxBroker) Dial(id uint32) (net.Conn, error) {
  83. // Open the stream
  84. stream, err := m.session.OpenStream()
  85. if err != nil {
  86. return nil, err
  87. }
  88. // Write the stream ID onto the wire.
  89. if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
  90. stream.Close()
  91. return nil, err
  92. }
  93. // Read the ack that we connected. Then we're off!
  94. var ack uint32
  95. if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
  96. stream.Close()
  97. return nil, err
  98. }
  99. if ack != id {
  100. stream.Close()
  101. return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
  102. }
  103. return stream, nil
  104. }
  105. // NextId returns a unique ID to use next.
  106. //
  107. // It is possible for very long-running plugin hosts to wrap this value,
  108. // though it would require a very large amount of RPC calls. In practice
  109. // we've never seen it happen.
  110. func (m *MuxBroker) NextId() uint32 {
  111. return atomic.AddUint32(&m.nextId, 1)
  112. }
  113. // Run starts the brokering and should be executed in a goroutine, since it
  114. // blocks forever, or until the session closes.
  115. //
  116. // Uses of MuxBroker never need to call this. It is called internally by
  117. // the plugin host/client.
  118. func (m *MuxBroker) Run() {
  119. for {
  120. stream, err := m.session.AcceptStream()
  121. if err != nil {
  122. // Once we receive an error, just exit
  123. break
  124. }
  125. // Read the stream ID from the stream
  126. var id uint32
  127. if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
  128. stream.Close()
  129. continue
  130. }
  131. // Initialize the waiter
  132. p := m.getStream(id)
  133. select {
  134. case p.ch <- stream:
  135. default:
  136. }
  137. // Wait for a timeout
  138. go m.timeoutWait(id, p)
  139. }
  140. }
  141. func (m *MuxBroker) getStream(id uint32) *muxBrokerPending {
  142. m.Lock()
  143. defer m.Unlock()
  144. p, ok := m.streams[id]
  145. if ok {
  146. return p
  147. }
  148. m.streams[id] = &muxBrokerPending{
  149. ch: make(chan net.Conn, 1),
  150. doneCh: make(chan struct{}),
  151. }
  152. return m.streams[id]
  153. }
  154. func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
  155. // Wait for the stream to either be picked up and connected, or
  156. // for a timeout.
  157. timeout := false
  158. select {
  159. case <-p.doneCh:
  160. case <-time.After(5 * time.Second):
  161. timeout = true
  162. }
  163. m.Lock()
  164. defer m.Unlock()
  165. // Delete the stream so no one else can grab it
  166. delete(m.streams, id)
  167. // If we timed out, then check if we have a channel in the buffer,
  168. // and if so, close it.
  169. if timeout {
  170. select {
  171. case s := <-p.ch:
  172. s.Close()
  173. }
  174. }
  175. }