Selaa lähdekoodia

implement missing value fill functionality for postgres

Sven Klemm 8 vuotta sitten
vanhempi
commit
c1282e8ea8
1 muutettua tiedostoa jossa 55 lisäystä ja 13 poistoa
  1. 55 13
      pkg/tsdb/postgres/postgres.go

+ 55 - 13
pkg/tsdb/postgres/postgres.go

@@ -4,6 +4,7 @@ import (
 	"container/list"
 	"context"
 	"fmt"
+	"math"
 	"net/url"
 	"strconv"
 	"time"
@@ -198,6 +199,18 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 		return fmt.Errorf("Found no column named time")
 	}
 
+	fillMissing := query.Model.Get("fill").MustBool(false)
+	var fillInterval float64
+	fillValue := null.Float{}
+	if fillMissing {
+		fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000
+		if query.Model.Get("fillNULL").MustBool(false) == false {
+			fillValue.Float64 = query.Model.Get("fillValue").MustFloat64()
+			fillValue.Valid = true
+		}
+
+	}
+
 	for rows.Next() {
 		var timestamp float64
 		var value null.Float
@@ -249,7 +262,34 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 			if metricIndex == -1 {
 				metric = col
 			}
-			e.appendTimePoint(pointsBySeries, seriesByQueryOrder, metric, timestamp, value)
+
+			series, exist := pointsBySeries[metric]
+			if exist == false {
+				series = &tsdb.TimeSeries{Name: metric}
+				pointsBySeries[metric] = series
+				seriesByQueryOrder.PushBack(metric)
+			}
+
+			if fillMissing {
+				var intervalStart float64
+				if exist == false {
+					intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6)
+				} else {
+					intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval
+				}
+
+				// align interval start
+				intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+
+				for i := intervalStart; i < timestamp; i += fillInterval {
+					series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+					rowCount++
+				}
+			}
+
+			series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
+
+			e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
 			rowCount++
 
 		}
@@ -258,20 +298,22 @@ func (e PostgresQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *co
 	for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() {
 		key := elem.Value.(string)
 		result.Series = append(result.Series, pointsBySeries[key])
+
+		if fillMissing {
+			series := pointsBySeries[key]
+			// fill in values from last fetched value till interval end
+			intervalStart := series.Points[len(series.Points)-1][1].Float64
+			intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6)
+
+			// align interval start
+			intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval
+			for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval {
+				series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)})
+				rowCount++
+			}
+		}
 	}
 
 	result.Meta.Set("rowCount", rowCount)
 	return nil
 }
-
-func (e PostgresQueryEndpoint) appendTimePoint(pointsBySeries map[string]*tsdb.TimeSeries, seriesByQueryOrder *list.List, metric string, timestamp float64, value null.Float) {
-	if series, exist := pointsBySeries[metric]; exist {
-		series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-	} else {
-		series := &tsdb.TimeSeries{Name: metric}
-		series.Points = append(series.Points, tsdb.TimePoint{value, null.FloatFrom(timestamp)})
-		pointsBySeries[metric] = series
-		seriesByQueryOrder.PushBack(metric)
-	}
-	e.log.Debug("Rows", "metric", metric, "time", timestamp, "value", value)
-}