ticker.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package alerting
  2. import (
  3. "time"
  4. "github.com/benbjohnson/clock"
  5. )
// Ticker powers the alerting scheduler. It is like a time.Ticker, except:
//   - it doesn't drop ticks for slow receivers; the send on C blocks and the
//     pending seconds are then delivered one by one, in order, so callers are
//     in control and can instrument what's going on.
//   - it always ticks once per second, which is the right thing in our
//     current design.
//   - it ticks on second marks or very shortly after, which provides a
//     predictable load pattern (this shouldn't cause too much load contention
//     because the next steps in the pipeline just process at their own pace).
//   - the timestamps are used to mark "last datapoint to query for" and as
//     such are a configurable amount of time (the offset) in the past.
//   - because we want to allow
//   - a clean "resume where we left off" and "don't yield ticks we already
//     did", and
//   - adjusting the offset over time to compensate for storage backing up,
//     or getting fast and providing lower latency,
//     you specify a last-processed timestamp as well as an offset at creation;
//     the offset can also be replaced at runtime through the newOffset channel.
type Ticker struct {
	// C delivers the tick timestamps. NewTicker creates it unbuffered, so the
	// run loop blocks on each send until a receiver takes the value.
	C chan time.Time
	// clock supplies the current time and the wait timers used by run.
	clock clock.Clock
	// last is the timestamp of the most recent tick sent on C (initially the
	// value passed to NewTicker); the next tick is last plus one second.
	last time.Time
	// offset is how far behind the clock's current time a tick must be before
	// it is emitted.
	offset time.Duration
	// newOffset carries replacement values for offset into the run loop.
	// NOTE(review): no sender is visible in this part of the file — confirm
	// where offsets are pushed from.
	newOffset chan time.Duration
}
  23. // NewTicker returns a ticker that ticks on second marks or very shortly after, and never drops ticks
  24. func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock) *Ticker {
  25. t := &Ticker{
  26. C: make(chan time.Time),
  27. clock: c,
  28. last: last,
  29. offset: initialOffset,
  30. newOffset: make(chan time.Duration),
  31. }
  32. go t.run()
  33. return t
  34. }
  35. func (t *Ticker) run() {
  36. for {
  37. next := t.last.Add(time.Duration(1) * time.Second)
  38. diff := t.clock.Now().Add(-t.offset).Sub(next)
  39. if diff >= 0 {
  40. t.C <- next
  41. t.last = next
  42. continue
  43. }
  44. // tick is too young. try again when ...
  45. select {
  46. case <-t.clock.After(-diff): // ...it'll definitely be old enough
  47. case offset := <-t.newOffset: // ...it might be old enough
  48. t.offset = offset
  49. }
  50. }
  51. }