|
|
@@ -122,7 +122,7 @@ func (this *Scheduler) Executor(executor Executor) {
|
|
|
log.Info("Executor: queue length %d", len(this.runQueue))
|
|
|
log.Info("Executor: executing %s", job.rule.Title)
|
|
|
this.jobs[job.rule.Id].running = true
|
|
|
- go this.Measure(executor, job)
|
|
|
+ this.MeasureAndExecute(executor, job)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -135,11 +135,20 @@ func (this *Scheduler) HandleResponses() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (this *Scheduler) Measure(exec Executor, rule *AlertJob) {
|
|
|
+func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
|
|
|
now := time.Now()
|
|
|
- exec.Execute(rule.rule, this.responseQueue)
|
|
|
- elapsed := time.Since(now)
|
|
|
- log.Info("Schedular: exeuction took %v milli seconds", elapsed.Nanoseconds()/1000000)
|
|
|
+
|
|
|
+ response := make(chan *AlertResult, 1)
|
|
|
+ go exec.Execute(rule.rule, response)
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-time.After(time.Second * 5):
|
|
|
+ this.responseQueue <- &AlertResult{id: rule.rule.Id, state: "timed out", duration: time.Since(now).Nanoseconds() / 1000000}
|
|
|
+ case r := <-response:
|
|
|
+ r.duration = time.Since(now).Nanoseconds() / 1000000
|
|
|
+ log.Info("Schedular: exeuction took %v milli seconds", r.duration)
|
|
|
+ this.responseQueue <- r
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
type AlertJob struct {
|
|
|
@@ -152,5 +161,5 @@ type AlertJob struct {
|
|
|
type AlertResult struct {
|
|
|
id int64
|
|
|
state string
|
|
|
- duration time.Time
|
|
|
+ duration int64
|
|
|
}
|