| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package plugin
- import (
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "net/rpc"
- "sync"
- "github.com/hashicorp/yamux"
- )
- // RPCServer listens for network connections and then dispenses interface
- // implementations over net/rpc.
- //
- // After setting the fields below, they shouldn't be read again directly
- // from the structure which may be reading/writing them concurrently.
- type RPCServer struct {
- Plugins map[string]Plugin
- // Stdout, Stderr are what this server will use instead of the
- // normal stdin/out/err. This is because due to the multi-process nature
- // of our plugin system, we can't use the normal process values so we
- // make our own custom one we pipe across.
- Stdout io.Reader
- Stderr io.Reader
- // DoneCh should be set to a non-nil channel that will be closed
- // when the control requests the RPC server to end.
- DoneCh chan<- struct{}
- lock sync.Mutex
- }
- // ServerProtocol impl.
- func (s *RPCServer) Init() error { return nil }
- // ServerProtocol impl.
- func (s *RPCServer) Config() string { return "" }
- // ServerProtocol impl.
- func (s *RPCServer) Serve(lis net.Listener) {
- for {
- conn, err := lis.Accept()
- if err != nil {
- log.Printf("[ERR] plugin: plugin server: %s", err)
- return
- }
- go s.ServeConn(conn)
- }
- }
- // ServeConn runs a single connection.
- //
- // ServeConn blocks, serving the connection until the client hangs up.
- func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
- // First create the yamux server to wrap this connection
- mux, err := yamux.Server(conn, nil)
- if err != nil {
- conn.Close()
- log.Printf("[ERR] plugin: error creating yamux server: %s", err)
- return
- }
- // Accept the control connection
- control, err := mux.Accept()
- if err != nil {
- mux.Close()
- if err != io.EOF {
- log.Printf("[ERR] plugin: error accepting control connection: %s", err)
- }
- return
- }
- // Connect the stdstreams (in, out, err)
- stdstream := make([]net.Conn, 2)
- for i, _ := range stdstream {
- stdstream[i], err = mux.Accept()
- if err != nil {
- mux.Close()
- log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
- return
- }
- }
- // Copy std streams out to the proper place
- go copyStream("stdout", stdstream[0], s.Stdout)
- go copyStream("stderr", stdstream[1], s.Stderr)
- // Create the broker and start it up
- broker := newMuxBroker(mux)
- go broker.Run()
- // Use the control connection to build the dispenser and serve the
- // connection.
- server := rpc.NewServer()
- server.RegisterName("Control", &controlServer{
- server: s,
- })
- server.RegisterName("Dispenser", &dispenseServer{
- broker: broker,
- plugins: s.Plugins,
- })
- server.ServeConn(control)
- }
- // done is called internally by the control server to trigger the
- // doneCh to close which is listened to by the main process to cleanly
- // exit.
- func (s *RPCServer) done() {
- s.lock.Lock()
- defer s.lock.Unlock()
- if s.DoneCh != nil {
- close(s.DoneCh)
- s.DoneCh = nil
- }
- }
- // dispenseServer dispenses variousinterface implementations for Terraform.
- type controlServer struct {
- server *RPCServer
- }
- // Ping can be called to verify the connection (and likely the binary)
- // is still alive to a plugin.
- func (c *controlServer) Ping(
- null bool, response *struct{}) error {
- *response = struct{}{}
- return nil
- }
- func (c *controlServer) Quit(
- null bool, response *struct{}) error {
- // End the server
- c.server.done()
- // Always return true
- *response = struct{}{}
- return nil
- }
- // dispenseServer dispenses variousinterface implementations for Terraform.
- type dispenseServer struct {
- broker *MuxBroker
- plugins map[string]Plugin
- }
- func (d *dispenseServer) Dispense(
- name string, response *uint32) error {
- // Find the function to create this implementation
- p, ok := d.plugins[name]
- if !ok {
- return fmt.Errorf("unknown plugin type: %s", name)
- }
- // Create the implementation first so we know if there is an error.
- impl, err := p.Server(d.broker)
- if err != nil {
- // We turn the error into an errors error so that it works across RPC
- return errors.New(err.Error())
- }
- // Reserve an ID for our implementation
- id := d.broker.NextId()
- *response = id
- // Run the rest in a goroutine since it can only happen once this RPC
- // call returns. We wait for a connection for the plugin implementation
- // and serve it.
- go func() {
- conn, err := d.broker.Accept(id)
- if err != nil {
- log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
- return
- }
- serve(conn, "Plugin", impl)
- }()
- return nil
- }
// serve registers v under the RPC service name on a fresh net/rpc server and
// serves conn with it, blocking until the connection is finished.
//
// If v cannot be registered (for example because it has no methods suitable
// for net/rpc), the error is logged and conn is closed before returning:
// nothing would ever serve it, and leaving it open would leak the stream and
// keep the peer waiting.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)

		// Release the connection so the remote side observes the failure.
		if closeErr := conn.Close(); closeErr != nil {
			log.Printf("[ERR] go-plugin: error closing connection: %s", closeErr)
		}
		return
	}

	server.ServeConn(conn)
}
|