server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package plugin
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "encoding/base64"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "log"
  10. "net"
  11. "os"
  12. "os/signal"
  13. "runtime"
  14. "sort"
  15. "strconv"
  16. "strings"
  17. "sync/atomic"
  18. "github.com/hashicorp/go-hclog"
  19. "google.golang.org/grpc"
  20. )
  21. // CoreProtocolVersion is the ProtocolVersion of the plugin system itself.
  22. // We will increment this whenever we change any protocol behavior. This
  23. // will invalidate any prior plugins but will at least allow us to iterate
  24. // on the core in a safe way. We will do our best to do this very
  25. // infrequently.
  26. const CoreProtocolVersion = 1
  27. // HandshakeConfig is the configuration used by client and servers to
  28. // handshake before starting a plugin connection. This is embedded by
  29. // both ServeConfig and ClientConfig.
  30. //
  31. // In practice, the plugin host creates a HandshakeConfig that is exported
  32. // and plugins then can easily consume it.
  33. type HandshakeConfig struct {
  34. // ProtocolVersion is the version that clients must match on to
  35. // agree they can communicate. This should match the ProtocolVersion
  36. // set on ClientConfig when using a plugin.
  37. // This field is not required if VersionedPlugins are being used in the
  38. // Client or Server configurations.
  39. ProtocolVersion uint
  40. // MagicCookieKey and value are used as a very basic verification
  41. // that a plugin is intended to be launched. This is not a security
  42. // measure, just a UX feature. If the magic cookie doesn't match,
  43. // we show human-friendly output.
  44. MagicCookieKey string
  45. MagicCookieValue string
  46. }
  47. // PluginSet is a set of plugins provided to be registered in the plugin
  48. // server.
  49. type PluginSet map[string]Plugin
  50. // ServeConfig configures what sorts of plugins are served.
  51. type ServeConfig struct {
  52. // HandshakeConfig is the configuration that must match clients.
  53. HandshakeConfig
  54. // TLSProvider is a function that returns a configured tls.Config.
  55. TLSProvider func() (*tls.Config, error)
  56. // Plugins are the plugins that are served.
  57. // The implied version of this PluginSet is the Handshake.ProtocolVersion.
  58. Plugins PluginSet
  59. // VersionedPlugins is a map of PluginSets for specific protocol versions.
  60. // These can be used to negotiate a compatible version between client and
  61. // server. If this is set, Handshake.ProtocolVersion is not required.
  62. VersionedPlugins map[int]PluginSet
  63. // GRPCServer should be non-nil to enable serving the plugins over
  64. // gRPC. This is a function to create the server when needed with the
  65. // given server options. The server options populated by go-plugin will
  66. // be for TLS if set. You may modify the input slice.
  67. //
  68. // Note that the grpc.Server will automatically be registered with
  69. // the gRPC health checking service. This is not optional since go-plugin
  70. // relies on this to implement Ping().
  71. GRPCServer func([]grpc.ServerOption) *grpc.Server
  72. // Logger is used to pass a logger into the server. If none is provided the
  73. // server will create a default logger.
  74. Logger hclog.Logger
  75. }
  76. // protocolVersion determines the protocol version and plugin set to be used by
  77. // the server. In the event that there is no suitable version, the last version
  78. // in the config is returned leaving the client to report the incompatibility.
  79. func protocolVersion(opts *ServeConfig) (int, Protocol, PluginSet) {
  80. protoVersion := int(opts.ProtocolVersion)
  81. pluginSet := opts.Plugins
  82. protoType := ProtocolNetRPC
  83. // Check if the client sent a list of acceptable versions
  84. var clientVersions []int
  85. if vs := os.Getenv("PLUGIN_PROTOCOL_VERSIONS"); vs != "" {
  86. for _, s := range strings.Split(vs, ",") {
  87. v, err := strconv.Atoi(s)
  88. if err != nil {
  89. fmt.Fprintf(os.Stderr, "server sent invalid plugin version %q", s)
  90. continue
  91. }
  92. clientVersions = append(clientVersions, v)
  93. }
  94. }
  95. // We want to iterate in reverse order, to ensure we match the newest
  96. // compatible plugin version.
  97. sort.Sort(sort.Reverse(sort.IntSlice(clientVersions)))
  98. // set the old un-versioned fields as if they were versioned plugins
  99. if opts.VersionedPlugins == nil {
  100. opts.VersionedPlugins = make(map[int]PluginSet)
  101. }
  102. if pluginSet != nil {
  103. opts.VersionedPlugins[protoVersion] = pluginSet
  104. }
  105. // Sort the version to make sure we match the latest first
  106. var versions []int
  107. for v := range opts.VersionedPlugins {
  108. versions = append(versions, v)
  109. }
  110. sort.Sort(sort.Reverse(sort.IntSlice(versions)))
  111. // See if we have multiple versions of Plugins to choose from
  112. for _, version := range versions {
  113. // Record each version, since we guarantee that this returns valid
  114. // values even if they are not a protocol match.
  115. protoVersion = version
  116. pluginSet = opts.VersionedPlugins[version]
  117. // If we have a configured gRPC server we should select a protocol
  118. if opts.GRPCServer != nil {
  119. // All plugins in a set must use the same transport, so check the first
  120. // for the protocol type
  121. for _, p := range pluginSet {
  122. switch p.(type) {
  123. case GRPCPlugin:
  124. protoType = ProtocolGRPC
  125. default:
  126. protoType = ProtocolNetRPC
  127. }
  128. break
  129. }
  130. }
  131. for _, clientVersion := range clientVersions {
  132. if clientVersion == protoVersion {
  133. return protoVersion, protoType, pluginSet
  134. }
  135. }
  136. }
  137. // Return the lowest version as the fallback.
  138. // Since we iterated over all the versions in reverse order above, these
  139. // values are from the lowest version number plugins (which may be from
  140. // a combination of the Handshake.ProtocolVersion and ServeConfig.Plugins
  141. // fields). This allows serving the oldest version of our plugins to a
  142. // legacy client that did not send a PLUGIN_PROTOCOL_VERSIONS list.
  143. return protoVersion, protoType, pluginSet
  144. }
  145. // Serve serves the plugins given by ServeConfig.
  146. //
  147. // Serve doesn't return until the plugin is done being executed. Any
  148. // errors will be outputted to os.Stderr.
  149. //
  150. // This is the method that plugins should call in their main() functions.
  151. func Serve(opts *ServeConfig) {
  152. // Validate the handshake config
  153. if opts.MagicCookieKey == "" || opts.MagicCookieValue == "" {
  154. fmt.Fprintf(os.Stderr,
  155. "Misconfigured ServeConfig given to serve this plugin: no magic cookie\n"+
  156. "key or value was set. Please notify the plugin author and report\n"+
  157. "this as a bug.\n")
  158. os.Exit(1)
  159. }
  160. // First check the cookie
  161. if os.Getenv(opts.MagicCookieKey) != opts.MagicCookieValue {
  162. fmt.Fprintf(os.Stderr,
  163. "This binary is a plugin. These are not meant to be executed directly.\n"+
  164. "Please execute the program that consumes these plugins, which will\n"+
  165. "load any plugins automatically\n")
  166. os.Exit(1)
  167. }
  168. // negotiate the version and plugins
  169. // start with default version in the handshake config
  170. protoVersion, protoType, pluginSet := protocolVersion(opts)
  171. // Logging goes to the original stderr
  172. log.SetOutput(os.Stderr)
  173. logger := opts.Logger
  174. if logger == nil {
  175. // internal logger to os.Stderr
  176. logger = hclog.New(&hclog.LoggerOptions{
  177. Level: hclog.Trace,
  178. Output: os.Stderr,
  179. JSONFormat: true,
  180. })
  181. }
  182. // Create our new stdout, stderr files. These will override our built-in
  183. // stdout/stderr so that it works across the stream boundary.
  184. stdout_r, stdout_w, err := os.Pipe()
  185. if err != nil {
  186. fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
  187. os.Exit(1)
  188. }
  189. stderr_r, stderr_w, err := os.Pipe()
  190. if err != nil {
  191. fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
  192. os.Exit(1)
  193. }
  194. // Register a listener so we can accept a connection
  195. listener, err := serverListener()
  196. if err != nil {
  197. logger.Error("plugin init error", "error", err)
  198. return
  199. }
  200. // Close the listener on return. We wrap this in a func() on purpose
  201. // because the "listener" reference may change to TLS.
  202. defer func() {
  203. listener.Close()
  204. }()
  205. var tlsConfig *tls.Config
  206. if opts.TLSProvider != nil {
  207. tlsConfig, err = opts.TLSProvider()
  208. if err != nil {
  209. logger.Error("plugin tls init", "error", err)
  210. return
  211. }
  212. }
  213. var serverCert string
  214. clientCert := os.Getenv("PLUGIN_CLIENT_CERT")
  215. // If the client is configured using AutoMTLS, the certificate will be here,
  216. // and we need to generate our own in response.
  217. if tlsConfig == nil && clientCert != "" {
  218. logger.Info("configuring server automatic mTLS")
  219. clientCertPool := x509.NewCertPool()
  220. if !clientCertPool.AppendCertsFromPEM([]byte(clientCert)) {
  221. logger.Error("client cert provided but failed to parse", "cert", clientCert)
  222. }
  223. certPEM, keyPEM, err := generateCert()
  224. if err != nil {
  225. logger.Error("failed to generate client certificate", "error", err)
  226. panic(err)
  227. }
  228. cert, err := tls.X509KeyPair(certPEM, keyPEM)
  229. if err != nil {
  230. logger.Error("failed to parse client certificate", "error", err)
  231. panic(err)
  232. }
  233. tlsConfig = &tls.Config{
  234. Certificates: []tls.Certificate{cert},
  235. ClientAuth: tls.RequireAndVerifyClientCert,
  236. ClientCAs: clientCertPool,
  237. MinVersion: tls.VersionTLS12,
  238. }
  239. // We send back the raw leaf cert data for the client rather than the
  240. // PEM, since the protocol can't handle newlines.
  241. serverCert = base64.RawStdEncoding.EncodeToString(cert.Certificate[0])
  242. }
  243. // Create the channel to tell us when we're done
  244. doneCh := make(chan struct{})
  245. // Build the server type
  246. var server ServerProtocol
  247. switch protoType {
  248. case ProtocolNetRPC:
  249. // If we have a TLS configuration then we wrap the listener
  250. // ourselves and do it at that level.
  251. if tlsConfig != nil {
  252. listener = tls.NewListener(listener, tlsConfig)
  253. }
  254. // Create the RPC server to dispense
  255. server = &RPCServer{
  256. Plugins: pluginSet,
  257. Stdout: stdout_r,
  258. Stderr: stderr_r,
  259. DoneCh: doneCh,
  260. }
  261. case ProtocolGRPC:
  262. // Create the gRPC server
  263. server = &GRPCServer{
  264. Plugins: pluginSet,
  265. Server: opts.GRPCServer,
  266. TLS: tlsConfig,
  267. Stdout: stdout_r,
  268. Stderr: stderr_r,
  269. DoneCh: doneCh,
  270. logger: logger,
  271. }
  272. default:
  273. panic("unknown server protocol: " + protoType)
  274. }
  275. // Initialize the servers
  276. if err := server.Init(); err != nil {
  277. logger.Error("protocol init", "error", err)
  278. return
  279. }
  280. logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String())
  281. // Output the address and service name to stdout so that the client can bring it up.
  282. fmt.Printf("%d|%d|%s|%s|%s|%s\n",
  283. CoreProtocolVersion,
  284. protoVersion,
  285. listener.Addr().Network(),
  286. listener.Addr().String(),
  287. protoType,
  288. serverCert)
  289. os.Stdout.Sync()
  290. // Eat the interrupts
  291. ch := make(chan os.Signal, 1)
  292. signal.Notify(ch, os.Interrupt)
  293. go func() {
  294. var count int32 = 0
  295. for {
  296. <-ch
  297. newCount := atomic.AddInt32(&count, 1)
  298. logger.Debug("plugin received interrupt signal, ignoring", "count", newCount)
  299. }
  300. }()
  301. // Set our new out, err
  302. os.Stdout = stdout_w
  303. os.Stderr = stderr_w
  304. // Accept connections and wait for completion
  305. go server.Serve(listener)
  306. <-doneCh
  307. }
  308. func serverListener() (net.Listener, error) {
  309. if runtime.GOOS == "windows" {
  310. return serverListener_tcp()
  311. }
  312. return serverListener_unix()
  313. }
  314. func serverListener_tcp() (net.Listener, error) {
  315. minPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MIN_PORT"), 10, 32)
  316. if err != nil {
  317. return nil, err
  318. }
  319. maxPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MAX_PORT"), 10, 32)
  320. if err != nil {
  321. return nil, err
  322. }
  323. for port := minPort; port <= maxPort; port++ {
  324. address := fmt.Sprintf("127.0.0.1:%d", port)
  325. listener, err := net.Listen("tcp", address)
  326. if err == nil {
  327. return listener, nil
  328. }
  329. }
  330. return nil, errors.New("Couldn't bind plugin TCP listener")
  331. }
  332. func serverListener_unix() (net.Listener, error) {
  333. tf, err := ioutil.TempFile("", "plugin")
  334. if err != nil {
  335. return nil, err
  336. }
  337. path := tf.Name()
  338. // Close the file and remove it because it has to not exist for
  339. // the domain socket.
  340. if err := tf.Close(); err != nil {
  341. return nil, err
  342. }
  343. if err := os.Remove(path); err != nil {
  344. return nil, err
  345. }
  346. l, err := net.Listen("unix", path)
  347. if err != nil {
  348. return nil, err
  349. }
  350. // Wrap the listener in rmListener so that the Unix domain socket file
  351. // is removed on close.
  352. return &rmListener{
  353. Listener: l,
  354. Path: path,
  355. }, nil
  356. }
  357. // rmListener is an implementation of net.Listener that forwards most
  358. // calls to the listener but also removes a file as part of the close. We
  359. // use this to cleanup the unix domain socket on close.
  360. type rmListener struct {
  361. net.Listener
  362. Path string
  363. }
  364. func (l *rmListener) Close() error {
  365. // Close the listener itself
  366. if err := l.Listener.Close(); err != nil {
  367. return err
  368. }
  369. // Remove the file
  370. return os.Remove(l.Path)
  371. }