client.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. package plugin
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/subtle"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/base64"
  9. "errors"
  10. "fmt"
  11. "hash"
  12. "io"
  13. "io/ioutil"
  14. "net"
  15. "os"
  16. "os/exec"
  17. "path/filepath"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. hclog "github.com/hashicorp/go-hclog"
  24. )
  25. // If this is 1, then we've called CleanupClients. This can be used
  26. // by plugin RPC implementations to change error behavior since you
  27. // can expected network connection errors at this point. This should be
  28. // read by using sync/atomic.
  29. var Killed uint32 = 0
  30. // This is a slice of the "managed" clients which are cleaned up when
  31. // calling Cleanup
  32. var managedClients = make([]*Client, 0, 5)
  33. var managedClientsLock sync.Mutex
  34. // Error types
  35. var (
  36. // ErrProcessNotFound is returned when a client is instantiated to
  37. // reattach to an existing process and it isn't found.
  38. ErrProcessNotFound = errors.New("Reattachment process not found")
  39. // ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
  40. // the one provided in the SecureConfig.
  41. ErrChecksumsDoNotMatch = errors.New("checksums did not match")
  42. // ErrSecureNoChecksum is returned when an empty checksum is provided to the
  43. // SecureConfig.
  44. ErrSecureConfigNoChecksum = errors.New("no checksum provided")
  45. // ErrSecureNoHash is returned when a nil Hash object is provided to the
  46. // SecureConfig.
  47. ErrSecureConfigNoHash = errors.New("no hash implementation provided")
  48. // ErrSecureConfigAndReattach is returned when both Reattach and
  49. // SecureConfig are set.
  50. ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
  51. )
  52. // Client handles the lifecycle of a plugin application. It launches
  53. // plugins, connects to them, dispenses interface implementations, and handles
  54. // killing the process.
  55. //
  56. // Plugin hosts should use one Client for each plugin executable. To
  57. // dispense a plugin type, use the `Client.Client` function, and then
  58. // cal `Dispense`. This awkward API is mostly historical but is used to split
  59. // the client that deals with subprocess management and the client that
  60. // does RPC management.
  61. //
  62. // See NewClient and ClientConfig for using a Client.
  63. type Client struct {
  64. config *ClientConfig
  65. exited bool
  66. l sync.Mutex
  67. address net.Addr
  68. process *os.Process
  69. client ClientProtocol
  70. protocol Protocol
  71. logger hclog.Logger
  72. doneCtx context.Context
  73. ctxCancel context.CancelFunc
  74. negotiatedVersion int
  75. // clientWaitGroup is used to manage the lifecycle of the plugin management
  76. // goroutines.
  77. clientWaitGroup sync.WaitGroup
  78. // processKilled is used for testing only, to flag when the process was
  79. // forcefully killed.
  80. processKilled bool
  81. }
  82. // NegotiatedVersion returns the protocol version negotiated with the server.
  83. // This is only valid after Start() is called.
  84. func (c *Client) NegotiatedVersion() int {
  85. return c.negotiatedVersion
  86. }
  87. // ClientConfig is the configuration used to initialize a new
  88. // plugin client. After being used to initialize a plugin client,
  89. // that configuration must not be modified again.
  90. type ClientConfig struct {
  91. // HandshakeConfig is the configuration that must match servers.
  92. HandshakeConfig
  93. // Plugins are the plugins that can be consumed.
  94. // The implied version of this PluginSet is the Handshake.ProtocolVersion.
  95. Plugins PluginSet
  96. // VersionedPlugins is a map of PluginSets for specific protocol versions.
  97. // These can be used to negotiate a compatible version between client and
  98. // server. If this is set, Handshake.ProtocolVersion is not required.
  99. VersionedPlugins map[int]PluginSet
  100. // One of the following must be set, but not both.
  101. //
  102. // Cmd is the unstarted subprocess for starting the plugin. If this is
  103. // set, then the Client starts the plugin process on its own and connects
  104. // to it.
  105. //
  106. // Reattach is configuration for reattaching to an existing plugin process
  107. // that is already running. This isn't common.
  108. Cmd *exec.Cmd
  109. Reattach *ReattachConfig
  110. // SecureConfig is configuration for verifying the integrity of the
  111. // executable. It can not be used with Reattach.
  112. SecureConfig *SecureConfig
  113. // TLSConfig is used to enable TLS on the RPC client.
  114. TLSConfig *tls.Config
  115. // Managed represents if the client should be managed by the
  116. // plugin package or not. If true, then by calling CleanupClients,
  117. // it will automatically be cleaned up. Otherwise, the client
  118. // user is fully responsible for making sure to Kill all plugin
  119. // clients. By default the client is _not_ managed.
  120. Managed bool
  121. // The minimum and maximum port to use for communicating with
  122. // the subprocess. If not set, this defaults to 10,000 and 25,000
  123. // respectively.
  124. MinPort, MaxPort uint
  125. // StartTimeout is the timeout to wait for the plugin to say it
  126. // has started successfully.
  127. StartTimeout time.Duration
  128. // If non-nil, then the stderr of the client will be written to here
  129. // (as well as the log). This is the original os.Stderr of the subprocess.
  130. // This isn't the output of synced stderr.
  131. Stderr io.Writer
  132. // SyncStdout, SyncStderr can be set to override the
  133. // respective os.Std* values in the plugin. Care should be taken to
  134. // avoid races here. If these are nil, then this will automatically be
  135. // hooked up to os.Stdin, Stdout, and Stderr, respectively.
  136. //
  137. // If the default values (nil) are used, then this package will not
  138. // sync any of these streams.
  139. SyncStdout io.Writer
  140. SyncStderr io.Writer
  141. // AllowedProtocols is a list of allowed protocols. If this isn't set,
  142. // then only netrpc is allowed. This is so that older go-plugin systems
  143. // can show friendly errors if they see a plugin with an unknown
  144. // protocol.
  145. //
  146. // By setting this, you can cause an error immediately on plugin start
  147. // if an unsupported protocol is used with a good error message.
  148. //
  149. // If this isn't set at all (nil value), then only net/rpc is accepted.
  150. // This is done for legacy reasons. You must explicitly opt-in to
  151. // new protocols.
  152. AllowedProtocols []Protocol
  153. // Logger is the logger that the client will used. If none is provided,
  154. // it will default to hclog's default logger.
  155. Logger hclog.Logger
  156. // AutoMTLS has the client and server automatically negotiate mTLS for
  157. // transport authentication. This ensures that only the original client will
  158. // be allowed to connect to the server, and all other connections will be
  159. // rejected. The client will also refuse to connect to any server that isn't
  160. // the original instance started by the client.
  161. //
  162. // In this mode of operation, the client generates a one-time use tls
  163. // certificate, sends the public x.509 certificate to the new server, and
  164. // the server generates a one-time use tls certificate, and sends the public
  165. // x.509 certificate back to the client. These are used to authenticate all
  166. // rpc connections between the client and server.
  167. //
  168. // Setting AutoMTLS to true implies that the server must support the
  169. // protocol, and correctly negotiate the tls certificates, or a connection
  170. // failure will result.
  171. //
  172. // The client should not set TLSConfig, nor should the server set a
  173. // TLSProvider, because AutoMTLS implies that a new certificate and tls
  174. // configuration will be generated at startup.
  175. //
  176. // You cannot Reattach to a server with this option enabled.
  177. AutoMTLS bool
  178. }
  179. // ReattachConfig is used to configure a client to reattach to an
  180. // already-running plugin process. You can retrieve this information by
  181. // calling ReattachConfig on Client.
  182. type ReattachConfig struct {
  183. Protocol Protocol
  184. Addr net.Addr
  185. Pid int
  186. }
  187. // SecureConfig is used to configure a client to verify the integrity of an
  188. // executable before running. It does this by verifying the checksum is
  189. // expected. Hash is used to specify the hashing method to use when checksumming
  190. // the file. The configuration is verified by the client by calling the
  191. // SecureConfig.Check() function.
  192. //
  193. // The host process should ensure the checksum was provided by a trusted and
  194. // authoritative source. The binary should be installed in such a way that it
  195. // can not be modified by an unauthorized user between the time of this check
  196. // and the time of execution.
  197. type SecureConfig struct {
  198. Checksum []byte
  199. Hash hash.Hash
  200. }
  201. // Check takes the filepath to an executable and returns true if the checksum of
  202. // the file matches the checksum provided in the SecureConfig.
  203. func (s *SecureConfig) Check(filePath string) (bool, error) {
  204. if len(s.Checksum) == 0 {
  205. return false, ErrSecureConfigNoChecksum
  206. }
  207. if s.Hash == nil {
  208. return false, ErrSecureConfigNoHash
  209. }
  210. file, err := os.Open(filePath)
  211. if err != nil {
  212. return false, err
  213. }
  214. defer file.Close()
  215. _, err = io.Copy(s.Hash, file)
  216. if err != nil {
  217. return false, err
  218. }
  219. sum := s.Hash.Sum(nil)
  220. return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
  221. }
  222. // This makes sure all the managed subprocesses are killed and properly
  223. // logged. This should be called before the parent process running the
  224. // plugins exits.
  225. //
  226. // This must only be called _once_.
  227. func CleanupClients() {
  228. // Set the killed to true so that we don't get unexpected panics
  229. atomic.StoreUint32(&Killed, 1)
  230. // Kill all the managed clients in parallel and use a WaitGroup
  231. // to wait for them all to finish up.
  232. var wg sync.WaitGroup
  233. managedClientsLock.Lock()
  234. for _, client := range managedClients {
  235. wg.Add(1)
  236. go func(client *Client) {
  237. client.Kill()
  238. wg.Done()
  239. }(client)
  240. }
  241. managedClientsLock.Unlock()
  242. wg.Wait()
  243. }
  244. // Creates a new plugin client which manages the lifecycle of an external
  245. // plugin and gets the address for the RPC connection.
  246. //
  247. // The client must be cleaned up at some point by calling Kill(). If
  248. // the client is a managed client (created with NewManagedClient) you
  249. // can just call CleanupClients at the end of your program and they will
  250. // be properly cleaned.
  251. func NewClient(config *ClientConfig) (c *Client) {
  252. if config.MinPort == 0 && config.MaxPort == 0 {
  253. config.MinPort = 10000
  254. config.MaxPort = 25000
  255. }
  256. if config.StartTimeout == 0 {
  257. config.StartTimeout = 1 * time.Minute
  258. }
  259. if config.Stderr == nil {
  260. config.Stderr = ioutil.Discard
  261. }
  262. if config.SyncStdout == nil {
  263. config.SyncStdout = ioutil.Discard
  264. }
  265. if config.SyncStderr == nil {
  266. config.SyncStderr = ioutil.Discard
  267. }
  268. if config.AllowedProtocols == nil {
  269. config.AllowedProtocols = []Protocol{ProtocolNetRPC}
  270. }
  271. if config.Logger == nil {
  272. config.Logger = hclog.New(&hclog.LoggerOptions{
  273. Output: hclog.DefaultOutput,
  274. Level: hclog.Trace,
  275. Name: "plugin",
  276. })
  277. }
  278. c = &Client{
  279. config: config,
  280. logger: config.Logger,
  281. }
  282. if config.Managed {
  283. managedClientsLock.Lock()
  284. managedClients = append(managedClients, c)
  285. managedClientsLock.Unlock()
  286. }
  287. return
  288. }
  289. // Client returns the protocol client for this connection.
  290. //
  291. // Subsequent calls to this will return the same client.
  292. func (c *Client) Client() (ClientProtocol, error) {
  293. _, err := c.Start()
  294. if err != nil {
  295. return nil, err
  296. }
  297. c.l.Lock()
  298. defer c.l.Unlock()
  299. if c.client != nil {
  300. return c.client, nil
  301. }
  302. switch c.protocol {
  303. case ProtocolNetRPC:
  304. c.client, err = newRPCClient(c)
  305. case ProtocolGRPC:
  306. c.client, err = newGRPCClient(c.doneCtx, c)
  307. default:
  308. return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
  309. }
  310. if err != nil {
  311. c.client = nil
  312. return nil, err
  313. }
  314. return c.client, nil
  315. }
  316. // Tells whether or not the underlying process has exited.
  317. func (c *Client) Exited() bool {
  318. c.l.Lock()
  319. defer c.l.Unlock()
  320. return c.exited
  321. }
  322. // killed is used in tests to check if a process failed to exit gracefully, and
  323. // needed to be killed.
  324. func (c *Client) killed() bool {
  325. c.l.Lock()
  326. defer c.l.Unlock()
  327. return c.processKilled
  328. }
  329. // End the executing subprocess (if it is running) and perform any cleanup
  330. // tasks necessary such as capturing any remaining logs and so on.
  331. //
  332. // This method blocks until the process successfully exits.
  333. //
  334. // This method can safely be called multiple times.
  335. func (c *Client) Kill() {
  336. // Grab a lock to read some private fields.
  337. c.l.Lock()
  338. process := c.process
  339. addr := c.address
  340. c.l.Unlock()
  341. // If there is no process, there is nothing to kill.
  342. if process == nil {
  343. return
  344. }
  345. defer func() {
  346. // Wait for the all client goroutines to finish.
  347. c.clientWaitGroup.Wait()
  348. // Make sure there is no reference to the old process after it has been
  349. // killed.
  350. c.l.Lock()
  351. c.process = nil
  352. c.l.Unlock()
  353. }()
  354. // We need to check for address here. It is possible that the plugin
  355. // started (process != nil) but has no address (addr == nil) if the
  356. // plugin failed at startup. If we do have an address, we need to close
  357. // the plugin net connections.
  358. graceful := false
  359. if addr != nil {
  360. // Close the client to cleanly exit the process.
  361. client, err := c.Client()
  362. if err == nil {
  363. err = client.Close()
  364. // If there is no error, then we attempt to wait for a graceful
  365. // exit. If there was an error, we assume that graceful cleanup
  366. // won't happen and just force kill.
  367. graceful = err == nil
  368. if err != nil {
  369. // If there was an error just log it. We're going to force
  370. // kill in a moment anyways.
  371. c.logger.Warn("error closing client during Kill", "err", err)
  372. }
  373. } else {
  374. c.logger.Error("client", "error", err)
  375. }
  376. }
  377. // If we're attempting a graceful exit, then we wait for a short period
  378. // of time to allow that to happen. To wait for this we just wait on the
  379. // doneCh which would be closed if the process exits.
  380. if graceful {
  381. select {
  382. case <-c.doneCtx.Done():
  383. c.logger.Debug("plugin exited")
  384. return
  385. case <-time.After(2 * time.Second):
  386. }
  387. }
  388. // If graceful exiting failed, just kill it
  389. c.logger.Warn("plugin failed to exit gracefully")
  390. process.Kill()
  391. c.l.Lock()
  392. c.processKilled = true
  393. c.l.Unlock()
  394. }
  395. // Starts the underlying subprocess, communicating with it to negotiate
  396. // a port for RPC connections, and returning the address to connect via RPC.
  397. //
  398. // This method is safe to call multiple times. Subsequent calls have no effect.
  399. // Once a client has been started once, it cannot be started again, even if
  400. // it was killed.
  401. func (c *Client) Start() (addr net.Addr, err error) {
  402. c.l.Lock()
  403. defer c.l.Unlock()
  404. if c.address != nil {
  405. return c.address, nil
  406. }
  407. // If one of cmd or reattach isn't set, then it is an error. We wrap
  408. // this in a {} for scoping reasons, and hopeful that the escape
  409. // analysis will pop the stack here.
  410. {
  411. cmdSet := c.config.Cmd != nil
  412. attachSet := c.config.Reattach != nil
  413. secureSet := c.config.SecureConfig != nil
  414. if cmdSet == attachSet {
  415. return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
  416. }
  417. if secureSet && attachSet {
  418. return nil, ErrSecureConfigAndReattach
  419. }
  420. }
  421. if c.config.Reattach != nil {
  422. return c.reattach()
  423. }
  424. if c.config.VersionedPlugins == nil {
  425. c.config.VersionedPlugins = make(map[int]PluginSet)
  426. }
  427. // handle all plugins as versioned, using the handshake config as the default.
  428. version := int(c.config.ProtocolVersion)
  429. // Make sure we're not overwriting a real version 0. If ProtocolVersion was
  430. // non-zero, then we have to just assume the user made sure that
  431. // VersionedPlugins doesn't conflict.
  432. if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
  433. c.config.VersionedPlugins[version] = c.config.Plugins
  434. }
  435. var versionStrings []string
  436. for v := range c.config.VersionedPlugins {
  437. versionStrings = append(versionStrings, strconv.Itoa(v))
  438. }
  439. env := []string{
  440. fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
  441. fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
  442. fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
  443. fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
  444. }
  445. cmd := c.config.Cmd
  446. cmd.Env = append(cmd.Env, os.Environ()...)
  447. cmd.Env = append(cmd.Env, env...)
  448. cmd.Stdin = os.Stdin
  449. cmdStdout, err := cmd.StdoutPipe()
  450. if err != nil {
  451. return nil, err
  452. }
  453. cmdStderr, err := cmd.StderrPipe()
  454. if err != nil {
  455. return nil, err
  456. }
  457. if c.config.SecureConfig != nil {
  458. if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
  459. return nil, fmt.Errorf("error verifying checksum: %s", err)
  460. } else if !ok {
  461. return nil, ErrChecksumsDoNotMatch
  462. }
  463. }
  464. // Setup a temporary certificate for client/server mtls, and send the public
  465. // certificate to the plugin.
  466. if c.config.AutoMTLS {
  467. c.logger.Info("configuring client automatic mTLS")
  468. certPEM, keyPEM, err := generateCert()
  469. if err != nil {
  470. c.logger.Error("failed to generate client certificate", "error", err)
  471. return nil, err
  472. }
  473. cert, err := tls.X509KeyPair(certPEM, keyPEM)
  474. if err != nil {
  475. c.logger.Error("failed to parse client certificate", "error", err)
  476. return nil, err
  477. }
  478. cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
  479. c.config.TLSConfig = &tls.Config{
  480. Certificates: []tls.Certificate{cert},
  481. ServerName: "localhost",
  482. }
  483. }
  484. c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
  485. err = cmd.Start()
  486. if err != nil {
  487. return
  488. }
  489. // Set the process
  490. c.process = cmd.Process
  491. c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
  492. // Make sure the command is properly cleaned up if there is an error
  493. defer func() {
  494. r := recover()
  495. if err != nil || r != nil {
  496. cmd.Process.Kill()
  497. }
  498. if r != nil {
  499. panic(r)
  500. }
  501. }()
  502. // Create a context for when we kill
  503. c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
  504. c.clientWaitGroup.Add(1)
  505. go func() {
  506. // ensure the context is cancelled when we're done
  507. defer c.ctxCancel()
  508. defer c.clientWaitGroup.Done()
  509. // get the cmd info early, since the process information will be removed
  510. // in Kill.
  511. pid := c.process.Pid
  512. path := cmd.Path
  513. // Wait for the command to end.
  514. err := cmd.Wait()
  515. debugMsgArgs := []interface{}{
  516. "path", path,
  517. "pid", pid,
  518. }
  519. if err != nil {
  520. debugMsgArgs = append(debugMsgArgs,
  521. []interface{}{"error", err.Error()}...)
  522. }
  523. // Log and make sure to flush the logs write away
  524. c.logger.Debug("plugin process exited", debugMsgArgs...)
  525. os.Stderr.Sync()
  526. // Set that we exited, which takes a lock
  527. c.l.Lock()
  528. defer c.l.Unlock()
  529. c.exited = true
  530. }()
  531. // Start goroutine that logs the stderr
  532. c.clientWaitGroup.Add(1)
  533. // logStderr calls Done()
  534. go c.logStderr(cmdStderr)
  535. // Start a goroutine that is going to be reading the lines
  536. // out of stdout
  537. linesCh := make(chan string)
  538. c.clientWaitGroup.Add(1)
  539. go func() {
  540. defer c.clientWaitGroup.Done()
  541. defer close(linesCh)
  542. scanner := bufio.NewScanner(cmdStdout)
  543. for scanner.Scan() {
  544. linesCh <- scanner.Text()
  545. }
  546. }()
  547. // Make sure after we exit we read the lines from stdout forever
  548. // so they don't block since it is a pipe.
  549. // The scanner goroutine above will close this, but track it with a wait
  550. // group for completeness.
  551. c.clientWaitGroup.Add(1)
  552. defer func() {
  553. go func() {
  554. defer c.clientWaitGroup.Done()
  555. for range linesCh {
  556. }
  557. }()
  558. }()
  559. // Some channels for the next step
  560. timeout := time.After(c.config.StartTimeout)
  561. // Start looking for the address
  562. c.logger.Debug("waiting for RPC address", "path", cmd.Path)
  563. select {
  564. case <-timeout:
  565. err = errors.New("timeout while waiting for plugin to start")
  566. case <-c.doneCtx.Done():
  567. err = errors.New("plugin exited before we could connect")
  568. case line := <-linesCh:
  569. // Trim the line and split by "|" in order to get the parts of
  570. // the output.
  571. line = strings.TrimSpace(line)
  572. parts := strings.SplitN(line, "|", 6)
  573. if len(parts) < 4 {
  574. err = fmt.Errorf(
  575. "Unrecognized remote plugin message: %s\n\n"+
  576. "This usually means that the plugin is either invalid or simply\n"+
  577. "needs to be recompiled to support the latest protocol.", line)
  578. return
  579. }
  580. // Check the core protocol. Wrapped in a {} for scoping.
  581. {
  582. var coreProtocol int64
  583. coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
  584. if err != nil {
  585. err = fmt.Errorf("Error parsing core protocol version: %s", err)
  586. return
  587. }
  588. if int(coreProtocol) != CoreProtocolVersion {
  589. err = fmt.Errorf("Incompatible core API version with plugin. "+
  590. "Plugin version: %s, Core version: %d\n\n"+
  591. "To fix this, the plugin usually only needs to be recompiled.\n"+
  592. "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
  593. return
  594. }
  595. }
  596. // Test the API version
  597. version, pluginSet, err := c.checkProtoVersion(parts[1])
  598. if err != nil {
  599. return addr, err
  600. }
  601. // set the Plugins value to the compatible set, so the version
  602. // doesn't need to be passed through to the ClientProtocol
  603. // implementation.
  604. c.config.Plugins = pluginSet
  605. c.negotiatedVersion = version
  606. c.logger.Debug("using plugin", "version", version)
  607. switch parts[2] {
  608. case "tcp":
  609. addr, err = net.ResolveTCPAddr("tcp", parts[3])
  610. case "unix":
  611. addr, err = net.ResolveUnixAddr("unix", parts[3])
  612. default:
  613. err = fmt.Errorf("Unknown address type: %s", parts[3])
  614. }
  615. // If we have a server type, then record that. We default to net/rpc
  616. // for backwards compatibility.
  617. c.protocol = ProtocolNetRPC
  618. if len(parts) >= 5 {
  619. c.protocol = Protocol(parts[4])
  620. }
  621. found := false
  622. for _, p := range c.config.AllowedProtocols {
  623. if p == c.protocol {
  624. found = true
  625. break
  626. }
  627. }
  628. if !found {
  629. err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
  630. c.protocol, c.config.AllowedProtocols)
  631. return addr, err
  632. }
  633. // See if we have a TLS certificate from the server.
  634. // Checking if the length is > 50 rules out catching the unused "extra"
  635. // data returned from some older implementations.
  636. if len(parts) >= 6 && len(parts[5]) > 50 {
  637. err := c.loadServerCert(parts[5])
  638. if err != nil {
  639. return nil, fmt.Errorf("error parsing server cert: %s", err)
  640. }
  641. }
  642. }
  643. c.address = addr
  644. return
  645. }
  646. // loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
  647. // server, and load it as the RootCA for the client TLSConfig.
  648. func (c *Client) loadServerCert(cert string) error {
  649. certPool := x509.NewCertPool()
  650. asn1, err := base64.RawStdEncoding.DecodeString(cert)
  651. if err != nil {
  652. return err
  653. }
  654. x509Cert, err := x509.ParseCertificate([]byte(asn1))
  655. if err != nil {
  656. return err
  657. }
  658. certPool.AddCert(x509Cert)
  659. c.config.TLSConfig.RootCAs = certPool
  660. return nil
  661. }
  662. func (c *Client) reattach() (net.Addr, error) {
  663. // Verify the process still exists. If not, then it is an error
  664. p, err := os.FindProcess(c.config.Reattach.Pid)
  665. if err != nil {
  666. return nil, err
  667. }
  668. // Attempt to connect to the addr since on Unix systems FindProcess
  669. // doesn't actually return an error if it can't find the process.
  670. conn, err := net.Dial(
  671. c.config.Reattach.Addr.Network(),
  672. c.config.Reattach.Addr.String())
  673. if err != nil {
  674. p.Kill()
  675. return nil, ErrProcessNotFound
  676. }
  677. conn.Close()
  678. // Create a context for when we kill
  679. c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
  680. c.clientWaitGroup.Add(1)
  681. // Goroutine to mark exit status
  682. go func(pid int) {
  683. defer c.clientWaitGroup.Done()
  684. // ensure the context is cancelled when we're done
  685. defer c.ctxCancel()
  686. // Wait for the process to die
  687. pidWait(pid)
  688. // Log so we can see it
  689. c.logger.Debug("reattached plugin process exited")
  690. // Mark it
  691. c.l.Lock()
  692. defer c.l.Unlock()
  693. c.exited = true
  694. }(p.Pid)
  695. // Set the address and process
  696. c.address = c.config.Reattach.Addr
  697. c.process = p
  698. c.protocol = c.config.Reattach.Protocol
  699. if c.protocol == "" {
  700. // Default the protocol to net/rpc for backwards compatibility
  701. c.protocol = ProtocolNetRPC
  702. }
  703. return c.address, nil
  704. }
  705. // checkProtoVersion returns the negotiated version and PluginSet.
  706. // This returns an error if the server returned an incompatible protocol
  707. // version, or an invalid handshake response.
  708. func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
  709. serverVersion, err := strconv.Atoi(protoVersion)
  710. if err != nil {
  711. return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
  712. }
  713. // record these for the error message
  714. var clientVersions []int
  715. // all versions, including the legacy ProtocolVersion have been added to
  716. // the versions set
  717. for version, plugins := range c.config.VersionedPlugins {
  718. clientVersions = append(clientVersions, version)
  719. if serverVersion != version {
  720. continue
  721. }
  722. return version, plugins, nil
  723. }
  724. return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
  725. "Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
  726. }
  727. // ReattachConfig returns the information that must be provided to NewClient
  728. // to reattach to the plugin process that this client started. This is
  729. // useful for plugins that detach from their parent process.
  730. //
  731. // If this returns nil then the process hasn't been started yet. Please
  732. // call Start or Client before calling this.
  733. func (c *Client) ReattachConfig() *ReattachConfig {
  734. c.l.Lock()
  735. defer c.l.Unlock()
  736. if c.address == nil {
  737. return nil
  738. }
  739. if c.config.Cmd != nil && c.config.Cmd.Process == nil {
  740. return nil
  741. }
  742. // If we connected via reattach, just return the information as-is
  743. if c.config.Reattach != nil {
  744. return c.config.Reattach
  745. }
  746. return &ReattachConfig{
  747. Protocol: c.protocol,
  748. Addr: c.address,
  749. Pid: c.config.Cmd.Process.Pid,
  750. }
  751. }
  752. // Protocol returns the protocol of server on the remote end. This will
  753. // start the plugin process if it isn't already started. Errors from
  754. // starting the plugin are surpressed and ProtocolInvalid is returned. It
  755. // is recommended you call Start explicitly before calling Protocol to ensure
  756. // no errors occur.
  757. func (c *Client) Protocol() Protocol {
  758. _, err := c.Start()
  759. if err != nil {
  760. return ProtocolInvalid
  761. }
  762. return c.protocol
  763. }
  764. func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
  765. return func(_ string, _ time.Duration) (net.Conn, error) {
  766. // Connect to the client
  767. conn, err := net.Dial(addr.Network(), addr.String())
  768. if err != nil {
  769. return nil, err
  770. }
  771. if tcpConn, ok := conn.(*net.TCPConn); ok {
  772. // Make sure to set keep alive so that the connection doesn't die
  773. tcpConn.SetKeepAlive(true)
  774. }
  775. return conn, nil
  776. }
  777. }
  778. // dialer is compatible with grpc.WithDialer and creates the connection
  779. // to the plugin.
  780. func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
  781. conn, err := netAddrDialer(c.address)("", timeout)
  782. if err != nil {
  783. return nil, err
  784. }
  785. // If we have a TLS config we wrap our connection. We only do this
  786. // for net/rpc since gRPC uses its own mechanism for TLS.
  787. if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
  788. conn = tls.Client(conn, c.config.TLSConfig)
  789. }
  790. return conn, nil
  791. }
  792. var stdErrBufferSize = 64 * 1024
  793. func (c *Client) logStderr(r io.Reader) {
  794. defer c.clientWaitGroup.Done()
  795. l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
  796. reader := bufio.NewReaderSize(r, stdErrBufferSize)
  797. // continuation indicates the previous line was a prefix
  798. continuation := false
  799. for {
  800. line, isPrefix, err := reader.ReadLine()
  801. switch {
  802. case err == io.EOF:
  803. return
  804. case err != nil:
  805. l.Error("reading plugin stderr", "error", err)
  806. return
  807. }
  808. c.config.Stderr.Write(line)
  809. // The line was longer than our max token size, so it's likely
  810. // incomplete and won't unmarshal.
  811. if isPrefix || continuation {
  812. l.Debug(string(line))
  813. // if we're finishing a continued line, add the newline back in
  814. if !isPrefix {
  815. c.config.Stderr.Write([]byte{'\n'})
  816. }
  817. continuation = isPrefix
  818. continue
  819. }
  820. c.config.Stderr.Write([]byte{'\n'})
  821. entry, err := parseJSON(line)
  822. // If output is not JSON format, print directly to Debug
  823. if err != nil {
  824. l.Debug(string(line))
  825. } else {
  826. out := flattenKVPairs(entry.KVPairs)
  827. out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
  828. switch hclog.LevelFromString(entry.Level) {
  829. case hclog.Trace:
  830. l.Trace(entry.Message, out...)
  831. case hclog.Debug:
  832. l.Debug(entry.Message, out...)
  833. case hclog.Info:
  834. l.Info(entry.Message, out...)
  835. case hclog.Warn:
  836. l.Warn(entry.Message, out...)
  837. case hclog.Error:
  838. l.Error(entry.Message, out...)
  839. }
  840. }
  841. }
  842. }