| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- package plugin
- import (
- "context"
- "crypto/tls"
- "errors"
- "fmt"
- "log"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/hashicorp/go-plugin/internal/plugin"
- "github.com/oklog/run"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- )
- // streamer interface is used in the broker to send/receive connection
- // information.
- type streamer interface {
- Send(*plugin.ConnInfo) error
- Recv() (*plugin.ConnInfo, error)
- Close()
- }
- // sendErr is used to pass errors back during a send.
- type sendErr struct {
- i *plugin.ConnInfo
- ch chan error
- }
- // gRPCBrokerServer is used by the plugin to start a stream and to send
- // connection information to/from the plugin. Implements GRPCBrokerServer and
- // streamer interfaces.
- type gRPCBrokerServer struct {
- // send is used to send connection info to the gRPC stream.
- send chan *sendErr
- // recv is used to receive connection info from the gRPC stream.
- recv chan *plugin.ConnInfo
- // quit closes down the stream.
- quit chan struct{}
- // o is used to ensure we close the quit channel only once.
- o sync.Once
- }
- func newGRPCBrokerServer() *gRPCBrokerServer {
- return &gRPCBrokerServer{
- send: make(chan *sendErr),
- recv: make(chan *plugin.ConnInfo),
- quit: make(chan struct{}),
- }
- }
- // StartStream implements the GRPCBrokerServer interface and will block until
- // the quit channel is closed or the context reports Done. The stream will pass
- // connection information to/from the client.
- func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
- doneCh := stream.Context().Done()
- defer s.Close()
- // Proccess send stream
- go func() {
- for {
- select {
- case <-doneCh:
- return
- case <-s.quit:
- return
- case se := <-s.send:
- err := stream.Send(se.i)
- se.ch <- err
- }
- }
- }()
- // Process receive stream
- for {
- i, err := stream.Recv()
- if err != nil {
- return err
- }
- select {
- case <-doneCh:
- return nil
- case <-s.quit:
- return nil
- case s.recv <- i:
- }
- }
- return nil
- }
- // Send is used by the GRPCBroker to pass connection information into the stream
- // to the client.
- func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
- ch := make(chan error)
- defer close(ch)
- select {
- case <-s.quit:
- return errors.New("broker closed")
- case s.send <- &sendErr{
- i: i,
- ch: ch,
- }:
- }
- return <-ch
- }
- // Recv is used by the GRPCBroker to pass connection information that has been
- // sent from the client from the stream to the broker.
- func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
- select {
- case <-s.quit:
- return nil, errors.New("broker closed")
- case i := <-s.recv:
- return i, nil
- }
- }
- // Close closes the quit channel, shutting down the stream.
- func (s *gRPCBrokerServer) Close() {
- s.o.Do(func() {
- close(s.quit)
- })
- }
- // gRPCBrokerClientImpl is used by the client to start a stream and to send
- // connection information to/from the client. Implements GRPCBrokerClient and
- // streamer interfaces.
- type gRPCBrokerClientImpl struct {
- // client is the underlying GRPC client used to make calls to the server.
- client plugin.GRPCBrokerClient
- // send is used to send connection info to the gRPC stream.
- send chan *sendErr
- // recv is used to receive connection info from the gRPC stream.
- recv chan *plugin.ConnInfo
- // quit closes down the stream.
- quit chan struct{}
- // o is used to ensure we close the quit channel only once.
- o sync.Once
- }
- func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
- return &gRPCBrokerClientImpl{
- client: plugin.NewGRPCBrokerClient(conn),
- send: make(chan *sendErr),
- recv: make(chan *plugin.ConnInfo),
- quit: make(chan struct{}),
- }
- }
- // StartStream implements the GRPCBrokerClient interface and will block until
- // the quit channel is closed or the context reports Done. The stream will pass
- // connection information to/from the plugin.
- func (s *gRPCBrokerClientImpl) StartStream() error {
- ctx, cancelFunc := context.WithCancel(context.Background())
- defer cancelFunc()
- defer s.Close()
- stream, err := s.client.StartStream(ctx)
- if err != nil {
- return err
- }
- doneCh := stream.Context().Done()
- go func() {
- for {
- select {
- case <-doneCh:
- return
- case <-s.quit:
- return
- case se := <-s.send:
- err := stream.Send(se.i)
- se.ch <- err
- }
- }
- }()
- for {
- i, err := stream.Recv()
- if err != nil {
- return err
- }
- select {
- case <-doneCh:
- return nil
- case <-s.quit:
- return nil
- case s.recv <- i:
- }
- }
- return nil
- }
- // Send is used by the GRPCBroker to pass connection information into the stream
- // to the plugin.
- func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
- ch := make(chan error)
- defer close(ch)
- select {
- case <-s.quit:
- return errors.New("broker closed")
- case s.send <- &sendErr{
- i: i,
- ch: ch,
- }:
- }
- return <-ch
- }
- // Recv is used by the GRPCBroker to pass connection information that has been
- // sent from the plugin to the broker.
- func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
- select {
- case <-s.quit:
- return nil, errors.New("broker closed")
- case i := <-s.recv:
- return i, nil
- }
- }
- // Close closes the quit channel, shutting down the stream.
- func (s *gRPCBrokerClientImpl) Close() {
- s.o.Do(func() {
- close(s.quit)
- })
- }
- // GRPCBroker is responsible for brokering connections by unique ID.
- //
- // It is used by plugins to create multiple gRPC connections and data
- // streams between the plugin process and the host process.
- //
- // This allows a plugin to request a channel with a specific ID to connect to
- // or accept a connection from, and the broker handles the details of
- // holding these channels open while they're being negotiated.
- //
- // The Plugin interface has access to these for both Server and Client.
- // The broker can be used by either (optionally) to reserve and connect to
- // new streams. This is useful for complex args and return values,
- // or anything else you might need a data stream for.
- type GRPCBroker struct {
- nextId uint32
- streamer streamer
- streams map[uint32]*gRPCBrokerPending
- tls *tls.Config
- doneCh chan struct{}
- o sync.Once
- sync.Mutex
- }
- type gRPCBrokerPending struct {
- ch chan *plugin.ConnInfo
- doneCh chan struct{}
- }
- func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
- return &GRPCBroker{
- streamer: s,
- streams: make(map[uint32]*gRPCBrokerPending),
- tls: tls,
- doneCh: make(chan struct{}),
- }
- }
- // Accept accepts a connection by ID.
- //
- // This should not be called multiple times with the same ID at one time.
- func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
- listener, err := serverListener()
- if err != nil {
- return nil, err
- }
- err = b.streamer.Send(&plugin.ConnInfo{
- ServiceId: id,
- Network: listener.Addr().Network(),
- Address: listener.Addr().String(),
- })
- if err != nil {
- return nil, err
- }
- return listener, nil
- }
- // AcceptAndServe is used to accept a specific stream ID and immediately
- // serve a gRPC server on that stream ID. This is used to easily serve
- // complex arguments. Each AcceptAndServe call opens a new listener socket and
- // sends the connection info down the stream to the dialer. Since a new
- // connection is opened every call, these calls should be used sparingly.
- // Multiple gRPC server implementations can be registered to a single
- // AcceptAndServe call.
- func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
- listener, err := b.Accept(id)
- if err != nil {
- log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
- return
- }
- defer listener.Close()
- var opts []grpc.ServerOption
- if b.tls != nil {
- opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
- }
- server := s(opts)
- // Here we use a run group to close this goroutine if the server is shutdown
- // or the broker is shutdown.
- var g run.Group
- {
- // Serve on the listener, if shutting down call GracefulStop.
- g.Add(func() error {
- return server.Serve(listener)
- }, func(err error) {
- server.GracefulStop()
- })
- }
- {
- // block on the closeCh or the doneCh. If we are shutting down close the
- // closeCh.
- closeCh := make(chan struct{})
- g.Add(func() error {
- select {
- case <-b.doneCh:
- case <-closeCh:
- }
- return nil
- }, func(err error) {
- close(closeCh)
- })
- }
- // Block until we are done
- g.Run()
- }
- // Close closes the stream and all servers.
- func (b *GRPCBroker) Close() error {
- b.streamer.Close()
- b.o.Do(func() {
- close(b.doneCh)
- })
- return nil
- }
- // Dial opens a connection by ID.
- func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
- var c *plugin.ConnInfo
- // Open the stream
- p := b.getStream(id)
- select {
- case c = <-p.ch:
- close(p.doneCh)
- case <-time.After(5 * time.Second):
- return nil, fmt.Errorf("timeout waiting for connection info")
- }
- var addr net.Addr
- switch c.Network {
- case "tcp":
- addr, err = net.ResolveTCPAddr("tcp", c.Address)
- case "unix":
- addr, err = net.ResolveUnixAddr("unix", c.Address)
- default:
- err = fmt.Errorf("Unknown address type: %s", c.Address)
- }
- if err != nil {
- return nil, err
- }
- return dialGRPCConn(b.tls, netAddrDialer(addr))
- }
- // NextId returns a unique ID to use next.
- //
- // It is possible for very long-running plugin hosts to wrap this value,
- // though it would require a very large amount of calls. In practice
- // we've never seen it happen.
- func (m *GRPCBroker) NextId() uint32 {
- return atomic.AddUint32(&m.nextId, 1)
- }
- // Run starts the brokering and should be executed in a goroutine, since it
- // blocks forever, or until the session closes.
- //
- // Uses of GRPCBroker never need to call this. It is called internally by
- // the plugin host/client.
- func (m *GRPCBroker) Run() {
- for {
- stream, err := m.streamer.Recv()
- if err != nil {
- // Once we receive an error, just exit
- break
- }
- // Initialize the waiter
- p := m.getStream(stream.ServiceId)
- select {
- case p.ch <- stream:
- default:
- }
- go m.timeoutWait(stream.ServiceId, p)
- }
- }
- func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
- m.Lock()
- defer m.Unlock()
- p, ok := m.streams[id]
- if ok {
- return p
- }
- m.streams[id] = &gRPCBrokerPending{
- ch: make(chan *plugin.ConnInfo, 1),
- doneCh: make(chan struct{}),
- }
- return m.streams[id]
- }
- func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
- // Wait for the stream to either be picked up and connected, or
- // for a timeout.
- select {
- case <-p.doneCh:
- case <-time.After(5 * time.Second):
- }
- m.Lock()
- defer m.Unlock()
- // Delete the stream so no one else can grab it
- delete(m.streams, id)
- }
|