grpc_broker.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package plugin
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/hashicorp/go-plugin/internal/plugin"
  13. "github.com/oklog/run"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/credentials"
  16. )
  17. // streamer interface is used in the broker to send/receive connection
  18. // information.
  19. type streamer interface {
  20. Send(*plugin.ConnInfo) error
  21. Recv() (*plugin.ConnInfo, error)
  22. Close()
  23. }
  24. // sendErr is used to pass errors back during a send.
  25. type sendErr struct {
  26. i *plugin.ConnInfo
  27. ch chan error
  28. }
  29. // gRPCBrokerServer is used by the plugin to start a stream and to send
  30. // connection information to/from the plugin. Implements GRPCBrokerServer and
  31. // streamer interfaces.
  32. type gRPCBrokerServer struct {
  33. // send is used to send connection info to the gRPC stream.
  34. send chan *sendErr
  35. // recv is used to receive connection info from the gRPC stream.
  36. recv chan *plugin.ConnInfo
  37. // quit closes down the stream.
  38. quit chan struct{}
  39. // o is used to ensure we close the quit channel only once.
  40. o sync.Once
  41. }
  42. func newGRPCBrokerServer() *gRPCBrokerServer {
  43. return &gRPCBrokerServer{
  44. send: make(chan *sendErr),
  45. recv: make(chan *plugin.ConnInfo),
  46. quit: make(chan struct{}),
  47. }
  48. }
  49. // StartStream implements the GRPCBrokerServer interface and will block until
  50. // the quit channel is closed or the context reports Done. The stream will pass
  51. // connection information to/from the client.
  52. func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
  53. doneCh := stream.Context().Done()
  54. defer s.Close()
  55. // Proccess send stream
  56. go func() {
  57. for {
  58. select {
  59. case <-doneCh:
  60. return
  61. case <-s.quit:
  62. return
  63. case se := <-s.send:
  64. err := stream.Send(se.i)
  65. se.ch <- err
  66. }
  67. }
  68. }()
  69. // Process receive stream
  70. for {
  71. i, err := stream.Recv()
  72. if err != nil {
  73. return err
  74. }
  75. select {
  76. case <-doneCh:
  77. return nil
  78. case <-s.quit:
  79. return nil
  80. case s.recv <- i:
  81. }
  82. }
  83. return nil
  84. }
  85. // Send is used by the GRPCBroker to pass connection information into the stream
  86. // to the client.
  87. func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
  88. ch := make(chan error)
  89. defer close(ch)
  90. select {
  91. case <-s.quit:
  92. return errors.New("broker closed")
  93. case s.send <- &sendErr{
  94. i: i,
  95. ch: ch,
  96. }:
  97. }
  98. return <-ch
  99. }
  100. // Recv is used by the GRPCBroker to pass connection information that has been
  101. // sent from the client from the stream to the broker.
  102. func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
  103. select {
  104. case <-s.quit:
  105. return nil, errors.New("broker closed")
  106. case i := <-s.recv:
  107. return i, nil
  108. }
  109. }
  110. // Close closes the quit channel, shutting down the stream.
  111. func (s *gRPCBrokerServer) Close() {
  112. s.o.Do(func() {
  113. close(s.quit)
  114. })
  115. }
  116. // gRPCBrokerClientImpl is used by the client to start a stream and to send
  117. // connection information to/from the client. Implements GRPCBrokerClient and
  118. // streamer interfaces.
  119. type gRPCBrokerClientImpl struct {
  120. // client is the underlying GRPC client used to make calls to the server.
  121. client plugin.GRPCBrokerClient
  122. // send is used to send connection info to the gRPC stream.
  123. send chan *sendErr
  124. // recv is used to receive connection info from the gRPC stream.
  125. recv chan *plugin.ConnInfo
  126. // quit closes down the stream.
  127. quit chan struct{}
  128. // o is used to ensure we close the quit channel only once.
  129. o sync.Once
  130. }
  131. func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
  132. return &gRPCBrokerClientImpl{
  133. client: plugin.NewGRPCBrokerClient(conn),
  134. send: make(chan *sendErr),
  135. recv: make(chan *plugin.ConnInfo),
  136. quit: make(chan struct{}),
  137. }
  138. }
  139. // StartStream implements the GRPCBrokerClient interface and will block until
  140. // the quit channel is closed or the context reports Done. The stream will pass
  141. // connection information to/from the plugin.
  142. func (s *gRPCBrokerClientImpl) StartStream() error {
  143. ctx, cancelFunc := context.WithCancel(context.Background())
  144. defer cancelFunc()
  145. defer s.Close()
  146. stream, err := s.client.StartStream(ctx)
  147. if err != nil {
  148. return err
  149. }
  150. doneCh := stream.Context().Done()
  151. go func() {
  152. for {
  153. select {
  154. case <-doneCh:
  155. return
  156. case <-s.quit:
  157. return
  158. case se := <-s.send:
  159. err := stream.Send(se.i)
  160. se.ch <- err
  161. }
  162. }
  163. }()
  164. for {
  165. i, err := stream.Recv()
  166. if err != nil {
  167. return err
  168. }
  169. select {
  170. case <-doneCh:
  171. return nil
  172. case <-s.quit:
  173. return nil
  174. case s.recv <- i:
  175. }
  176. }
  177. return nil
  178. }
  179. // Send is used by the GRPCBroker to pass connection information into the stream
  180. // to the plugin.
  181. func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
  182. ch := make(chan error)
  183. defer close(ch)
  184. select {
  185. case <-s.quit:
  186. return errors.New("broker closed")
  187. case s.send <- &sendErr{
  188. i: i,
  189. ch: ch,
  190. }:
  191. }
  192. return <-ch
  193. }
  194. // Recv is used by the GRPCBroker to pass connection information that has been
  195. // sent from the plugin to the broker.
  196. func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
  197. select {
  198. case <-s.quit:
  199. return nil, errors.New("broker closed")
  200. case i := <-s.recv:
  201. return i, nil
  202. }
  203. }
  204. // Close closes the quit channel, shutting down the stream.
  205. func (s *gRPCBrokerClientImpl) Close() {
  206. s.o.Do(func() {
  207. close(s.quit)
  208. })
  209. }
  210. // GRPCBroker is responsible for brokering connections by unique ID.
  211. //
  212. // It is used by plugins to create multiple gRPC connections and data
  213. // streams between the plugin process and the host process.
  214. //
  215. // This allows a plugin to request a channel with a specific ID to connect to
  216. // or accept a connection from, and the broker handles the details of
  217. // holding these channels open while they're being negotiated.
  218. //
  219. // The Plugin interface has access to these for both Server and Client.
  220. // The broker can be used by either (optionally) to reserve and connect to
  221. // new streams. This is useful for complex args and return values,
  222. // or anything else you might need a data stream for.
  223. type GRPCBroker struct {
  224. nextId uint32
  225. streamer streamer
  226. streams map[uint32]*gRPCBrokerPending
  227. tls *tls.Config
  228. doneCh chan struct{}
  229. o sync.Once
  230. sync.Mutex
  231. }
  232. type gRPCBrokerPending struct {
  233. ch chan *plugin.ConnInfo
  234. doneCh chan struct{}
  235. }
  236. func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
  237. return &GRPCBroker{
  238. streamer: s,
  239. streams: make(map[uint32]*gRPCBrokerPending),
  240. tls: tls,
  241. doneCh: make(chan struct{}),
  242. }
  243. }
  244. // Accept accepts a connection by ID.
  245. //
  246. // This should not be called multiple times with the same ID at one time.
  247. func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
  248. listener, err := serverListener()
  249. if err != nil {
  250. return nil, err
  251. }
  252. err = b.streamer.Send(&plugin.ConnInfo{
  253. ServiceId: id,
  254. Network: listener.Addr().Network(),
  255. Address: listener.Addr().String(),
  256. })
  257. if err != nil {
  258. return nil, err
  259. }
  260. return listener, nil
  261. }
  262. // AcceptAndServe is used to accept a specific stream ID and immediately
  263. // serve a gRPC server on that stream ID. This is used to easily serve
  264. // complex arguments. Each AcceptAndServe call opens a new listener socket and
  265. // sends the connection info down the stream to the dialer. Since a new
  266. // connection is opened every call, these calls should be used sparingly.
  267. // Multiple gRPC server implementations can be registered to a single
  268. // AcceptAndServe call.
  269. func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
  270. listener, err := b.Accept(id)
  271. if err != nil {
  272. log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
  273. return
  274. }
  275. defer listener.Close()
  276. var opts []grpc.ServerOption
  277. if b.tls != nil {
  278. opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
  279. }
  280. server := s(opts)
  281. // Here we use a run group to close this goroutine if the server is shutdown
  282. // or the broker is shutdown.
  283. var g run.Group
  284. {
  285. // Serve on the listener, if shutting down call GracefulStop.
  286. g.Add(func() error {
  287. return server.Serve(listener)
  288. }, func(err error) {
  289. server.GracefulStop()
  290. })
  291. }
  292. {
  293. // block on the closeCh or the doneCh. If we are shutting down close the
  294. // closeCh.
  295. closeCh := make(chan struct{})
  296. g.Add(func() error {
  297. select {
  298. case <-b.doneCh:
  299. case <-closeCh:
  300. }
  301. return nil
  302. }, func(err error) {
  303. close(closeCh)
  304. })
  305. }
  306. // Block until we are done
  307. g.Run()
  308. }
  309. // Close closes the stream and all servers.
  310. func (b *GRPCBroker) Close() error {
  311. b.streamer.Close()
  312. b.o.Do(func() {
  313. close(b.doneCh)
  314. })
  315. return nil
  316. }
  317. // Dial opens a connection by ID.
  318. func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
  319. var c *plugin.ConnInfo
  320. // Open the stream
  321. p := b.getStream(id)
  322. select {
  323. case c = <-p.ch:
  324. close(p.doneCh)
  325. case <-time.After(5 * time.Second):
  326. return nil, fmt.Errorf("timeout waiting for connection info")
  327. }
  328. var addr net.Addr
  329. switch c.Network {
  330. case "tcp":
  331. addr, err = net.ResolveTCPAddr("tcp", c.Address)
  332. case "unix":
  333. addr, err = net.ResolveUnixAddr("unix", c.Address)
  334. default:
  335. err = fmt.Errorf("Unknown address type: %s", c.Address)
  336. }
  337. if err != nil {
  338. return nil, err
  339. }
  340. return dialGRPCConn(b.tls, netAddrDialer(addr))
  341. }
  342. // NextId returns a unique ID to use next.
  343. //
  344. // It is possible for very long-running plugin hosts to wrap this value,
  345. // though it would require a very large amount of calls. In practice
  346. // we've never seen it happen.
  347. func (m *GRPCBroker) NextId() uint32 {
  348. return atomic.AddUint32(&m.nextId, 1)
  349. }
  350. // Run starts the brokering and should be executed in a goroutine, since it
  351. // blocks forever, or until the session closes.
  352. //
  353. // Uses of GRPCBroker never need to call this. It is called internally by
  354. // the plugin host/client.
  355. func (m *GRPCBroker) Run() {
  356. for {
  357. stream, err := m.streamer.Recv()
  358. if err != nil {
  359. // Once we receive an error, just exit
  360. break
  361. }
  362. // Initialize the waiter
  363. p := m.getStream(stream.ServiceId)
  364. select {
  365. case p.ch <- stream:
  366. default:
  367. }
  368. go m.timeoutWait(stream.ServiceId, p)
  369. }
  370. }
  371. func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
  372. m.Lock()
  373. defer m.Unlock()
  374. p, ok := m.streams[id]
  375. if ok {
  376. return p
  377. }
  378. m.streams[id] = &gRPCBrokerPending{
  379. ch: make(chan *plugin.ConnInfo, 1),
  380. doneCh: make(chan struct{}),
  381. }
  382. return m.streams[id]
  383. }
  384. func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
  385. // Wait for the stream to either be picked up and connected, or
  386. // for a timeout.
  387. select {
  388. case <-p.doneCh:
  389. case <-time.After(5 * time.Second):
  390. }
  391. m.Lock()
  392. defer m.Unlock()
  393. // Delete the stream so no one else can grab it
  394. delete(m.streams, id)
  395. }