engine.go 954 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package alerting
  2. import (
  3. "time"
  4. "github.com/Unknwon/log"
  5. "github.com/benbjohnson/clock"
  6. )
  7. type Engine struct {
  8. execQueue chan *AlertJob
  9. resultQueue chan *AlertResult
  10. clock clock.Clock
  11. ticker *Ticker
  12. scheduler Scheduler
  13. }
  14. func NewEngine() *Engine {
  15. e := &Engine{
  16. ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
  17. execQueue: make(chan *AlertJob, 1000),
  18. resultQueue: make(chan *AlertResult, 1000),
  19. scheduler: NewScheduler(),
  20. }
  21. return e
  22. }
  23. func (e *Engine) Start() {
  24. go e.schedulerTick()
  25. go e.execDispatch()
  26. }
  27. func (e *Engine) Stop() {
  28. close(e.execQueue)
  29. }
  30. func (e *Engine) schedulerTick() {
  31. for {
  32. select {
  33. case tick := <-e.ticker.C:
  34. e.scheduler.Tick(tick, e.execQueue)
  35. }
  36. }
  37. }
  38. func (e *Engine) execDispatch() {
  39. for job := range e.execQueue {
  40. log.Info("AlertEngine: Dispatching alert job %s", job.Rule.Title)
  41. job.Running = true
  42. //scheduler.measureAndExecute(executor, job)
  43. }
  44. }