client.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792
  1. package plugin
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/subtle"
  6. "crypto/tls"
  7. "errors"
  8. "fmt"
  9. "hash"
  10. "io"
  11. "io/ioutil"
  12. "net"
  13. "os"
  14. "os/exec"
  15. "path/filepath"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "unicode"
  22. hclog "github.com/hashicorp/go-hclog"
  23. )
  24. // If this is 1, then we've called CleanupClients. This can be used
  25. // by plugin RPC implementations to change error behavior since you
  26. // can expected network connection errors at this point. This should be
  27. // read by using sync/atomic.
  28. var Killed uint32 = 0
  29. // This is a slice of the "managed" clients which are cleaned up when
  30. // calling Cleanup
  31. var managedClients = make([]*Client, 0, 5)
  32. var managedClientsLock sync.Mutex
  33. // Error types
  34. var (
  35. // ErrProcessNotFound is returned when a client is instantiated to
  36. // reattach to an existing process and it isn't found.
  37. ErrProcessNotFound = errors.New("Reattachment process not found")
  38. // ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
  39. // the one provided in the SecureConfig.
  40. ErrChecksumsDoNotMatch = errors.New("checksums did not match")
  41. // ErrSecureNoChecksum is returned when an empty checksum is provided to the
  42. // SecureConfig.
  43. ErrSecureConfigNoChecksum = errors.New("no checksum provided")
  44. // ErrSecureNoHash is returned when a nil Hash object is provided to the
  45. // SecureConfig.
  46. ErrSecureConfigNoHash = errors.New("no hash implementation provided")
  47. // ErrSecureConfigAndReattach is returned when both Reattach and
  48. // SecureConfig are set.
  49. ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
  50. )
  51. // Client handles the lifecycle of a plugin application. It launches
  52. // plugins, connects to them, dispenses interface implementations, and handles
  53. // killing the process.
  54. //
  55. // Plugin hosts should use one Client for each plugin executable. To
  56. // dispense a plugin type, use the `Client.Client` function, and then
  57. // cal `Dispense`. This awkward API is mostly historical but is used to split
  58. // the client that deals with subprocess management and the client that
  59. // does RPC management.
  60. //
  61. // See NewClient and ClientConfig for using a Client.
  62. type Client struct {
  63. config *ClientConfig
  64. exited bool
  65. doneLogging chan struct{}
  66. l sync.Mutex
  67. address net.Addr
  68. process *os.Process
  69. client ClientProtocol
  70. protocol Protocol
  71. logger hclog.Logger
  72. doneCtx context.Context
  73. }
  74. // ClientConfig is the configuration used to initialize a new
  75. // plugin client. After being used to initialize a plugin client,
  76. // that configuration must not be modified again.
  77. type ClientConfig struct {
  78. // HandshakeConfig is the configuration that must match servers.
  79. HandshakeConfig
  80. // Plugins are the plugins that can be consumed.
  81. Plugins map[string]Plugin
  82. // One of the following must be set, but not both.
  83. //
  84. // Cmd is the unstarted subprocess for starting the plugin. If this is
  85. // set, then the Client starts the plugin process on its own and connects
  86. // to it.
  87. //
  88. // Reattach is configuration for reattaching to an existing plugin process
  89. // that is already running. This isn't common.
  90. Cmd *exec.Cmd
  91. Reattach *ReattachConfig
  92. // SecureConfig is configuration for verifying the integrity of the
  93. // executable. It can not be used with Reattach.
  94. SecureConfig *SecureConfig
  95. // TLSConfig is used to enable TLS on the RPC client.
  96. TLSConfig *tls.Config
  97. // Managed represents if the client should be managed by the
  98. // plugin package or not. If true, then by calling CleanupClients,
  99. // it will automatically be cleaned up. Otherwise, the client
  100. // user is fully responsible for making sure to Kill all plugin
  101. // clients. By default the client is _not_ managed.
  102. Managed bool
  103. // The minimum and maximum port to use for communicating with
  104. // the subprocess. If not set, this defaults to 10,000 and 25,000
  105. // respectively.
  106. MinPort, MaxPort uint
  107. // StartTimeout is the timeout to wait for the plugin to say it
  108. // has started successfully.
  109. StartTimeout time.Duration
  110. // If non-nil, then the stderr of the client will be written to here
  111. // (as well as the log). This is the original os.Stderr of the subprocess.
  112. // This isn't the output of synced stderr.
  113. Stderr io.Writer
  114. // SyncStdout, SyncStderr can be set to override the
  115. // respective os.Std* values in the plugin. Care should be taken to
  116. // avoid races here. If these are nil, then this will automatically be
  117. // hooked up to os.Stdin, Stdout, and Stderr, respectively.
  118. //
  119. // If the default values (nil) are used, then this package will not
  120. // sync any of these streams.
  121. SyncStdout io.Writer
  122. SyncStderr io.Writer
  123. // AllowedProtocols is a list of allowed protocols. If this isn't set,
  124. // then only netrpc is allowed. This is so that older go-plugin systems
  125. // can show friendly errors if they see a plugin with an unknown
  126. // protocol.
  127. //
  128. // By setting this, you can cause an error immediately on plugin start
  129. // if an unsupported protocol is used with a good error message.
  130. //
  131. // If this isn't set at all (nil value), then only net/rpc is accepted.
  132. // This is done for legacy reasons. You must explicitly opt-in to
  133. // new protocols.
  134. AllowedProtocols []Protocol
  135. // Logger is the logger that the client will used. If none is provided,
  136. // it will default to hclog's default logger.
  137. Logger hclog.Logger
  138. }
  139. // ReattachConfig is used to configure a client to reattach to an
  140. // already-running plugin process. You can retrieve this information by
  141. // calling ReattachConfig on Client.
  142. type ReattachConfig struct {
  143. Protocol Protocol
  144. Addr net.Addr
  145. Pid int
  146. }
  147. // SecureConfig is used to configure a client to verify the integrity of an
  148. // executable before running. It does this by verifying the checksum is
  149. // expected. Hash is used to specify the hashing method to use when checksumming
  150. // the file. The configuration is verified by the client by calling the
  151. // SecureConfig.Check() function.
  152. //
  153. // The host process should ensure the checksum was provided by a trusted and
  154. // authoritative source. The binary should be installed in such a way that it
  155. // can not be modified by an unauthorized user between the time of this check
  156. // and the time of execution.
  157. type SecureConfig struct {
  158. Checksum []byte
  159. Hash hash.Hash
  160. }
  161. // Check takes the filepath to an executable and returns true if the checksum of
  162. // the file matches the checksum provided in the SecureConfig.
  163. func (s *SecureConfig) Check(filePath string) (bool, error) {
  164. if len(s.Checksum) == 0 {
  165. return false, ErrSecureConfigNoChecksum
  166. }
  167. if s.Hash == nil {
  168. return false, ErrSecureConfigNoHash
  169. }
  170. file, err := os.Open(filePath)
  171. if err != nil {
  172. return false, err
  173. }
  174. defer file.Close()
  175. _, err = io.Copy(s.Hash, file)
  176. if err != nil {
  177. return false, err
  178. }
  179. sum := s.Hash.Sum(nil)
  180. return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
  181. }
  182. // This makes sure all the managed subprocesses are killed and properly
  183. // logged. This should be called before the parent process running the
  184. // plugins exits.
  185. //
  186. // This must only be called _once_.
  187. func CleanupClients() {
  188. // Set the killed to true so that we don't get unexpected panics
  189. atomic.StoreUint32(&Killed, 1)
  190. // Kill all the managed clients in parallel and use a WaitGroup
  191. // to wait for them all to finish up.
  192. var wg sync.WaitGroup
  193. managedClientsLock.Lock()
  194. for _, client := range managedClients {
  195. wg.Add(1)
  196. go func(client *Client) {
  197. client.Kill()
  198. wg.Done()
  199. }(client)
  200. }
  201. managedClientsLock.Unlock()
  202. wg.Wait()
  203. }
  204. // Creates a new plugin client which manages the lifecycle of an external
  205. // plugin and gets the address for the RPC connection.
  206. //
  207. // The client must be cleaned up at some point by calling Kill(). If
  208. // the client is a managed client (created with NewManagedClient) you
  209. // can just call CleanupClients at the end of your program and they will
  210. // be properly cleaned.
  211. func NewClient(config *ClientConfig) (c *Client) {
  212. if config.MinPort == 0 && config.MaxPort == 0 {
  213. config.MinPort = 10000
  214. config.MaxPort = 25000
  215. }
  216. if config.StartTimeout == 0 {
  217. config.StartTimeout = 1 * time.Minute
  218. }
  219. if config.Stderr == nil {
  220. config.Stderr = ioutil.Discard
  221. }
  222. if config.SyncStdout == nil {
  223. config.SyncStdout = ioutil.Discard
  224. }
  225. if config.SyncStderr == nil {
  226. config.SyncStderr = ioutil.Discard
  227. }
  228. if config.AllowedProtocols == nil {
  229. config.AllowedProtocols = []Protocol{ProtocolNetRPC}
  230. }
  231. if config.Logger == nil {
  232. config.Logger = hclog.New(&hclog.LoggerOptions{
  233. Output: hclog.DefaultOutput,
  234. Level: hclog.Trace,
  235. Name: "plugin",
  236. })
  237. }
  238. c = &Client{
  239. config: config,
  240. logger: config.Logger,
  241. }
  242. if config.Managed {
  243. managedClientsLock.Lock()
  244. managedClients = append(managedClients, c)
  245. managedClientsLock.Unlock()
  246. }
  247. return
  248. }
  249. // Client returns the protocol client for this connection.
  250. //
  251. // Subsequent calls to this will return the same client.
  252. func (c *Client) Client() (ClientProtocol, error) {
  253. _, err := c.Start()
  254. if err != nil {
  255. return nil, err
  256. }
  257. c.l.Lock()
  258. defer c.l.Unlock()
  259. if c.client != nil {
  260. return c.client, nil
  261. }
  262. switch c.protocol {
  263. case ProtocolNetRPC:
  264. c.client, err = newRPCClient(c)
  265. case ProtocolGRPC:
  266. c.client, err = newGRPCClient(c.doneCtx, c)
  267. default:
  268. return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
  269. }
  270. if err != nil {
  271. c.client = nil
  272. return nil, err
  273. }
  274. return c.client, nil
  275. }
  276. // Tells whether or not the underlying process has exited.
  277. func (c *Client) Exited() bool {
  278. c.l.Lock()
  279. defer c.l.Unlock()
  280. return c.exited
  281. }
  282. // End the executing subprocess (if it is running) and perform any cleanup
  283. // tasks necessary such as capturing any remaining logs and so on.
  284. //
  285. // This method blocks until the process successfully exits.
  286. //
  287. // This method can safely be called multiple times.
  288. func (c *Client) Kill() {
  289. // Grab a lock to read some private fields.
  290. c.l.Lock()
  291. process := c.process
  292. addr := c.address
  293. doneCh := c.doneLogging
  294. c.l.Unlock()
  295. // If there is no process, we never started anything. Nothing to kill.
  296. if process == nil {
  297. return
  298. }
  299. // We need to check for address here. It is possible that the plugin
  300. // started (process != nil) but has no address (addr == nil) if the
  301. // plugin failed at startup. If we do have an address, we need to close
  302. // the plugin net connections.
  303. graceful := false
  304. if addr != nil {
  305. // Close the client to cleanly exit the process.
  306. client, err := c.Client()
  307. if err == nil {
  308. err = client.Close()
  309. // If there is no error, then we attempt to wait for a graceful
  310. // exit. If there was an error, we assume that graceful cleanup
  311. // won't happen and just force kill.
  312. graceful = err == nil
  313. if err != nil {
  314. // If there was an error just log it. We're going to force
  315. // kill in a moment anyways.
  316. c.logger.Warn("error closing client during Kill", "err", err)
  317. }
  318. }
  319. }
  320. // If we're attempting a graceful exit, then we wait for a short period
  321. // of time to allow that to happen. To wait for this we just wait on the
  322. // doneCh which would be closed if the process exits.
  323. if graceful {
  324. select {
  325. case <-doneCh:
  326. return
  327. case <-time.After(250 * time.Millisecond):
  328. }
  329. }
  330. // If graceful exiting failed, just kill it
  331. process.Kill()
  332. // Wait for the client to finish logging so we have a complete log
  333. <-doneCh
  334. }
  335. // Starts the underlying subprocess, communicating with it to negotiate
  336. // a port for RPC connections, and returning the address to connect via RPC.
  337. //
  338. // This method is safe to call multiple times. Subsequent calls have no effect.
  339. // Once a client has been started once, it cannot be started again, even if
  340. // it was killed.
  341. func (c *Client) Start() (addr net.Addr, err error) {
  342. c.l.Lock()
  343. defer c.l.Unlock()
  344. if c.address != nil {
  345. return c.address, nil
  346. }
  347. // If one of cmd or reattach isn't set, then it is an error. We wrap
  348. // this in a {} for scoping reasons, and hopeful that the escape
  349. // analysis will pop the stock here.
  350. {
  351. cmdSet := c.config.Cmd != nil
  352. attachSet := c.config.Reattach != nil
  353. secureSet := c.config.SecureConfig != nil
  354. if cmdSet == attachSet {
  355. return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
  356. }
  357. if secureSet && attachSet {
  358. return nil, ErrSecureConfigAndReattach
  359. }
  360. }
  361. // Create the logging channel for when we kill
  362. c.doneLogging = make(chan struct{})
  363. // Create a context for when we kill
  364. var ctxCancel context.CancelFunc
  365. c.doneCtx, ctxCancel = context.WithCancel(context.Background())
  366. if c.config.Reattach != nil {
  367. // Verify the process still exists. If not, then it is an error
  368. p, err := os.FindProcess(c.config.Reattach.Pid)
  369. if err != nil {
  370. return nil, err
  371. }
  372. // Attempt to connect to the addr since on Unix systems FindProcess
  373. // doesn't actually return an error if it can't find the process.
  374. conn, err := net.Dial(
  375. c.config.Reattach.Addr.Network(),
  376. c.config.Reattach.Addr.String())
  377. if err != nil {
  378. p.Kill()
  379. return nil, ErrProcessNotFound
  380. }
  381. conn.Close()
  382. // Goroutine to mark exit status
  383. go func(pid int) {
  384. // Wait for the process to die
  385. pidWait(pid)
  386. // Log so we can see it
  387. c.logger.Debug("reattached plugin process exited")
  388. // Mark it
  389. c.l.Lock()
  390. defer c.l.Unlock()
  391. c.exited = true
  392. // Close the logging channel since that doesn't work on reattach
  393. close(c.doneLogging)
  394. // Cancel the context
  395. ctxCancel()
  396. }(p.Pid)
  397. // Set the address and process
  398. c.address = c.config.Reattach.Addr
  399. c.process = p
  400. c.protocol = c.config.Reattach.Protocol
  401. if c.protocol == "" {
  402. // Default the protocol to net/rpc for backwards compatibility
  403. c.protocol = ProtocolNetRPC
  404. }
  405. return c.address, nil
  406. }
  407. env := []string{
  408. fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
  409. fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
  410. fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
  411. }
  412. stdout_r, stdout_w := io.Pipe()
  413. stderr_r, stderr_w := io.Pipe()
  414. cmd := c.config.Cmd
  415. cmd.Env = append(cmd.Env, os.Environ()...)
  416. cmd.Env = append(cmd.Env, env...)
  417. cmd.Stdin = os.Stdin
  418. cmd.Stderr = stderr_w
  419. cmd.Stdout = stdout_w
  420. if c.config.SecureConfig != nil {
  421. if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
  422. return nil, fmt.Errorf("error verifying checksum: %s", err)
  423. } else if !ok {
  424. return nil, ErrChecksumsDoNotMatch
  425. }
  426. }
  427. c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
  428. err = cmd.Start()
  429. if err != nil {
  430. return
  431. }
  432. // Set the process
  433. c.process = cmd.Process
  434. // Make sure the command is properly cleaned up if there is an error
  435. defer func() {
  436. r := recover()
  437. if err != nil || r != nil {
  438. cmd.Process.Kill()
  439. }
  440. if r != nil {
  441. panic(r)
  442. }
  443. }()
  444. // Start goroutine to wait for process to exit
  445. exitCh := make(chan struct{})
  446. go func() {
  447. // Make sure we close the write end of our stderr/stdout so
  448. // that the readers send EOF properly.
  449. defer stderr_w.Close()
  450. defer stdout_w.Close()
  451. // Wait for the command to end.
  452. cmd.Wait()
  453. // Log and make sure to flush the logs write away
  454. c.logger.Debug("plugin process exited", "path", cmd.Path)
  455. os.Stderr.Sync()
  456. // Mark that we exited
  457. close(exitCh)
  458. // Cancel the context, marking that we exited
  459. ctxCancel()
  460. // Set that we exited, which takes a lock
  461. c.l.Lock()
  462. defer c.l.Unlock()
  463. c.exited = true
  464. }()
  465. // Start goroutine that logs the stderr
  466. go c.logStderr(stderr_r)
  467. // Start a goroutine that is going to be reading the lines
  468. // out of stdout
  469. linesCh := make(chan []byte)
  470. go func() {
  471. defer close(linesCh)
  472. buf := bufio.NewReader(stdout_r)
  473. for {
  474. line, err := buf.ReadBytes('\n')
  475. if line != nil {
  476. linesCh <- line
  477. }
  478. if err == io.EOF {
  479. return
  480. }
  481. }
  482. }()
  483. // Make sure after we exit we read the lines from stdout forever
  484. // so they don't block since it is an io.Pipe
  485. defer func() {
  486. go func() {
  487. for _ = range linesCh {
  488. }
  489. }()
  490. }()
  491. // Some channels for the next step
  492. timeout := time.After(c.config.StartTimeout)
  493. // Start looking for the address
  494. c.logger.Debug("waiting for RPC address", "path", cmd.Path)
  495. select {
  496. case <-timeout:
  497. err = errors.New("timeout while waiting for plugin to start")
  498. case <-exitCh:
  499. err = errors.New("plugin exited before we could connect")
  500. case lineBytes := <-linesCh:
  501. // Trim the line and split by "|" in order to get the parts of
  502. // the output.
  503. line := strings.TrimSpace(string(lineBytes))
  504. parts := strings.SplitN(line, "|", 6)
  505. if len(parts) < 4 {
  506. err = fmt.Errorf(
  507. "Unrecognized remote plugin message: %s\n\n"+
  508. "This usually means that the plugin is either invalid or simply\n"+
  509. "needs to be recompiled to support the latest protocol.", line)
  510. return
  511. }
  512. // Check the core protocol. Wrapped in a {} for scoping.
  513. {
  514. var coreProtocol int64
  515. coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
  516. if err != nil {
  517. err = fmt.Errorf("Error parsing core protocol version: %s", err)
  518. return
  519. }
  520. if int(coreProtocol) != CoreProtocolVersion {
  521. err = fmt.Errorf("Incompatible core API version with plugin. "+
  522. "Plugin version: %s, Core version: %d\n\n"+
  523. "To fix this, the plugin usually only needs to be recompiled.\n"+
  524. "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
  525. return
  526. }
  527. }
  528. // Parse the protocol version
  529. var protocol int64
  530. protocol, err = strconv.ParseInt(parts[1], 10, 0)
  531. if err != nil {
  532. err = fmt.Errorf("Error parsing protocol version: %s", err)
  533. return
  534. }
  535. // Test the API version
  536. if uint(protocol) != c.config.ProtocolVersion {
  537. err = fmt.Errorf("Incompatible API version with plugin. "+
  538. "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion)
  539. return
  540. }
  541. switch parts[2] {
  542. case "tcp":
  543. addr, err = net.ResolveTCPAddr("tcp", parts[3])
  544. case "unix":
  545. addr, err = net.ResolveUnixAddr("unix", parts[3])
  546. default:
  547. err = fmt.Errorf("Unknown address type: %s", parts[3])
  548. }
  549. // If we have a server type, then record that. We default to net/rpc
  550. // for backwards compatibility.
  551. c.protocol = ProtocolNetRPC
  552. if len(parts) >= 5 {
  553. c.protocol = Protocol(parts[4])
  554. }
  555. found := false
  556. for _, p := range c.config.AllowedProtocols {
  557. if p == c.protocol {
  558. found = true
  559. break
  560. }
  561. }
  562. if !found {
  563. err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
  564. c.protocol, c.config.AllowedProtocols)
  565. return
  566. }
  567. }
  568. c.address = addr
  569. return
  570. }
  571. // ReattachConfig returns the information that must be provided to NewClient
  572. // to reattach to the plugin process that this client started. This is
  573. // useful for plugins that detach from their parent process.
  574. //
  575. // If this returns nil then the process hasn't been started yet. Please
  576. // call Start or Client before calling this.
  577. func (c *Client) ReattachConfig() *ReattachConfig {
  578. c.l.Lock()
  579. defer c.l.Unlock()
  580. if c.address == nil {
  581. return nil
  582. }
  583. if c.config.Cmd != nil && c.config.Cmd.Process == nil {
  584. return nil
  585. }
  586. // If we connected via reattach, just return the information as-is
  587. if c.config.Reattach != nil {
  588. return c.config.Reattach
  589. }
  590. return &ReattachConfig{
  591. Protocol: c.protocol,
  592. Addr: c.address,
  593. Pid: c.config.Cmd.Process.Pid,
  594. }
  595. }
  596. // Protocol returns the protocol of server on the remote end. This will
  597. // start the plugin process if it isn't already started. Errors from
  598. // starting the plugin are surpressed and ProtocolInvalid is returned. It
  599. // is recommended you call Start explicitly before calling Protocol to ensure
  600. // no errors occur.
  601. func (c *Client) Protocol() Protocol {
  602. _, err := c.Start()
  603. if err != nil {
  604. return ProtocolInvalid
  605. }
  606. return c.protocol
  607. }
  608. func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
  609. return func(_ string, _ time.Duration) (net.Conn, error) {
  610. // Connect to the client
  611. conn, err := net.Dial(addr.Network(), addr.String())
  612. if err != nil {
  613. return nil, err
  614. }
  615. if tcpConn, ok := conn.(*net.TCPConn); ok {
  616. // Make sure to set keep alive so that the connection doesn't die
  617. tcpConn.SetKeepAlive(true)
  618. }
  619. return conn, nil
  620. }
  621. }
  622. // dialer is compatible with grpc.WithDialer and creates the connection
  623. // to the plugin.
  624. func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
  625. conn, err := netAddrDialer(c.address)("", timeout)
  626. if err != nil {
  627. return nil, err
  628. }
  629. // If we have a TLS config we wrap our connection. We only do this
  630. // for net/rpc since gRPC uses its own mechanism for TLS.
  631. if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
  632. conn = tls.Client(conn, c.config.TLSConfig)
  633. }
  634. return conn, nil
  635. }
  636. func (c *Client) logStderr(r io.Reader) {
  637. bufR := bufio.NewReader(r)
  638. l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
  639. for {
  640. line, err := bufR.ReadString('\n')
  641. if line != "" {
  642. c.config.Stderr.Write([]byte(line))
  643. line = strings.TrimRightFunc(line, unicode.IsSpace)
  644. entry, err := parseJSON(line)
  645. // If output is not JSON format, print directly to Debug
  646. if err != nil {
  647. l.Debug(line)
  648. } else {
  649. out := flattenKVPairs(entry.KVPairs)
  650. out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
  651. switch hclog.LevelFromString(entry.Level) {
  652. case hclog.Trace:
  653. l.Trace(entry.Message, out...)
  654. case hclog.Debug:
  655. l.Debug(entry.Message, out...)
  656. case hclog.Info:
  657. l.Info(entry.Message, out...)
  658. case hclog.Warn:
  659. l.Warn(entry.Message, out...)
  660. case hclog.Error:
  661. l.Error(entry.Message, out...)
  662. }
  663. }
  664. }
  665. if err == io.EOF {
  666. break
  667. }
  668. }
  669. // Flag that we've completed logging for others
  670. close(c.doneLogging)
  671. }