| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package plugin
- import (
- "crypto/tls"
- "fmt"
- "io"
- "net"
- "net/rpc"
- "github.com/hashicorp/yamux"
- )
- // RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
- type RPCClient struct {
- broker *MuxBroker
- control *rpc.Client
- plugins map[string]Plugin
- // These are the streams used for the various stdout/err overrides
- stdout, stderr net.Conn
- }
- // newRPCClient creates a new RPCClient. The Client argument is expected
- // to be successfully started already with a lock held.
- func newRPCClient(c *Client) (*RPCClient, error) {
- // Connect to the client
- conn, err := net.Dial(c.address.Network(), c.address.String())
- if err != nil {
- return nil, err
- }
- if tcpConn, ok := conn.(*net.TCPConn); ok {
- // Make sure to set keep alive so that the connection doesn't die
- tcpConn.SetKeepAlive(true)
- }
- if c.config.TLSConfig != nil {
- conn = tls.Client(conn, c.config.TLSConfig)
- }
- // Create the actual RPC client
- result, err := NewRPCClient(conn, c.config.Plugins)
- if err != nil {
- conn.Close()
- return nil, err
- }
- // Begin the stream syncing so that stdin, out, err work properly
- err = result.SyncStreams(
- c.config.SyncStdout,
- c.config.SyncStderr)
- if err != nil {
- result.Close()
- return nil, err
- }
- return result, nil
- }
- // NewRPCClient creates a client from an already-open connection-like value.
- // Dial is typically used instead.
- func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
- // Create the yamux client so we can multiplex
- mux, err := yamux.Client(conn, nil)
- if err != nil {
- conn.Close()
- return nil, err
- }
- // Connect to the control stream.
- control, err := mux.Open()
- if err != nil {
- mux.Close()
- return nil, err
- }
- // Connect stdout, stderr streams
- stdstream := make([]net.Conn, 2)
- for i, _ := range stdstream {
- stdstream[i], err = mux.Open()
- if err != nil {
- mux.Close()
- return nil, err
- }
- }
- // Create the broker and start it up
- broker := newMuxBroker(mux)
- go broker.Run()
- // Build the client using our broker and control channel.
- return &RPCClient{
- broker: broker,
- control: rpc.NewClient(control),
- plugins: plugins,
- stdout: stdstream[0],
- stderr: stdstream[1],
- }, nil
- }
- // SyncStreams should be called to enable syncing of stdout,
- // stderr with the plugin.
- //
- // This will return immediately and the syncing will continue to happen
- // in the background. You do not need to launch this in a goroutine itself.
- //
- // This should never be called multiple times.
- func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
- go copyStream("stdout", stdout, c.stdout)
- go copyStream("stderr", stderr, c.stderr)
- return nil
- }
- // Close closes the connection. The client is no longer usable after this
- // is called.
- func (c *RPCClient) Close() error {
- // Call the control channel and ask it to gracefully exit. If this
- // errors, then we save it so that we always return an error but we
- // want to try to close the other channels anyways.
- var empty struct{}
- returnErr := c.control.Call("Control.Quit", true, &empty)
- // Close the other streams we have
- if err := c.control.Close(); err != nil {
- return err
- }
- if err := c.stdout.Close(); err != nil {
- return err
- }
- if err := c.stderr.Close(); err != nil {
- return err
- }
- if err := c.broker.Close(); err != nil {
- return err
- }
- // Return back the error we got from Control.Quit. This is very important
- // since we MUST return non-nil error if this fails so that Client.Kill
- // will properly try a process.Kill.
- return returnErr
- }
- func (c *RPCClient) Dispense(name string) (interface{}, error) {
- p, ok := c.plugins[name]
- if !ok {
- return nil, fmt.Errorf("unknown plugin type: %s", name)
- }
- var id uint32
- if err := c.control.Call(
- "Dispenser.Dispense", name, &id); err != nil {
- return nil, err
- }
- conn, err := c.broker.Dial(id)
- if err != nil {
- return nil, err
- }
- return p.Client(c.broker, rpc.NewClient(conn))
- }
- // Ping pings the connection to ensure it is still alive.
- //
- // The error from the RPC call is returned exactly if you want to inspect
- // it for further error analysis. Any error returned from here would indicate
- // that the connection to the plugin is not healthy.
- func (c *RPCClient) Ping() error {
- var empty struct{}
- return c.control.Call("Control.Ping", true, &empty)
- }
|