| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995 |
- package plugin
- import (
- "bufio"
- "context"
- "crypto/subtle"
- "crypto/tls"
- "crypto/x509"
- "encoding/base64"
- "errors"
- "fmt"
- "hash"
- "io"
- "io/ioutil"
- "net"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- hclog "github.com/hashicorp/go-hclog"
- )
- // If this is 1, then we've called CleanupClients. This can be used
- // by plugin RPC implementations to change error behavior since you
- // can expected network connection errors at this point. This should be
- // read by using sync/atomic.
- var Killed uint32 = 0
- // This is a slice of the "managed" clients which are cleaned up when
- // calling Cleanup
- var managedClients = make([]*Client, 0, 5)
- var managedClientsLock sync.Mutex
- // Error types
- var (
- // ErrProcessNotFound is returned when a client is instantiated to
- // reattach to an existing process and it isn't found.
- ErrProcessNotFound = errors.New("Reattachment process not found")
- // ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
- // the one provided in the SecureConfig.
- ErrChecksumsDoNotMatch = errors.New("checksums did not match")
- // ErrSecureNoChecksum is returned when an empty checksum is provided to the
- // SecureConfig.
- ErrSecureConfigNoChecksum = errors.New("no checksum provided")
- // ErrSecureNoHash is returned when a nil Hash object is provided to the
- // SecureConfig.
- ErrSecureConfigNoHash = errors.New("no hash implementation provided")
- // ErrSecureConfigAndReattach is returned when both Reattach and
- // SecureConfig are set.
- ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
- )
- // Client handles the lifecycle of a plugin application. It launches
- // plugins, connects to them, dispenses interface implementations, and handles
- // killing the process.
- //
- // Plugin hosts should use one Client for each plugin executable. To
- // dispense a plugin type, use the `Client.Client` function, and then
- // cal `Dispense`. This awkward API is mostly historical but is used to split
- // the client that deals with subprocess management and the client that
- // does RPC management.
- //
- // See NewClient and ClientConfig for using a Client.
- type Client struct {
- config *ClientConfig
- exited bool
- l sync.Mutex
- address net.Addr
- process *os.Process
- client ClientProtocol
- protocol Protocol
- logger hclog.Logger
- doneCtx context.Context
- ctxCancel context.CancelFunc
- negotiatedVersion int
- // clientWaitGroup is used to manage the lifecycle of the plugin management
- // goroutines.
- clientWaitGroup sync.WaitGroup
- // processKilled is used for testing only, to flag when the process was
- // forcefully killed.
- processKilled bool
- }
- // NegotiatedVersion returns the protocol version negotiated with the server.
- // This is only valid after Start() is called.
- func (c *Client) NegotiatedVersion() int {
- return c.negotiatedVersion
- }
- // ClientConfig is the configuration used to initialize a new
- // plugin client. After being used to initialize a plugin client,
- // that configuration must not be modified again.
- type ClientConfig struct {
- // HandshakeConfig is the configuration that must match servers.
- HandshakeConfig
- // Plugins are the plugins that can be consumed.
- // The implied version of this PluginSet is the Handshake.ProtocolVersion.
- Plugins PluginSet
- // VersionedPlugins is a map of PluginSets for specific protocol versions.
- // These can be used to negotiate a compatible version between client and
- // server. If this is set, Handshake.ProtocolVersion is not required.
- VersionedPlugins map[int]PluginSet
- // One of the following must be set, but not both.
- //
- // Cmd is the unstarted subprocess for starting the plugin. If this is
- // set, then the Client starts the plugin process on its own and connects
- // to it.
- //
- // Reattach is configuration for reattaching to an existing plugin process
- // that is already running. This isn't common.
- Cmd *exec.Cmd
- Reattach *ReattachConfig
- // SecureConfig is configuration for verifying the integrity of the
- // executable. It can not be used with Reattach.
- SecureConfig *SecureConfig
- // TLSConfig is used to enable TLS on the RPC client.
- TLSConfig *tls.Config
- // Managed represents if the client should be managed by the
- // plugin package or not. If true, then by calling CleanupClients,
- // it will automatically be cleaned up. Otherwise, the client
- // user is fully responsible for making sure to Kill all plugin
- // clients. By default the client is _not_ managed.
- Managed bool
- // The minimum and maximum port to use for communicating with
- // the subprocess. If not set, this defaults to 10,000 and 25,000
- // respectively.
- MinPort, MaxPort uint
- // StartTimeout is the timeout to wait for the plugin to say it
- // has started successfully.
- StartTimeout time.Duration
- // If non-nil, then the stderr of the client will be written to here
- // (as well as the log). This is the original os.Stderr of the subprocess.
- // This isn't the output of synced stderr.
- Stderr io.Writer
- // SyncStdout, SyncStderr can be set to override the
- // respective os.Std* values in the plugin. Care should be taken to
- // avoid races here. If these are nil, then this will automatically be
- // hooked up to os.Stdin, Stdout, and Stderr, respectively.
- //
- // If the default values (nil) are used, then this package will not
- // sync any of these streams.
- SyncStdout io.Writer
- SyncStderr io.Writer
- // AllowedProtocols is a list of allowed protocols. If this isn't set,
- // then only netrpc is allowed. This is so that older go-plugin systems
- // can show friendly errors if they see a plugin with an unknown
- // protocol.
- //
- // By setting this, you can cause an error immediately on plugin start
- // if an unsupported protocol is used with a good error message.
- //
- // If this isn't set at all (nil value), then only net/rpc is accepted.
- // This is done for legacy reasons. You must explicitly opt-in to
- // new protocols.
- AllowedProtocols []Protocol
- // Logger is the logger that the client will used. If none is provided,
- // it will default to hclog's default logger.
- Logger hclog.Logger
- // AutoMTLS has the client and server automatically negotiate mTLS for
- // transport authentication. This ensures that only the original client will
- // be allowed to connect to the server, and all other connections will be
- // rejected. The client will also refuse to connect to any server that isn't
- // the original instance started by the client.
- //
- // In this mode of operation, the client generates a one-time use tls
- // certificate, sends the public x.509 certificate to the new server, and
- // the server generates a one-time use tls certificate, and sends the public
- // x.509 certificate back to the client. These are used to authenticate all
- // rpc connections between the client and server.
- //
- // Setting AutoMTLS to true implies that the server must support the
- // protocol, and correctly negotiate the tls certificates, or a connection
- // failure will result.
- //
- // The client should not set TLSConfig, nor should the server set a
- // TLSProvider, because AutoMTLS implies that a new certificate and tls
- // configuration will be generated at startup.
- //
- // You cannot Reattach to a server with this option enabled.
- AutoMTLS bool
- }
- // ReattachConfig is used to configure a client to reattach to an
- // already-running plugin process. You can retrieve this information by
- // calling ReattachConfig on Client.
- type ReattachConfig struct {
- Protocol Protocol
- Addr net.Addr
- Pid int
- }
- // SecureConfig is used to configure a client to verify the integrity of an
- // executable before running. It does this by verifying the checksum is
- // expected. Hash is used to specify the hashing method to use when checksumming
- // the file. The configuration is verified by the client by calling the
- // SecureConfig.Check() function.
- //
- // The host process should ensure the checksum was provided by a trusted and
- // authoritative source. The binary should be installed in such a way that it
- // can not be modified by an unauthorized user between the time of this check
- // and the time of execution.
- type SecureConfig struct {
- Checksum []byte
- Hash hash.Hash
- }
- // Check takes the filepath to an executable and returns true if the checksum of
- // the file matches the checksum provided in the SecureConfig.
- func (s *SecureConfig) Check(filePath string) (bool, error) {
- if len(s.Checksum) == 0 {
- return false, ErrSecureConfigNoChecksum
- }
- if s.Hash == nil {
- return false, ErrSecureConfigNoHash
- }
- file, err := os.Open(filePath)
- if err != nil {
- return false, err
- }
- defer file.Close()
- _, err = io.Copy(s.Hash, file)
- if err != nil {
- return false, err
- }
- sum := s.Hash.Sum(nil)
- return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
- }
- // This makes sure all the managed subprocesses are killed and properly
- // logged. This should be called before the parent process running the
- // plugins exits.
- //
- // This must only be called _once_.
- func CleanupClients() {
- // Set the killed to true so that we don't get unexpected panics
- atomic.StoreUint32(&Killed, 1)
- // Kill all the managed clients in parallel and use a WaitGroup
- // to wait for them all to finish up.
- var wg sync.WaitGroup
- managedClientsLock.Lock()
- for _, client := range managedClients {
- wg.Add(1)
- go func(client *Client) {
- client.Kill()
- wg.Done()
- }(client)
- }
- managedClientsLock.Unlock()
- wg.Wait()
- }
- // Creates a new plugin client which manages the lifecycle of an external
- // plugin and gets the address for the RPC connection.
- //
- // The client must be cleaned up at some point by calling Kill(). If
- // the client is a managed client (created with NewManagedClient) you
- // can just call CleanupClients at the end of your program and they will
- // be properly cleaned.
- func NewClient(config *ClientConfig) (c *Client) {
- if config.MinPort == 0 && config.MaxPort == 0 {
- config.MinPort = 10000
- config.MaxPort = 25000
- }
- if config.StartTimeout == 0 {
- config.StartTimeout = 1 * time.Minute
- }
- if config.Stderr == nil {
- config.Stderr = ioutil.Discard
- }
- if config.SyncStdout == nil {
- config.SyncStdout = ioutil.Discard
- }
- if config.SyncStderr == nil {
- config.SyncStderr = ioutil.Discard
- }
- if config.AllowedProtocols == nil {
- config.AllowedProtocols = []Protocol{ProtocolNetRPC}
- }
- if config.Logger == nil {
- config.Logger = hclog.New(&hclog.LoggerOptions{
- Output: hclog.DefaultOutput,
- Level: hclog.Trace,
- Name: "plugin",
- })
- }
- c = &Client{
- config: config,
- logger: config.Logger,
- }
- if config.Managed {
- managedClientsLock.Lock()
- managedClients = append(managedClients, c)
- managedClientsLock.Unlock()
- }
- return
- }
- // Client returns the protocol client for this connection.
- //
- // Subsequent calls to this will return the same client.
- func (c *Client) Client() (ClientProtocol, error) {
- _, err := c.Start()
- if err != nil {
- return nil, err
- }
- c.l.Lock()
- defer c.l.Unlock()
- if c.client != nil {
- return c.client, nil
- }
- switch c.protocol {
- case ProtocolNetRPC:
- c.client, err = newRPCClient(c)
- case ProtocolGRPC:
- c.client, err = newGRPCClient(c.doneCtx, c)
- default:
- return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
- }
- if err != nil {
- c.client = nil
- return nil, err
- }
- return c.client, nil
- }
- // Tells whether or not the underlying process has exited.
- func (c *Client) Exited() bool {
- c.l.Lock()
- defer c.l.Unlock()
- return c.exited
- }
- // killed is used in tests to check if a process failed to exit gracefully, and
- // needed to be killed.
- func (c *Client) killed() bool {
- c.l.Lock()
- defer c.l.Unlock()
- return c.processKilled
- }
- // End the executing subprocess (if it is running) and perform any cleanup
- // tasks necessary such as capturing any remaining logs and so on.
- //
- // This method blocks until the process successfully exits.
- //
- // This method can safely be called multiple times.
- func (c *Client) Kill() {
- // Grab a lock to read some private fields.
- c.l.Lock()
- process := c.process
- addr := c.address
- c.l.Unlock()
- // If there is no process, there is nothing to kill.
- if process == nil {
- return
- }
- defer func() {
- // Wait for the all client goroutines to finish.
- c.clientWaitGroup.Wait()
- // Make sure there is no reference to the old process after it has been
- // killed.
- c.l.Lock()
- c.process = nil
- c.l.Unlock()
- }()
- // We need to check for address here. It is possible that the plugin
- // started (process != nil) but has no address (addr == nil) if the
- // plugin failed at startup. If we do have an address, we need to close
- // the plugin net connections.
- graceful := false
- if addr != nil {
- // Close the client to cleanly exit the process.
- client, err := c.Client()
- if err == nil {
- err = client.Close()
- // If there is no error, then we attempt to wait for a graceful
- // exit. If there was an error, we assume that graceful cleanup
- // won't happen and just force kill.
- graceful = err == nil
- if err != nil {
- // If there was an error just log it. We're going to force
- // kill in a moment anyways.
- c.logger.Warn("error closing client during Kill", "err", err)
- }
- } else {
- c.logger.Error("client", "error", err)
- }
- }
- // If we're attempting a graceful exit, then we wait for a short period
- // of time to allow that to happen. To wait for this we just wait on the
- // doneCh which would be closed if the process exits.
- if graceful {
- select {
- case <-c.doneCtx.Done():
- c.logger.Debug("plugin exited")
- return
- case <-time.After(2 * time.Second):
- }
- }
- // If graceful exiting failed, just kill it
- c.logger.Warn("plugin failed to exit gracefully")
- process.Kill()
- c.l.Lock()
- c.processKilled = true
- c.l.Unlock()
- }
- // Starts the underlying subprocess, communicating with it to negotiate
- // a port for RPC connections, and returning the address to connect via RPC.
- //
- // This method is safe to call multiple times. Subsequent calls have no effect.
- // Once a client has been started once, it cannot be started again, even if
- // it was killed.
- func (c *Client) Start() (addr net.Addr, err error) {
- c.l.Lock()
- defer c.l.Unlock()
- if c.address != nil {
- return c.address, nil
- }
- // If one of cmd or reattach isn't set, then it is an error. We wrap
- // this in a {} for scoping reasons, and hopeful that the escape
- // analysis will pop the stack here.
- {
- cmdSet := c.config.Cmd != nil
- attachSet := c.config.Reattach != nil
- secureSet := c.config.SecureConfig != nil
- if cmdSet == attachSet {
- return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
- }
- if secureSet && attachSet {
- return nil, ErrSecureConfigAndReattach
- }
- }
- if c.config.Reattach != nil {
- return c.reattach()
- }
- if c.config.VersionedPlugins == nil {
- c.config.VersionedPlugins = make(map[int]PluginSet)
- }
- // handle all plugins as versioned, using the handshake config as the default.
- version := int(c.config.ProtocolVersion)
- // Make sure we're not overwriting a real version 0. If ProtocolVersion was
- // non-zero, then we have to just assume the user made sure that
- // VersionedPlugins doesn't conflict.
- if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
- c.config.VersionedPlugins[version] = c.config.Plugins
- }
- var versionStrings []string
- for v := range c.config.VersionedPlugins {
- versionStrings = append(versionStrings, strconv.Itoa(v))
- }
- env := []string{
- fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
- fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
- fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
- fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
- }
- cmd := c.config.Cmd
- cmd.Env = append(cmd.Env, os.Environ()...)
- cmd.Env = append(cmd.Env, env...)
- cmd.Stdin = os.Stdin
- cmdStdout, err := cmd.StdoutPipe()
- if err != nil {
- return nil, err
- }
- cmdStderr, err := cmd.StderrPipe()
- if err != nil {
- return nil, err
- }
- if c.config.SecureConfig != nil {
- if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
- return nil, fmt.Errorf("error verifying checksum: %s", err)
- } else if !ok {
- return nil, ErrChecksumsDoNotMatch
- }
- }
- // Setup a temporary certificate for client/server mtls, and send the public
- // certificate to the plugin.
- if c.config.AutoMTLS {
- c.logger.Info("configuring client automatic mTLS")
- certPEM, keyPEM, err := generateCert()
- if err != nil {
- c.logger.Error("failed to generate client certificate", "error", err)
- return nil, err
- }
- cert, err := tls.X509KeyPair(certPEM, keyPEM)
- if err != nil {
- c.logger.Error("failed to parse client certificate", "error", err)
- return nil, err
- }
- cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
- c.config.TLSConfig = &tls.Config{
- Certificates: []tls.Certificate{cert},
- ServerName: "localhost",
- }
- }
- c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
- err = cmd.Start()
- if err != nil {
- return
- }
- // Set the process
- c.process = cmd.Process
- c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
- // Make sure the command is properly cleaned up if there is an error
- defer func() {
- r := recover()
- if err != nil || r != nil {
- cmd.Process.Kill()
- }
- if r != nil {
- panic(r)
- }
- }()
- // Create a context for when we kill
- c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
- c.clientWaitGroup.Add(1)
- go func() {
- // ensure the context is cancelled when we're done
- defer c.ctxCancel()
- defer c.clientWaitGroup.Done()
- // get the cmd info early, since the process information will be removed
- // in Kill.
- pid := c.process.Pid
- path := cmd.Path
- // Wait for the command to end.
- err := cmd.Wait()
- debugMsgArgs := []interface{}{
- "path", path,
- "pid", pid,
- }
- if err != nil {
- debugMsgArgs = append(debugMsgArgs,
- []interface{}{"error", err.Error()}...)
- }
- // Log and make sure to flush the logs write away
- c.logger.Debug("plugin process exited", debugMsgArgs...)
- os.Stderr.Sync()
- // Set that we exited, which takes a lock
- c.l.Lock()
- defer c.l.Unlock()
- c.exited = true
- }()
- // Start goroutine that logs the stderr
- c.clientWaitGroup.Add(1)
- // logStderr calls Done()
- go c.logStderr(cmdStderr)
- // Start a goroutine that is going to be reading the lines
- // out of stdout
- linesCh := make(chan string)
- c.clientWaitGroup.Add(1)
- go func() {
- defer c.clientWaitGroup.Done()
- defer close(linesCh)
- scanner := bufio.NewScanner(cmdStdout)
- for scanner.Scan() {
- linesCh <- scanner.Text()
- }
- }()
- // Make sure after we exit we read the lines from stdout forever
- // so they don't block since it is a pipe.
- // The scanner goroutine above will close this, but track it with a wait
- // group for completeness.
- c.clientWaitGroup.Add(1)
- defer func() {
- go func() {
- defer c.clientWaitGroup.Done()
- for range linesCh {
- }
- }()
- }()
- // Some channels for the next step
- timeout := time.After(c.config.StartTimeout)
- // Start looking for the address
- c.logger.Debug("waiting for RPC address", "path", cmd.Path)
- select {
- case <-timeout:
- err = errors.New("timeout while waiting for plugin to start")
- case <-c.doneCtx.Done():
- err = errors.New("plugin exited before we could connect")
- case line := <-linesCh:
- // Trim the line and split by "|" in order to get the parts of
- // the output.
- line = strings.TrimSpace(line)
- parts := strings.SplitN(line, "|", 6)
- if len(parts) < 4 {
- err = fmt.Errorf(
- "Unrecognized remote plugin message: %s\n\n"+
- "This usually means that the plugin is either invalid or simply\n"+
- "needs to be recompiled to support the latest protocol.", line)
- return
- }
- // Check the core protocol. Wrapped in a {} for scoping.
- {
- var coreProtocol int64
- coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
- if err != nil {
- err = fmt.Errorf("Error parsing core protocol version: %s", err)
- return
- }
- if int(coreProtocol) != CoreProtocolVersion {
- err = fmt.Errorf("Incompatible core API version with plugin. "+
- "Plugin version: %s, Core version: %d\n\n"+
- "To fix this, the plugin usually only needs to be recompiled.\n"+
- "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
- return
- }
- }
- // Test the API version
- version, pluginSet, err := c.checkProtoVersion(parts[1])
- if err != nil {
- return addr, err
- }
- // set the Plugins value to the compatible set, so the version
- // doesn't need to be passed through to the ClientProtocol
- // implementation.
- c.config.Plugins = pluginSet
- c.negotiatedVersion = version
- c.logger.Debug("using plugin", "version", version)
- switch parts[2] {
- case "tcp":
- addr, err = net.ResolveTCPAddr("tcp", parts[3])
- case "unix":
- addr, err = net.ResolveUnixAddr("unix", parts[3])
- default:
- err = fmt.Errorf("Unknown address type: %s", parts[3])
- }
- // If we have a server type, then record that. We default to net/rpc
- // for backwards compatibility.
- c.protocol = ProtocolNetRPC
- if len(parts) >= 5 {
- c.protocol = Protocol(parts[4])
- }
- found := false
- for _, p := range c.config.AllowedProtocols {
- if p == c.protocol {
- found = true
- break
- }
- }
- if !found {
- err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
- c.protocol, c.config.AllowedProtocols)
- return addr, err
- }
- // See if we have a TLS certificate from the server.
- // Checking if the length is > 50 rules out catching the unused "extra"
- // data returned from some older implementations.
- if len(parts) >= 6 && len(parts[5]) > 50 {
- err := c.loadServerCert(parts[5])
- if err != nil {
- return nil, fmt.Errorf("error parsing server cert: %s", err)
- }
- }
- }
- c.address = addr
- return
- }
- // loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
- // server, and load it as the RootCA for the client TLSConfig.
- func (c *Client) loadServerCert(cert string) error {
- certPool := x509.NewCertPool()
- asn1, err := base64.RawStdEncoding.DecodeString(cert)
- if err != nil {
- return err
- }
- x509Cert, err := x509.ParseCertificate([]byte(asn1))
- if err != nil {
- return err
- }
- certPool.AddCert(x509Cert)
- c.config.TLSConfig.RootCAs = certPool
- return nil
- }
- func (c *Client) reattach() (net.Addr, error) {
- // Verify the process still exists. If not, then it is an error
- p, err := os.FindProcess(c.config.Reattach.Pid)
- if err != nil {
- return nil, err
- }
- // Attempt to connect to the addr since on Unix systems FindProcess
- // doesn't actually return an error if it can't find the process.
- conn, err := net.Dial(
- c.config.Reattach.Addr.Network(),
- c.config.Reattach.Addr.String())
- if err != nil {
- p.Kill()
- return nil, ErrProcessNotFound
- }
- conn.Close()
- // Create a context for when we kill
- c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
- c.clientWaitGroup.Add(1)
- // Goroutine to mark exit status
- go func(pid int) {
- defer c.clientWaitGroup.Done()
- // ensure the context is cancelled when we're done
- defer c.ctxCancel()
- // Wait for the process to die
- pidWait(pid)
- // Log so we can see it
- c.logger.Debug("reattached plugin process exited")
- // Mark it
- c.l.Lock()
- defer c.l.Unlock()
- c.exited = true
- }(p.Pid)
- // Set the address and process
- c.address = c.config.Reattach.Addr
- c.process = p
- c.protocol = c.config.Reattach.Protocol
- if c.protocol == "" {
- // Default the protocol to net/rpc for backwards compatibility
- c.protocol = ProtocolNetRPC
- }
- return c.address, nil
- }
- // checkProtoVersion returns the negotiated version and PluginSet.
- // This returns an error if the server returned an incompatible protocol
- // version, or an invalid handshake response.
- func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
- serverVersion, err := strconv.Atoi(protoVersion)
- if err != nil {
- return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
- }
- // record these for the error message
- var clientVersions []int
- // all versions, including the legacy ProtocolVersion have been added to
- // the versions set
- for version, plugins := range c.config.VersionedPlugins {
- clientVersions = append(clientVersions, version)
- if serverVersion != version {
- continue
- }
- return version, plugins, nil
- }
- return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
- "Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
- }
- // ReattachConfig returns the information that must be provided to NewClient
- // to reattach to the plugin process that this client started. This is
- // useful for plugins that detach from their parent process.
- //
- // If this returns nil then the process hasn't been started yet. Please
- // call Start or Client before calling this.
- func (c *Client) ReattachConfig() *ReattachConfig {
- c.l.Lock()
- defer c.l.Unlock()
- if c.address == nil {
- return nil
- }
- if c.config.Cmd != nil && c.config.Cmd.Process == nil {
- return nil
- }
- // If we connected via reattach, just return the information as-is
- if c.config.Reattach != nil {
- return c.config.Reattach
- }
- return &ReattachConfig{
- Protocol: c.protocol,
- Addr: c.address,
- Pid: c.config.Cmd.Process.Pid,
- }
- }
- // Protocol returns the protocol of server on the remote end. This will
- // start the plugin process if it isn't already started. Errors from
- // starting the plugin are surpressed and ProtocolInvalid is returned. It
- // is recommended you call Start explicitly before calling Protocol to ensure
- // no errors occur.
- func (c *Client) Protocol() Protocol {
- _, err := c.Start()
- if err != nil {
- return ProtocolInvalid
- }
- return c.protocol
- }
- func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
- return func(_ string, _ time.Duration) (net.Conn, error) {
- // Connect to the client
- conn, err := net.Dial(addr.Network(), addr.String())
- if err != nil {
- return nil, err
- }
- if tcpConn, ok := conn.(*net.TCPConn); ok {
- // Make sure to set keep alive so that the connection doesn't die
- tcpConn.SetKeepAlive(true)
- }
- return conn, nil
- }
- }
- // dialer is compatible with grpc.WithDialer and creates the connection
- // to the plugin.
- func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
- conn, err := netAddrDialer(c.address)("", timeout)
- if err != nil {
- return nil, err
- }
- // If we have a TLS config we wrap our connection. We only do this
- // for net/rpc since gRPC uses its own mechanism for TLS.
- if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
- conn = tls.Client(conn, c.config.TLSConfig)
- }
- return conn, nil
- }
- var stdErrBufferSize = 64 * 1024
- func (c *Client) logStderr(r io.Reader) {
- defer c.clientWaitGroup.Done()
- l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
- reader := bufio.NewReaderSize(r, stdErrBufferSize)
- // continuation indicates the previous line was a prefix
- continuation := false
- for {
- line, isPrefix, err := reader.ReadLine()
- switch {
- case err == io.EOF:
- return
- case err != nil:
- l.Error("reading plugin stderr", "error", err)
- return
- }
- c.config.Stderr.Write(line)
- // The line was longer than our max token size, so it's likely
- // incomplete and won't unmarshal.
- if isPrefix || continuation {
- l.Debug(string(line))
- // if we're finishing a continued line, add the newline back in
- if !isPrefix {
- c.config.Stderr.Write([]byte{'\n'})
- }
- continuation = isPrefix
- continue
- }
- c.config.Stderr.Write([]byte{'\n'})
- entry, err := parseJSON(line)
- // If output is not JSON format, print directly to Debug
- if err != nil {
- l.Debug(string(line))
- } else {
- out := flattenKVPairs(entry.KVPairs)
- out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
- switch hclog.LevelFromString(entry.Level) {
- case hclog.Trace:
- l.Trace(entry.Message, out...)
- case hclog.Debug:
- l.Debug(entry.Message, out...)
- case hclog.Info:
- l.Info(entry.Message, out...)
- case hclog.Warn:
- l.Warn(entry.Message, out...)
- case hclog.Error:
- l.Error(entry.Message, out...)
- }
- }
- }
- }
|