group.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. // Package run implements an actor-runner with deterministic teardown. It is
  2. // somewhat similar to package errgroup, except it does not require actor
  3. // goroutines to understand context semantics. This makes it suitable for use in
  4. // more circumstances; for example, goroutines which are handling connections
  5. // from net.Listeners, or scanning input from a closable io.Reader.
  6. package run
  7. // Group collects actors (functions) and runs them concurrently.
  8. // When one actor (function) returns, all actors are interrupted.
  9. // The zero value of a Group is useful.
  10. type Group struct {
  11. actors []actor
  12. }
  13. // Add an actor (function) to the group. Each actor must be pre-emptable by an
  14. // interrupt function. That is, if interrupt is invoked, execute should return.
  15. // Also, it must be safe to call interrupt even after execute has returned.
  16. //
  17. // The first actor (function) to return interrupts all running actors.
  18. // The error is passed to the interrupt functions, and is returned by Run.
  19. func (g *Group) Add(execute func() error, interrupt func(error)) {
  20. g.actors = append(g.actors, actor{execute, interrupt})
  21. }
  22. // Run all actors (functions) concurrently.
  23. // When the first actor returns, all others are interrupted.
  24. // Run only returns when all actors have exited.
  25. // Run returns the error returned by the first exiting actor.
  26. func (g *Group) Run() error {
  27. if len(g.actors) == 0 {
  28. return nil
  29. }
  30. // Run each actor.
  31. errors := make(chan error, len(g.actors))
  32. for _, a := range g.actors {
  33. go func(a actor) {
  34. errors <- a.execute()
  35. }(a)
  36. }
  37. // Wait for the first actor to stop.
  38. err := <-errors
  39. // Signal all actors to stop.
  40. for _, a := range g.actors {
  41. a.interrupt(err)
  42. }
  43. // Wait for all actors to stop.
  44. for i := 1; i < cap(errors); i++ {
  45. <-errors
  46. }
  47. // Return the original error.
  48. return err
  49. }
  // actor pairs the two functions that make up one member of a Group.
  type actor struct {
  	// execute performs the actor's work; the group treats its return as the
  	// actor having stopped.
  	execute func() error
  	// interrupt is called with the first returning actor's error to request
  	// that execute return.
  	interrupt func(error)
  }