grpc_broker.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. package plugin
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/oklog/run"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/credentials"
  15. )
  // streamer is the transport the broker uses to send and receive connection
  // information. Within this file it is implemented by gRPCBrokerServer and
  // gRPCBrokerClientImpl.
  type streamer interface {
  	// Send passes connection information to the peer and reports any error.
  	Send(*ConnInfo) error
  	// Recv returns the next piece of connection information from the peer.
  	Recv() (*ConnInfo, error)
  	// Close shuts the streamer down.
  	Close()
  }
  // sendErr is used to pass errors back during a send. It pairs the outgoing
  // connection information with the channel on which the result of writing it
  // to the stream is reported.
  type sendErr struct {
  	// i is the connection information to write to the stream.
  	i *ConnInfo
  	// ch receives the error (possibly nil) returned by the stream's Send.
  	ch chan error
  }
  // gRPCBrokerServer is used by the plugin to start a stream and to send
  // connection information to/from the plugin. Implements GRPCBrokerServer and
  // streamer interfaces.
  type gRPCBrokerServer struct {
  	// send is used to send connection info to the gRPC stream. Send queues
  	// requests here and StartStream writes them to the stream.
  	send chan *sendErr

  	// recv is used to receive connection info from the gRPC stream.
  	// StartStream publishes incoming messages here and Recv consumes them.
  	recv chan *ConnInfo

  	// quit closes down the stream. Once it is closed, Send and Recv return a
  	// "broker closed" error and the StartStream loops return when they
  	// observe it.
  	quit chan struct{}

  	// o is used to ensure we close the quit channel only once.
  	o sync.Once
  }
  41. func newGRPCBrokerServer() *gRPCBrokerServer {
  42. return &gRPCBrokerServer{
  43. send: make(chan *sendErr),
  44. recv: make(chan *ConnInfo),
  45. quit: make(chan struct{}),
  46. }
  47. }
  48. // StartStream implements the GRPCBrokerServer interface and will block until
  49. // the quit channel is closed or the context reports Done. The stream will pass
  50. // connection information to/from the client.
  51. func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
  52. doneCh := stream.Context().Done()
  53. defer s.Close()
  54. // Proccess send stream
  55. go func() {
  56. for {
  57. select {
  58. case <-doneCh:
  59. return
  60. case <-s.quit:
  61. return
  62. case se := <-s.send:
  63. err := stream.Send(se.i)
  64. se.ch <- err
  65. }
  66. }
  67. }()
  68. // Process receive stream
  69. for {
  70. i, err := stream.Recv()
  71. if err != nil {
  72. return err
  73. }
  74. select {
  75. case <-doneCh:
  76. return nil
  77. case <-s.quit:
  78. return nil
  79. case s.recv <- i:
  80. }
  81. }
  82. return nil
  83. }
  84. // Send is used by the GRPCBroker to pass connection information into the stream
  85. // to the client.
  86. func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
  87. ch := make(chan error)
  88. defer close(ch)
  89. select {
  90. case <-s.quit:
  91. return errors.New("broker closed")
  92. case s.send <- &sendErr{
  93. i: i,
  94. ch: ch,
  95. }:
  96. }
  97. return <-ch
  98. }
  99. // Recv is used by the GRPCBroker to pass connection information that has been
  100. // sent from the client from the stream to the broker.
  101. func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
  102. select {
  103. case <-s.quit:
  104. return nil, errors.New("broker closed")
  105. case i := <-s.recv:
  106. return i, nil
  107. }
  108. }
  109. // Close closes the quit channel, shutting down the stream.
  110. func (s *gRPCBrokerServer) Close() {
  111. s.o.Do(func() {
  112. close(s.quit)
  113. })
  114. }
  // gRPCBrokerClientImpl is used by the client to start a stream and to send
  // connection information to/from the client. Implements GRPCBrokerClient and
  // streamer interfaces.
  type gRPCBrokerClientImpl struct {
  	// client is the underlying GRPC client used to make calls to the server.
  	client GRPCBrokerClient

  	// send is used to send connection info to the gRPC stream. Send queues
  	// requests here and StartStream writes them to the stream.
  	send chan *sendErr

  	// recv is used to receive connection info from the gRPC stream.
  	// StartStream publishes incoming messages here and Recv consumes them.
  	recv chan *ConnInfo

  	// quit closes down the stream. Once it is closed, Send and Recv return a
  	// "broker closed" error and the StartStream loops return when they
  	// observe it.
  	quit chan struct{}

  	// o is used to ensure we close the quit channel only once.
  	o sync.Once
  }
  130. func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
  131. return &gRPCBrokerClientImpl{
  132. client: NewGRPCBrokerClient(conn),
  133. send: make(chan *sendErr),
  134. recv: make(chan *ConnInfo),
  135. quit: make(chan struct{}),
  136. }
  137. }
  138. // StartStream implements the GRPCBrokerClient interface and will block until
  139. // the quit channel is closed or the context reports Done. The stream will pass
  140. // connection information to/from the plugin.
  141. func (s *gRPCBrokerClientImpl) StartStream() error {
  142. ctx, cancelFunc := context.WithCancel(context.Background())
  143. defer cancelFunc()
  144. defer s.Close()
  145. stream, err := s.client.StartStream(ctx)
  146. if err != nil {
  147. return err
  148. }
  149. doneCh := stream.Context().Done()
  150. go func() {
  151. for {
  152. select {
  153. case <-doneCh:
  154. return
  155. case <-s.quit:
  156. return
  157. case se := <-s.send:
  158. err := stream.Send(se.i)
  159. se.ch <- err
  160. }
  161. }
  162. }()
  163. for {
  164. i, err := stream.Recv()
  165. if err != nil {
  166. return err
  167. }
  168. select {
  169. case <-doneCh:
  170. return nil
  171. case <-s.quit:
  172. return nil
  173. case s.recv <- i:
  174. }
  175. }
  176. return nil
  177. }
  178. // Send is used by the GRPCBroker to pass connection information into the stream
  179. // to the plugin.
  180. func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
  181. ch := make(chan error)
  182. defer close(ch)
  183. select {
  184. case <-s.quit:
  185. return errors.New("broker closed")
  186. case s.send <- &sendErr{
  187. i: i,
  188. ch: ch,
  189. }:
  190. }
  191. return <-ch
  192. }
  193. // Recv is used by the GRPCBroker to pass connection information that has been
  194. // sent from the plugin to the broker.
  195. func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
  196. select {
  197. case <-s.quit:
  198. return nil, errors.New("broker closed")
  199. case i := <-s.recv:
  200. return i, nil
  201. }
  202. }
  203. // Close closes the quit channel, shutting down the stream.
  204. func (s *gRPCBrokerClientImpl) Close() {
  205. s.o.Do(func() {
  206. close(s.quit)
  207. })
  208. }
  // GRPCBroker is responsible for brokering connections by unique ID.
  //
  // It is used by plugins to create multiple gRPC connections and data
  // streams between the plugin process and the host process.
  //
  // This allows a plugin to request a channel with a specific ID to connect to
  // or accept a connection from, and the broker handles the details of
  // holding these channels open while they're being negotiated.
  //
  // The Plugin interface has access to these for both Server and Client.
  // The broker can be used by either (optionally) to reserve and connect to
  // new streams. This is useful for complex args and return values,
  // or anything else you might need a data stream for.
  type GRPCBroker struct {
  	// nextId is the counter behind NextId; it is incremented atomically.
  	nextId uint32

  	// streamer carries connection information to and from the peer.
  	streamer streamer

  	// streams holds the pending connection info per service ID. It is
  	// accessed with the embedded Mutex held.
  	streams map[uint32]*gRPCBrokerPending

  	// tls, when non-nil, supplies the credentials for servers started by
  	// AcceptAndServe and is passed to the dialer in Dial.
  	tls *tls.Config

  	// doneCh is closed by Close to stop servers started by AcceptAndServe.
  	doneCh chan struct{}

  	// o ensures doneCh is closed only once.
  	o sync.Once

  	sync.Mutex
  }
  // gRPCBrokerPending tracks the connection information for one service ID
  // while it waits to be picked up by Dial.
  type gRPCBrokerPending struct {
  	// ch carries the ConnInfo from Run to Dial.
  	ch chan *ConnInfo
  	// doneCh is closed by Dial once it has received the ConnInfo, which
  	// releases the matching timeoutWait before its timeout elapses.
  	doneCh chan struct{}
  }
  235. func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
  236. return &GRPCBroker{
  237. streamer: s,
  238. streams: make(map[uint32]*gRPCBrokerPending),
  239. tls: tls,
  240. doneCh: make(chan struct{}),
  241. }
  242. }
  243. // Accept accepts a connection by ID.
  244. //
  245. // This should not be called multiple times with the same ID at one time.
  246. func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
  247. listener, err := serverListener()
  248. if err != nil {
  249. return nil, err
  250. }
  251. err = b.streamer.Send(&ConnInfo{
  252. ServiceId: id,
  253. Network: listener.Addr().Network(),
  254. Address: listener.Addr().String(),
  255. })
  256. if err != nil {
  257. return nil, err
  258. }
  259. return listener, nil
  260. }
  261. // AcceptAndServe is used to accept a specific stream ID and immediately
  262. // serve a gRPC server on that stream ID. This is used to easily serve
  263. // complex arguments. Each AcceptAndServe call opens a new listener socket and
  264. // sends the connection info down the stream to the dialer. Since a new
  265. // connection is opened every call, these calls should be used sparingly.
  266. // Multiple gRPC server implementations can be registered to a single
  267. // AcceptAndServe call.
  268. func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
  269. listener, err := b.Accept(id)
  270. if err != nil {
  271. log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
  272. return
  273. }
  274. defer listener.Close()
  275. var opts []grpc.ServerOption
  276. if b.tls != nil {
  277. opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
  278. }
  279. server := s(opts)
  280. // Here we use a run group to close this goroutine if the server is shutdown
  281. // or the broker is shutdown.
  282. var g run.Group
  283. {
  284. // Serve on the listener, if shutting down call GracefulStop.
  285. g.Add(func() error {
  286. return server.Serve(listener)
  287. }, func(err error) {
  288. server.GracefulStop()
  289. })
  290. }
  291. {
  292. // block on the closeCh or the doneCh. If we are shutting down close the
  293. // closeCh.
  294. closeCh := make(chan struct{})
  295. g.Add(func() error {
  296. select {
  297. case <-b.doneCh:
  298. case <-closeCh:
  299. }
  300. return nil
  301. }, func(err error) {
  302. close(closeCh)
  303. })
  304. }
  305. // Block until we are done
  306. g.Run()
  307. }
  308. // Close closes the stream and all servers.
  309. func (b *GRPCBroker) Close() error {
  310. b.streamer.Close()
  311. b.o.Do(func() {
  312. close(b.doneCh)
  313. })
  314. return nil
  315. }
  316. // Dial opens a connection by ID.
  317. func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
  318. var c *ConnInfo
  319. // Open the stream
  320. p := b.getStream(id)
  321. select {
  322. case c = <-p.ch:
  323. close(p.doneCh)
  324. case <-time.After(5 * time.Second):
  325. return nil, fmt.Errorf("timeout waiting for connection info")
  326. }
  327. var addr net.Addr
  328. switch c.Network {
  329. case "tcp":
  330. addr, err = net.ResolveTCPAddr("tcp", c.Address)
  331. case "unix":
  332. addr, err = net.ResolveUnixAddr("unix", c.Address)
  333. default:
  334. err = fmt.Errorf("Unknown address type: %s", c.Address)
  335. }
  336. if err != nil {
  337. return nil, err
  338. }
  339. return dialGRPCConn(b.tls, netAddrDialer(addr))
  340. }
  341. // NextId returns a unique ID to use next.
  342. //
  343. // It is possible for very long-running plugin hosts to wrap this value,
  344. // though it would require a very large amount of calls. In practice
  345. // we've never seen it happen.
  346. func (m *GRPCBroker) NextId() uint32 {
  347. return atomic.AddUint32(&m.nextId, 1)
  348. }
  349. // Run starts the brokering and should be executed in a goroutine, since it
  350. // blocks forever, or until the session closes.
  351. //
  352. // Uses of GRPCBroker never need to call this. It is called internally by
  353. // the plugin host/client.
  354. func (m *GRPCBroker) Run() {
  355. for {
  356. stream, err := m.streamer.Recv()
  357. if err != nil {
  358. // Once we receive an error, just exit
  359. break
  360. }
  361. // Initialize the waiter
  362. p := m.getStream(stream.ServiceId)
  363. select {
  364. case p.ch <- stream:
  365. default:
  366. }
  367. go m.timeoutWait(stream.ServiceId, p)
  368. }
  369. }
  370. func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
  371. m.Lock()
  372. defer m.Unlock()
  373. p, ok := m.streams[id]
  374. if ok {
  375. return p
  376. }
  377. m.streams[id] = &gRPCBrokerPending{
  378. ch: make(chan *ConnInfo, 1),
  379. doneCh: make(chan struct{}),
  380. }
  381. return m.streams[id]
  382. }
  383. func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
  384. // Wait for the stream to either be picked up and connected, or
  385. // for a timeout.
  386. select {
  387. case <-p.doneCh:
  388. case <-time.After(5 * time.Second):
  389. }
  390. m.Lock()
  391. defer m.Unlock()
  392. // Delete the stream so no one else can grab it
  393. delete(m.streams, id)
  394. }