浏览代码

@grafana/data: improve the CircularVector api (#18716)

Ryan McKinley 6 年之前
父节点
当前提交
73d9f262bd

+ 124 - 9
packages/grafana-data/src/utils/vector.test.ts

@@ -28,16 +28,131 @@ describe('Check Proxy Vector', () => {
 });
 
 describe('Check Circular Vector', () => {
-  it('should support constant values', () => {
-    const buffer = [3, 2, 1, 0];
-    const v = new CircularVector(buffer);
-    expect(v.length).toEqual(4);
-    expect(v.toJSON()).toEqual([3, 2, 1, 0]);
+  it('should append values', () => {
+    const buffer = [1, 2, 3];
+    const v = new CircularVector({ buffer }); // tail is default option
+    expect(v.toArray()).toEqual([1, 2, 3]);
+
+    v.add(4);
+    expect(v.toArray()).toEqual([2, 3, 4]);
+
+    v.add(5);
+    expect(v.toArray()).toEqual([3, 4, 5]);
+
+    v.add(6);
+    expect(v.toArray()).toEqual([4, 5, 6]);
+
+    v.add(7);
+    expect(v.toArray()).toEqual([5, 6, 7]);
+
+    v.add(8);
+    expect(v.toArray()).toEqual([6, 7, 8]);
+  });
+
+  it('should grow buffer until it hits capacity (append)', () => {
+    const v = new CircularVector({ capacity: 3 }); // tail is default option
+    expect(v.toArray()).toEqual([]);
+
+    v.add(1);
+    expect(v.toArray()).toEqual([1]);
+
+    v.add(2);
+    expect(v.toArray()).toEqual([1, 2]);
+
+    v.add(3);
+    expect(v.toArray()).toEqual([1, 2, 3]);
+
+    v.add(4);
+    expect(v.toArray()).toEqual([2, 3, 4]);
+
+    v.add(5);
+    expect(v.toArray()).toEqual([3, 4, 5]);
+  });
+
+  it('should prepend values', () => {
+    const buffer = [3, 2, 1];
+    const v = new CircularVector({ buffer, append: 'head' });
+    expect(v.toArray()).toEqual([3, 2, 1]);
+
+    v.add(4);
+    expect(v.toArray()).toEqual([4, 3, 2]);
+
+    v.add(5);
+    expect(v.toArray()).toEqual([5, 4, 3]);
+
+    v.add(6);
+    expect(v.toArray()).toEqual([6, 5, 4]);
+
+    v.add(7);
+    expect(v.toArray()).toEqual([7, 6, 5]);
+
+    v.add(8);
+    expect(v.toArray()).toEqual([8, 7, 6]);
+  });
+
+  it('should expand buffer and then prepend', () => {
+    const v = new CircularVector({ capacity: 3, append: 'head' });
+    expect(v.toArray()).toEqual([]);
+
+    v.add(1);
+    expect(v.toArray()).toEqual([1]);
+
+    v.add(2);
+    expect(v.toArray()).toEqual([2, 1]);
+
+    v.add(3);
+    expect(v.toArray()).toEqual([3, 2, 1]);
+
+    v.add(4);
+    expect(v.toArray()).toEqual([4, 3, 2]);
+
+    v.add(5);
+    expect(v.toArray()).toEqual([5, 4, 3]);
+  });
+
+  it('should reduce size and keep working (tail)', () => {
+    const buffer = [1, 2, 3, 4, 5];
+    const v = new CircularVector({ buffer });
+    expect(v.toArray()).toEqual([1, 2, 3, 4, 5]);
+
+    v.setCapacity(3);
+    expect(v.toArray()).toEqual([3, 4, 5]);
+
+    v.add(6);
+    expect(v.toArray()).toEqual([4, 5, 6]);
+
+    v.add(7);
+    expect(v.toArray()).toEqual([5, 6, 7]);
+  });
+
+  it('should reduce size and keep working (head)', () => {
+    const buffer = [5, 4, 3, 2, 1];
+    const v = new CircularVector({ buffer, append: 'head' });
+    expect(v.toArray()).toEqual([5, 4, 3, 2, 1]);
+
+    v.setCapacity(3);
+    expect(v.toArray()).toEqual([5, 4, 3]);
+
+    v.add(6);
+    expect(v.toArray()).toEqual([6, 5, 4]);
+
+    v.add(7);
+    expect(v.toArray()).toEqual([7, 6, 5]);
+  });
+
+  it('change buffer direction', () => {
+    const buffer = [1, 2, 3];
+    const v = new CircularVector({ buffer });
+    expect(v.toArray()).toEqual([1, 2, 3]);
+
+    v.setAppendMode('head');
+    expect(v.toArray()).toEqual([3, 2, 1]);
 
-    v.append(4);
-    expect(v.toJSON()).toEqual([4, 3, 2, 1]);
+    v.add(4);
+    expect(v.toArray()).toEqual([4, 3, 2]);
 
-    v.append(5);
-    expect(v.toJSON()).toEqual([5, 4, 3, 2]);
+    v.setAppendMode('tail');
+    v.add(5);
+    expect(v.toArray()).toEqual([3, 4, 5]);
   });
 });

+ 120 - 25
packages/grafana-data/src/utils/vector.ts

@@ -76,28 +76,18 @@ export class ScaledVector implements Vector<number> {
   }
 }
 
-export class CircularVector<T = any> implements Vector<T> {
-  buffer: T[];
-  index: number;
-  length: number;
-
-  constructor(buffer: T[]) {
-    this.length = buffer.length;
-    this.buffer = buffer;
-    this.index = 0;
-  }
+/**
+ * Values are returned in the order defined by the input parameter
+ */
+export class SortedVector<T = any> implements Vector<T> {
+  constructor(private source: Vector<T>, private order: number[]) {}
 
-  append(value: T) {
-    let idx = this.index - 1;
-    if (idx < 0) {
-      idx = this.length - 1;
-    }
-    this.buffer[idx] = value;
-    this.index = idx;
+  get length(): number {
+    return this.source.length;
   }
 
   get(index: number): T {
-    return this.buffer[(index + this.index) % this.length];
+    return this.source.get(this.order[index]);
   }
 
   toArray(): T[] {
@@ -109,18 +99,123 @@ export class CircularVector<T = any> implements Vector<T> {
   }
 }
 
+interface CircularOptions<T> {
+  buffer?: T[];
+  append?: 'head' | 'tail';
+  capacity?: number;
+}
+
 /**
- * Values are returned in the order defined by the input parameter
+ * Circular vector uses a single buffer to capture a stream of values
+ * overwriting the oldest value on add.
+ *
+ * This supports addting to the 'head' or 'tail' and will grow the buffer
+ * to match a configured capacity.
  */
-export class SortedVector<T = any> implements Vector<T> {
-  constructor(private source: Vector<T>, private order: number[]) {}
+export class CircularVector<T = any> implements Vector<T> {
+  private buffer: T[];
+  private index: number;
+  private capacity: number;
+  private tail: boolean;
+
+  constructor(options: CircularOptions<T>) {
+    this.buffer = options.buffer || [];
+    this.capacity = this.buffer.length;
+    this.tail = 'head' !== options.append;
+    this.index = 0;
 
-  get length(): number {
-    return this.source.length;
+    this.add = this.getAddFunction();
+    if (options.capacity) {
+      this.setCapacity(options.capacity);
+    }
   }
 
-  get(index: number): T {
-    return this.source.get(this.order[index]);
+  /**
+   * This gets the appropriate add function depending on the buffer state:
+   *  * head vs tail
+   *  * growing buffer vs overwriting values
+   */
+  private getAddFunction() {
+    // When we are not at capacity, it should actually modify the buffer
+    if (this.capacity > this.buffer.length) {
+      if (this.tail) {
+        return (value: T) => {
+          this.buffer.push(value);
+          if (this.buffer.length >= this.capacity) {
+            this.add = this.getAddFunction();
+          }
+        };
+      } else {
+        return (value: T) => {
+          this.buffer.unshift(value);
+          if (this.buffer.length >= this.capacity) {
+            this.add = this.getAddFunction();
+          }
+        };
+      }
+    }
+
+    if (this.tail) {
+      return (value: T) => {
+        this.buffer[this.index] = value;
+        this.index = (this.index + 1) % this.buffer.length;
+      };
+    }
+
+    // Append values to the head
+    return (value: T) => {
+      let idx = this.index - 1;
+      if (idx < 0) {
+        idx = this.buffer.length - 1;
+      }
+      this.buffer[idx] = value;
+      this.index = idx;
+    };
+  }
+
+  setCapacity(v: number) {
+    if (this.capacity === v) {
+      return;
+    }
+    // Make a copy so it is in order and new additions can be at the head or tail
+    const copy = this.toArray();
+    if (v > this.length) {
+      this.buffer = copy;
+    } else if (v < this.capacity) {
+      // Shrink the buffer
+      const delta = this.length - v;
+      if (this.tail) {
+        this.buffer = copy.slice(delta, copy.length); // Keep last items
+      } else {
+        this.buffer = copy.slice(0, copy.length - delta); // Keep first items
+      }
+    }
+    this.capacity = v;
+    this.index = 0;
+    this.add = this.getAddFunction();
+  }
+
+  setAppendMode(mode: 'head' | 'tail') {
+    const tail = 'head' !== mode;
+    if (tail !== this.tail) {
+      this.buffer = this.toArray().reverse();
+      this.index = 0;
+      this.tail = tail;
+      this.add = this.getAddFunction();
+    }
+  }
+
+  /**
+   * Add the value to the buffer
+   */
+  add: (value: T) => void;
+
+  get(index: number) {
+    return this.buffer[(index + this.index) % this.buffer.length];
+  }
+
+  get length() {
+    return this.buffer.length;
   }
 
   toArray(): T[] {

+ 10 - 10
public/app/plugins/datasource/testdata/StreamHandler.ts

@@ -127,7 +127,7 @@ export class StreamWorker {
     for (let i = 0; i < append.length; i++) {
       const row = append[i];
       for (let j = 0; j < values.length; j++) {
-        values[j].append(row[j]); // Circular buffer will kick out old entries
+        values[j].add(row[j]); // Circular buffer will kick out old entries
       }
     }
     // Clear any cached values
@@ -178,8 +178,8 @@ export class SignalWorker extends StreamWorker {
     const { speed, buffer } = this.query;
     const request = this.stream.request;
     const maxRows = buffer ? buffer : request.maxDataPoints;
-    const times = new CircularVector(new Array<number>(maxRows));
-    const vals = new CircularVector(new Array<number>(maxRows));
+    const times = new CircularVector({ capacity: maxRows });
+    const vals = new CircularVector({ capacity: maxRows });
     this.values = [times, vals];
 
     const data = new DataFrameHelper({
@@ -193,8 +193,8 @@ export class SignalWorker extends StreamWorker {
 
     for (let i = 0; i < this.bands; i++) {
       const suffix = this.bands > 1 ? ` ${i + 1}` : '';
-      const min = new CircularVector(new Array<number>(maxRows));
-      const max = new CircularVector(new Array<number>(maxRows));
+      const min = new CircularVector({ capacity: maxRows });
+      const max = new CircularVector({ capacity: maxRows });
       this.values.push(min);
       this.values.push(max);
 
@@ -209,7 +209,7 @@ export class SignalWorker extends StreamWorker {
     for (let i = 0; i < maxRows; i++) {
       const row = this.nextRow(time);
       for (let j = 0; j < this.values.length; j++) {
-        this.values[j].append(row[j]);
+        this.values[j].add(row[j]);
       }
       time += speed;
     }
@@ -347,8 +347,8 @@ export class LogsWorker extends StreamWorker {
 
     const maxRows = buffer ? buffer : request.maxDataPoints;
 
-    const times = new CircularVector(new Array(maxRows));
-    const lines = new CircularVector(new Array(maxRows));
+    const times = new CircularVector({ capacity: maxRows });
+    const lines = new CircularVector({ capacity: maxRows });
 
     this.values = [times, lines];
     this.data = new DataFrameHelper({
@@ -364,8 +364,8 @@ export class LogsWorker extends StreamWorker {
     let time = Date.now() - maxRows * speed;
     for (let i = 0; i < maxRows; i++) {
       const row = this.nextRow(time);
-      times.append(row[0]);
-      lines.append(row[1]);
+      times.add(row[0]);
+      lines.add(row[1]);
       time += speed;
     }
   }