| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542 |
- import _ from 'lodash';
- import flatten from 'app/core/utils/flatten';
- import * as queryDef from './query_def';
- import TableModel from 'app/core/table_model';
- import { DataFrame, toDataFrame, FieldType, MutableDataFrame } from '@grafana/data';
- import { DataQueryResponse } from '@grafana/ui';
- import { ElasticsearchAggregation } from './types';
- export class ElasticResponse {
- constructor(private targets: any, private response: any) {
- this.targets = targets;
- this.response = response;
- }
- processMetrics(esAgg: any, target: any, seriesList: any, props: any) {
- let metric, y, i, newSeries, bucket, value;
- for (y = 0; y < target.metrics.length; y++) {
- metric = target.metrics[y];
- if (metric.hide) {
- continue;
- }
- switch (metric.type) {
- case 'count': {
- newSeries = { datapoints: [], metric: 'count', props: props };
- for (i = 0; i < esAgg.buckets.length; i++) {
- bucket = esAgg.buckets[i];
- value = bucket.doc_count;
- newSeries.datapoints.push([value, bucket.key]);
- }
- seriesList.push(newSeries);
- break;
- }
- case 'percentiles': {
- if (esAgg.buckets.length === 0) {
- break;
- }
- const firstBucket = esAgg.buckets[0];
- const percentiles = firstBucket[metric.id].values;
- for (const percentileName in percentiles) {
- newSeries = {
- datapoints: [],
- metric: 'p' + percentileName,
- props: props,
- field: metric.field,
- };
- for (i = 0; i < esAgg.buckets.length; i++) {
- bucket = esAgg.buckets[i];
- const values = bucket[metric.id].values;
- newSeries.datapoints.push([values[percentileName], bucket.key]);
- }
- seriesList.push(newSeries);
- }
- break;
- }
- case 'extended_stats': {
- for (const statName in metric.meta) {
- if (!metric.meta[statName]) {
- continue;
- }
- newSeries = {
- datapoints: [],
- metric: statName,
- props: props,
- field: metric.field,
- };
- for (i = 0; i < esAgg.buckets.length; i++) {
- bucket = esAgg.buckets[i];
- const stats = bucket[metric.id];
- // add stats that are in nested obj to top level obj
- stats.std_deviation_bounds_upper = stats.std_deviation_bounds.upper;
- stats.std_deviation_bounds_lower = stats.std_deviation_bounds.lower;
- newSeries.datapoints.push([stats[statName], bucket.key]);
- }
- seriesList.push(newSeries);
- }
- break;
- }
- default: {
- newSeries = {
- datapoints: [],
- metric: metric.type,
- field: metric.field,
- metricId: metric.id,
- props: props,
- };
- for (i = 0; i < esAgg.buckets.length; i++) {
- bucket = esAgg.buckets[i];
- value = bucket[metric.id];
- if (value !== undefined) {
- if (value.normalized_value) {
- newSeries.datapoints.push([value.normalized_value, bucket.key]);
- } else {
- newSeries.datapoints.push([value.value, bucket.key]);
- }
- }
- }
- seriesList.push(newSeries);
- break;
- }
- }
- }
- }
- processAggregationDocs(esAgg: any, aggDef: ElasticsearchAggregation, target: any, table: any, props: any) {
- // add columns
- if (table.columns.length === 0) {
- for (const propKey of _.keys(props)) {
- table.addColumn({ text: propKey, filterable: true });
- }
- table.addColumn({ text: aggDef.field, filterable: true });
- }
- // helper func to add values to value array
- const addMetricValue = (values: any[], metricName: string, value: any) => {
- table.addColumn({ text: metricName });
- values.push(value);
- };
- for (const bucket of esAgg.buckets) {
- const values = [];
- for (const propValues of _.values(props)) {
- values.push(propValues);
- }
- // add bucket key (value)
- values.push(bucket.key);
- for (const metric of target.metrics) {
- switch (metric.type) {
- case 'count': {
- addMetricValue(values, this.getMetricName(metric.type), bucket.doc_count);
- break;
- }
- case 'extended_stats': {
- for (const statName in metric.meta) {
- if (!metric.meta[statName]) {
- continue;
- }
- const stats = bucket[metric.id];
- // add stats that are in nested obj to top level obj
- stats.std_deviation_bounds_upper = stats.std_deviation_bounds.upper;
- stats.std_deviation_bounds_lower = stats.std_deviation_bounds.lower;
- addMetricValue(values, this.getMetricName(statName), stats[statName]);
- }
- break;
- }
- case 'percentiles': {
- const percentiles = bucket[metric.id].values;
- for (const percentileName in percentiles) {
- addMetricValue(values, `p${percentileName} ${metric.field}`, percentiles[percentileName]);
- }
- break;
- }
- default: {
- let metricName = this.getMetricName(metric.type);
- const otherMetrics = _.filter(target.metrics, { type: metric.type });
- // if more of the same metric type include field field name in property
- if (otherMetrics.length > 1) {
- metricName += ' ' + metric.field;
- }
- addMetricValue(values, metricName, bucket[metric.id].value);
- break;
- }
- }
- }
- table.rows.push(values);
- }
- }
- // This is quite complex
- // need to recurse down the nested buckets to build series
- processBuckets(aggs: any, target: any, seriesList: any, table: any, props: any, depth: any) {
- let bucket, aggDef: any, esAgg, aggId;
- const maxDepth = target.bucketAggs.length - 1;
- for (aggId in aggs) {
- aggDef = _.find(target.bucketAggs, { id: aggId });
- esAgg = aggs[aggId];
- if (!aggDef) {
- continue;
- }
- if (depth === maxDepth) {
- if (aggDef.type === 'date_histogram') {
- this.processMetrics(esAgg, target, seriesList, props);
- } else {
- this.processAggregationDocs(esAgg, aggDef, target, table, props);
- }
- } else {
- for (const nameIndex in esAgg.buckets) {
- bucket = esAgg.buckets[nameIndex];
- props = _.clone(props);
- if (bucket.key !== void 0) {
- props[aggDef.field] = bucket.key;
- } else {
- props['filter'] = nameIndex;
- }
- if (bucket.key_as_string) {
- props[aggDef.field] = bucket.key_as_string;
- }
- this.processBuckets(bucket, target, seriesList, table, props, depth + 1);
- }
- }
- }
- }
- private getMetricName(metric: any) {
- let metricDef: any = _.find(queryDef.metricAggTypes, { value: metric });
- if (!metricDef) {
- metricDef = _.find(queryDef.extendedStats, { value: metric });
- }
- return metricDef ? metricDef.text : metric;
- }
- private getSeriesName(series: any, target: any, metricTypeCount: any) {
- let metricName = this.getMetricName(series.metric);
- if (target.alias) {
- const regex = /\{\{([\s\S]+?)\}\}/g;
- return target.alias.replace(regex, (match: any, g1: any, g2: any) => {
- const group = g1 || g2;
- if (group.indexOf('term ') === 0) {
- return series.props[group.substring(5)];
- }
- if (series.props[group] !== void 0) {
- return series.props[group];
- }
- if (group === 'metric') {
- return metricName;
- }
- if (group === 'field') {
- return series.field || '';
- }
- return match;
- });
- }
- if (series.field && queryDef.isPipelineAgg(series.metric)) {
- if (series.metric && queryDef.isPipelineAggWithMultipleBucketPaths(series.metric)) {
- const agg: any = _.find(target.metrics, { id: series.metricId });
- if (agg && agg.settings.script) {
- metricName = agg.settings.script;
- for (const pv of agg.pipelineVariables) {
- const appliedAgg: any = _.find(target.metrics, { id: pv.pipelineAgg });
- if (appliedAgg) {
- metricName = metricName.replace('params.' + pv.name, queryDef.describeMetric(appliedAgg));
- }
- }
- } else {
- metricName = 'Unset';
- }
- } else {
- const appliedAgg: any = _.find(target.metrics, { id: series.field });
- if (appliedAgg) {
- metricName += ' ' + queryDef.describeMetric(appliedAgg);
- } else {
- metricName = 'Unset';
- }
- }
- } else if (series.field) {
- metricName += ' ' + series.field;
- }
- const propKeys = _.keys(series.props);
- if (propKeys.length === 0) {
- return metricName;
- }
- let name = '';
- for (const propName in series.props) {
- name += series.props[propName] + ' ';
- }
- if (metricTypeCount === 1) {
- return name.trim();
- }
- return name.trim() + ' ' + metricName;
- }
- nameSeries(seriesList: any, target: any) {
- const metricTypeCount = _.uniq(_.map(seriesList, 'metric')).length;
- for (let i = 0; i < seriesList.length; i++) {
- const series = seriesList[i];
- series.target = this.getSeriesName(series, target, metricTypeCount);
- }
- }
- processHits(hits: { total: { value: any }; hits: any[] }, seriesList: any[]) {
- const hitsTotal = typeof hits.total === 'number' ? hits.total : hits.total.value; // <- Works with Elasticsearch 7.0+
- const series: any = {
- target: 'docs',
- type: 'docs',
- datapoints: [],
- total: hitsTotal,
- filterable: true,
- };
- let propName, hit, doc: any, i;
- for (i = 0; i < hits.hits.length; i++) {
- hit = hits.hits[i];
- doc = {
- _id: hit._id,
- _type: hit._type,
- _index: hit._index,
- };
- if (hit._source) {
- for (propName in hit._source) {
- doc[propName] = hit._source[propName];
- }
- }
- for (propName in hit.fields) {
- doc[propName] = hit.fields[propName];
- }
- series.datapoints.push(doc);
- }
- seriesList.push(series);
- }
- trimDatapoints(aggregations: any, target: any) {
- const histogram: any = _.find(target.bucketAggs, { type: 'date_histogram' });
- const shouldDropFirstAndLast = histogram && histogram.settings && histogram.settings.trimEdges;
- if (shouldDropFirstAndLast) {
- const trim = histogram.settings.trimEdges;
- for (const prop in aggregations) {
- const points = aggregations[prop];
- if (points.datapoints.length > trim * 2) {
- points.datapoints = points.datapoints.slice(trim, points.datapoints.length - trim);
- }
- }
- }
- }
- getErrorFromElasticResponse(response: any, err: any) {
- const result: any = {};
- result.data = JSON.stringify(err, null, 4);
- if (err.root_cause && err.root_cause.length > 0 && err.root_cause[0].reason) {
- result.message = err.root_cause[0].reason;
- } else {
- result.message = err.reason || 'Unkown elastic error response';
- }
- if (response.$$config) {
- result.config = response.$$config;
- }
- return result;
- }
- getTimeSeries() {
- const seriesList = [];
- for (let i = 0; i < this.response.responses.length; i++) {
- const response = this.response.responses[i];
- if (response.error) {
- throw this.getErrorFromElasticResponse(this.response, response.error);
- }
- if (response.hits && response.hits.hits.length > 0) {
- this.processHits(response.hits, seriesList);
- }
- if (response.aggregations) {
- const aggregations = response.aggregations;
- const target = this.targets[i];
- const tmpSeriesList: any[] = [];
- const table = new TableModel();
- this.processBuckets(aggregations, target, tmpSeriesList, table, {}, 0);
- this.trimDatapoints(tmpSeriesList, target);
- this.nameSeries(tmpSeriesList, target);
- for (let y = 0; y < tmpSeriesList.length; y++) {
- seriesList.push(tmpSeriesList[y]);
- }
- if (table.rows.length > 0) {
- seriesList.push(table);
- }
- }
- }
- return { data: seriesList };
- }
- getLogs(logMessageField?: string, logLevelField?: string): DataQueryResponse {
- const dataFrame: DataFrame[] = [];
- const docs: any[] = [];
- for (let n = 0; n < this.response.responses.length; n++) {
- const response = this.response.responses[n];
- if (response.error) {
- throw this.getErrorFromElasticResponse(this.response, response.error);
- }
- const hits = response.hits;
- let propNames: string[] = [];
- let propName, hit, doc: any, i;
- for (i = 0; i < hits.hits.length; i++) {
- hit = hits.hits[i];
- const flattened = hit._source ? flatten(hit._source, null) : {};
- doc = {};
- doc[this.targets[0].timeField] = null;
- doc = {
- ...doc,
- _id: hit._id,
- _type: hit._type,
- _index: hit._index,
- ...flattened,
- };
- // Note: the order of for...in is arbitrary amd implementation dependant
- // and should probably not be relied upon.
- for (propName in hit.fields) {
- if (propNames.indexOf(propName) === -1) {
- propNames.push(propName);
- }
- doc[propName] = hit.fields[propName];
- }
- for (propName in doc) {
- if (propNames.indexOf(propName) === -1) {
- propNames.push(propName);
- }
- }
- doc._source = { ...flattened };
- docs.push(doc);
- }
- if (docs.length > 0) {
- propNames = propNames.sort();
- const series = new MutableDataFrame({ fields: [] });
- series.addField({
- name: this.targets[0].timeField,
- type: FieldType.time,
- }).parse = (v: any) => {
- return v[0] || '';
- };
- if (logMessageField) {
- series.addField({
- name: logMessageField,
- type: FieldType.string,
- }).parse = (v: any) => {
- return v || '';
- };
- } else {
- series.addField({
- name: '_source',
- type: FieldType.string,
- }).parse = (v: any) => {
- return JSON.stringify(v, null, 2);
- };
- }
- if (logLevelField) {
- series.addField({
- name: 'level',
- type: FieldType.string,
- }).parse = (v: any) => {
- return v || '';
- };
- }
- for (const propName of propNames) {
- if (propName === this.targets[0].timeField || propName === '_source') {
- continue;
- }
- series.addField({
- name: propName,
- type: FieldType.string,
- }).parse = (v: any) => {
- return v || '';
- };
- }
- // Add a row for each document
- for (const doc of docs) {
- series.add(doc);
- }
- dataFrame.push(series);
- }
- if (response.aggregations) {
- const aggregations = response.aggregations;
- const target = this.targets[n];
- const tmpSeriesList: any[] = [];
- const table = new TableModel();
- this.processBuckets(aggregations, target, tmpSeriesList, table, {}, 0);
- this.trimDatapoints(tmpSeriesList, target);
- this.nameSeries(tmpSeriesList, target);
- for (let y = 0; y < tmpSeriesList.length; y++) {
- const series = toDataFrame(tmpSeriesList[y]);
- series.labels = {};
- dataFrame.push(series);
- }
- }
- }
- return { data: dataFrame };
- }
- }
|