From 9f4a6c6fe2757d423681575970d30f477361d0d6 Mon Sep 17 00:00:00 2001
From: WU Jingdi
Date: Thu, 16 May 2024 15:00:24 +0800
Subject: [PATCH] feat: support any precision in PromQL (#3933)

* feat: support any precision in PromQL

* chore: add test
---
 .../src/extension_plan/instant_manipulate.rs  |   8 +-
 src/promql/src/extension_plan/normalize.rs    |   7 +-
 .../src/extension_plan/range_manipulate.rs    |  15 +-
 src/promql/src/planner.rs                     | 155 +++++++++++++++++-
 src/query/src/range_select/plan.rs            |   8 +-
 .../common/promql/precisions.result           | 121 ++++++++++++++
 .../standalone/common/promql/precisions.sql   |  55 +++++++
 7 files changed, 355 insertions(+), 14 deletions(-)
 create mode 100644 tests/cases/standalone/common/promql/precisions.result
 create mode 100644 tests/cases/standalone/common/promql/precisions.sql

diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs
index 03e2c373eed8..6591f6db06c6 100644
--- a/src/promql/src/extension_plan/instant_manipulate.rs
+++ b/src/promql/src/extension_plan/instant_manipulate.rs
@@ -342,12 +342,16 @@ impl InstantManipulateStream {
     // and the function `vectorSelectorSingle`
     pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
         let mut take_indices = vec![];
-        // TODO(ruihang): maybe the input is not timestamp millisecond array
+
         let ts_column = input
             .column(self.time_index)
             .as_any()
             .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;
 
         // field column for staleness check
         let field_column = self
diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs
index 957c55fade54..70ca8da660cc 100644
--- a/src/promql/src/extension_plan/normalize.rs
+++ b/src/promql/src/extension_plan/normalize.rs
@@ -250,12 +250,15 @@ pub struct SeriesNormalizeStream {
 
 impl SeriesNormalizeStream {
     pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
-        // TODO(ruihang): maybe the input is not timestamp millisecond array
         let ts_column = input
             .column(self.time_index)
             .as_any()
             .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;
 
         // bias the timestamp column by offset
         let ts_column_biased = if self.offset == 0 {
diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs
index 49002dabfa74..6f644f308b3b 100644
--- a/src/promql/src/extension_plan/range_manipulate.rs
+++ b/src/promql/src/extension_plan/range_manipulate.rs
@@ -433,7 +433,7 @@ impl RangeManipulateStream {
     pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
         let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
         // calculate the range
-        let (aligned_ts, ranges) = self.calculate_range(&input);
+        let (aligned_ts, ranges) = self.calculate_range(&input)?;
         // ignore this if all ranges are empty
         if ranges.iter().all(|(_, len)| *len == 0) {
             return Ok(None);
@@ -472,12 +472,19 @@ impl RangeManipulateStream {
             .map_err(|e| DataFusionError::ArrowError(e, None))
     }
 
-    fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) {
+    fn calculate_range(
+        &self,
+        input: &RecordBatch,
+    ) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> {
         let ts_column = input
             .column(self.time_index)
             .as_any()
             .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;
 
         let mut aligned_ts = vec![];
         let mut ranges = vec![];
@@ -506,7 +513,7 @@ impl RangeManipulateStream {
 
         let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;
 
-        (aligned_ts_array, ranges)
+        Ok((aligned_ts_array, ranges))
     }
 }
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 0af53088388a..9fe5d3019b44 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -35,7 +35,8 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::TableReference;
 use datafusion_expr::utils::conjunction;
-use datatypes::arrow::datatypes::DataType as ArrowDataType;
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
+use datatypes::data_type::ConcreteDataType;
 use itertools::Itertools;
 use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
 use promql_parser::parser::{
@@ -910,9 +911,62 @@ impl PromPlanner {
             .resolve_table(table_ref.clone())
             .await
             .context(CatalogSnafu)?;
-        // Safety: `scan_filters` is not empty.
-        let result = LogicalPlanBuilder::scan(table_ref, provider, None)
+
+        let is_time_index_ms = provider
+            .as_any()
+            .downcast_ref::<DefaultTableSource>()
+            .context(UnknownTableSnafu)?
+            .table_provider
+            .as_any()
+            .downcast_ref::<DfTableProviderAdapter>()
+            .context(UnknownTableSnafu)?
+            .table()
+            .schema()
+            .timestamp_column()
+            .with_context(|| TimeIndexNotFoundSnafu {
+                table: table_ref.to_quoted_string(),
+            })?
+            .data_type
+            == ConcreteDataType::timestamp_millisecond_datatype();
+
+        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
             .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
+        if !is_time_index_ms {
+            // cast to ms if time_index not in Millisecond precision
+            let expr: Vec<_> = self
+                .ctx
+                .field_columns
+                .iter()
+                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
+                .chain(self.create_tag_column_exprs()?)
+                .chain(Some(DfExpr::Alias(Alias {
+                    expr: Box::new(DfExpr::Cast(Cast {
+                        expr: Box::new(self.create_time_index_column_expr()?),
+                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
+                    })),
+                    relation: Some(table_ref.clone()),
+                    name: self
+                        .ctx
+                        .time_index_column
+                        .as_ref()
+                        .with_context(|| TimeIndexNotFoundSnafu {
+                            table: table_ref.to_quoted_string(),
+                        })?
+                        .clone(),
+                })))
+                .collect::<Vec<_>>();
+            scan_plan = LogicalPlanBuilder::from(scan_plan)
+                .project(expr)
+                .context(DataFusionPlanningSnafu)?
+                .build()
+                .context(DataFusionPlanningSnafu)?;
+        }
+
+        // Safety: `scan_filters` is not empty.
+        let result = LogicalPlanBuilder::from(scan_plan)
             .filter(conjunction(filter).unwrap())
             .context(DataFusionPlanningSnafu)?
             .build()
             .context(DataFusionPlanningSnafu)?;
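Note: the hunk above is the heart of the change. When a table's time index is not already millisecond precision, the planner wraps the `TableScan` in a projection that casts the time index to `Timestamp(Millisecond, None)`, so every downstream PromQL operator keeps its millisecond-only assumption. The following standalone sketch is not part of the patch (the planner goes through DataFusion's `Cast` expression instead of calling arrow directly), but it shows the same unit conversion the injected projection compiles down to:

```rust
// Sketch only: a nanosecond time index collapsing to the millisecond values
// PromQL execution expects, via the arrow cast kernel.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // 5 seconds after the epoch, stored at nanosecond precision.
    let nanos: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![5_000_000_000i64]));
    // Cast to Timestamp(Millisecond, None), as the projection over the scan does.
    let millis = cast(&nanos, &DataType::Timestamp(TimeUnit::Millisecond, None)).unwrap();
    let millis = millis
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    assert_eq!(millis.value(0), 5_000);
}
```

Casting once at the scan boundary keeps the fix local: `PromSeriesNormalize`, `PromRangeManipulate`, and the other extension plans continue to downcast to `TimestampMillisecondArray` unchanged.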
@@ -2972,4 +3026,99 @@ mod test {
         assert!(plan.is_err(), "query: {:?}", query);
         }
     }
+
+    #[tokio::test]
+    async fn test_non_ms_precision() {
+        let catalog_list = MemoryCatalogManager::with_default_setup();
+        let columns = vec![
+            ColumnSchema::new(
+                "tag".to_string(),
+                ConcreteDataType::string_datatype(),
+                false,
+            ),
+            ColumnSchema::new(
+                "timestamp".to_string(),
+                ConcreteDataType::timestamp_nanosecond_datatype(),
+                false,
+            )
+            .with_time_index(true),
+            ColumnSchema::new(
+                "field".to_string(),
+                ConcreteDataType::float64_datatype(),
+                true,
+            ),
+        ];
+        let schema = Arc::new(Schema::new(columns));
+        let table_meta = TableMetaBuilder::default()
+            .schema(schema)
+            .primary_key_indices(vec![0])
+            .value_indices(vec![2])
+            .next_column_id(1024)
+            .build()
+            .unwrap();
+        let table_info = TableInfoBuilder::default()
+            .name("metrics".to_string())
+            .meta(table_meta)
+            .build()
+            .unwrap();
+        let table = EmptyTable::from_table_info(&table_info);
+        assert!(catalog_list
+            .register_table_sync(RegisterTableRequest {
+                catalog: DEFAULT_CATALOG_NAME.to_string(),
+                schema: DEFAULT_SCHEMA_NAME.to_string(),
+                table_name: "metrics".to_string(),
+                table_id: 1024,
+                table,
+            })
+            .is_ok());
+
+        let plan = PromPlanner::stmt_to_plan(
+            DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
+            EvalStmt {
+                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
+                start: UNIX_EPOCH,
+                end: UNIX_EPOCH
+                    .checked_add(Duration::from_secs(100_000))
+                    .unwrap(),
+                interval: Duration::from_secs(5),
+                lookback_delta: Duration::from_secs(1),
+            },
+        )
+        .await
+        .unwrap();
+        assert_eq!(plan.display_indent_schema().to_string(),
+        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n  PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n    PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n      Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n        Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n          Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n            TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
+        );
+        let plan = PromPlanner::stmt_to_plan(
+            DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
+            EvalStmt {
+                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
+                start: UNIX_EPOCH,
+                end: UNIX_EPOCH
+                    .checked_add(Duration::from_secs(100_000))
+                    .unwrap(),
+                interval: Duration::from_secs(5),
+                lookback_delta: Duration::from_secs(1),
+            },
+        )
+        .await
+        .unwrap();
+        assert_eq!(plan.display_indent_schema().to_string(),
+        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
+        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
+        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
+        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n          Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
+        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
+        );
+    }
 }
diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs
index 6172fc8bd36b..5ba9b2248830 100644
--- a/src/query/src/range_select/plan.rs
+++ b/src/query/src/range_select/plan.rs
@@ -1116,9 +1116,11 @@ impl RangeSelectStream {
         let ts_column_ref = ts_column
             .as_any()
             .downcast_ref::<TimestampMillisecondArray>()
-            .ok_or(DataFusionError::Execution(
-                "Time index Column downcast to TimestampMillisecondArray failed".into(),
-            ))?;
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;
         for i in 0..self.range_exec.len() {
             let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
             // use self.modify_map record (hash, align_ts) => [row_nums]
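Note: the four Rust hunks above repeat one defensive pattern. The formerly panicking `unwrap()` (or, in `range_select/plan.rs`, the eagerly-allocating `ok_or`) on the time-index downcast becomes `ok_or_else`, turning a mistyped time index into a recoverable `DataFusionError` and building the error message only on the failure path. A minimal distillation of the pattern; the helper name `time_index_as_ms` is hypothetical, not from this patch:

```rust
// Hypothetical helper (not in the patch): fail with DataFusionError::Execution
// instead of panicking when the time index is not a TimestampMillisecondArray.
use arrow::array::{Array, ArrayRef, TimestampMillisecondArray};
use datafusion_common::{DataFusionError, Result as DfResult};

fn time_index_as_ms(column: &ArrayRef) -> DfResult<&TimestampMillisecondArray> {
    column
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .ok_or_else(|| {
            // The closure runs only on the error path, so the String is never
            // allocated when the downcast succeeds, unlike ok_or(...).
            DataFusionError::Execution(
                "Time index Column downcast to TimestampMillisecondArray failed".into(),
            )
        })
}
```

With the planner now guaranteeing a millisecond time index via the cast projection, this branch should be unreachable in practice; returning an execution error instead of panicking simply keeps a malformed plan from taking down the process.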
diff --git a/tests/cases/standalone/common/promql/precisions.result b/tests/cases/standalone/common/promql/precisions.result
new file mode 100644
index 000000000000..a86cc1e188f7
--- /dev/null
+++ b/tests/cases/standalone/common/promql/precisions.result
@@ -0,0 +1,121 @@
+CREATE TABLE host_sec (
+    ts timestamp(0) time index,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+Affected Rows: 0
+
+INSERT INTO TABLE host_sec VALUES
+    (0, 'host1', 1),
+    (0, 'host2', 2),
+    (5, 'host1', 3),
+    (5, 'host2', 4),
+    (10, 'host1', 5),
+    (10, 'host2', 6),
+    (15, 'host1', 7),
+    (15, 'host2', 8);
+
+Affected Rows: 8
+
+CREATE TABLE host_micro (
+    ts timestamp(6) time index,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+Affected Rows: 0
+
+INSERT INTO TABLE host_micro VALUES
+    (0, 'host1', 1),
+    (0, 'host2', 2),
+    (5000000, 'host1', 3),
+    (5000000, 'host2', 4),
+    (10000000, 'host1', 5),
+    (10000000, 'host2', 6),
+    (15000000, 'host1', 7),
+    (15000000, 'host2', 8);
+
+Affected Rows: 8
+
+-- Test on Timestamps of different precisions
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_sec{host="host1"};
+
++-----+-------+---------------------+
+| val | host  | ts                  |
++-----+-------+---------------------+
+| 1.0 | host1 | 1970-01-01T00:00:00 |
+| 3.0 | host1 | 1970-01-01T00:00:05 |
+| 5.0 | host1 | 1970-01-01T00:00:10 |
+| 7.0 | host1 | 1970-01-01T00:00:15 |
++-----+-------+---------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);
+
++---------------------+----------------------------------+-------+
+| ts                  | prom_avg_over_time(ts_range,val) | host  |
++---------------------+----------------------------------+-------+
+| 1970-01-01T00:00:00 | 1.0                              | host1 |
+| 1970-01-01T00:00:05 | 2.0                              | host1 |
+| 1970-01-01T00:00:10 | 4.0                              | host1 |
+| 1970-01-01T00:00:15 | 6.0                              | host1 |
++---------------------+----------------------------------+-------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_micro{host="host1"};
+
++-----+-------+---------------------+
+| val | host  | ts                  |
++-----+-------+---------------------+
+| 1.0 | host1 | 1970-01-01T00:00:00 |
+| 3.0 | host1 | 1970-01-01T00:00:05 |
+| 5.0 | host1 | 1970-01-01T00:00:10 |
+| 7.0 | host1 | 1970-01-01T00:00:15 |
++-----+-------+---------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);
+
++---------------------+----------------------------------+-------+
+| ts                  | prom_avg_over_time(ts_range,val) | host  |
++---------------------+----------------------------------+-------+
+| 1970-01-01T00:00:00 | 1.0                              | host1 |
+| 1970-01-01T00:00:05 | 2.0                              | host1 |
+| 1970-01-01T00:00:10 | 4.0                              | host1 |
+| 1970-01-01T00:00:15 | 6.0                              | host1 |
++---------------------+----------------------------------+-------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};
+
++-------+---------------------+-------------------------------+
+| host  | ts                  | host_sec.val + host_micro.val |
++-------+---------------------+-------------------------------+
+| host1 | 1970-01-01T00:00:00 | 2.0                           |
+| host1 | 1970-01-01T00:00:05 | 6.0                           |
+| host1 | 1970-01-01T00:00:10 | 10.0                          |
+| host1 | 1970-01-01T00:00:15 | 14.0                          |
++-------+---------------------+-------------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);
+
++-------+---------------------+-----------------------------------------------------------------------------------------+
+| host  | ts                  | host_sec.prom_avg_over_time(ts_range,val) + host_micro.prom_avg_over_time(ts_range,val) |
++-------+---------------------+-----------------------------------------------------------------------------------------+
+| host1 | 1970-01-01T00:00:00 | 2.0                                                                                     |
+| host1 | 1970-01-01T00:00:05 | 4.0                                                                                     |
+| host1 | 1970-01-01T00:00:10 | 8.0                                                                                     |
+| host1 | 1970-01-01T00:00:15 | 12.0                                                                                    |
++-------+---------------------+-----------------------------------------------------------------------------------------+
+
+DROP TABLE host_sec;
+
+Affected Rows: 0
+
+DROP TABLE host_micro;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/precisions.sql b/tests/cases/standalone/common/promql/precisions.sql
new file mode 100644
index 000000000000..6966e5bca205
--- /dev/null
+++ b/tests/cases/standalone/common/promql/precisions.sql
@@ -0,0 +1,55 @@
+CREATE TABLE host_sec (
+    ts timestamp(0) time index,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+INSERT INTO TABLE host_sec VALUES
+    (0, 'host1', 1),
+    (0, 'host2', 2),
+    (5, 'host1', 3),
+    (5, 'host2', 4),
+    (10, 'host1', 5),
+    (10, 'host2', 6),
+    (15, 'host1', 7),
+    (15, 'host2', 8);
+
+CREATE TABLE host_micro (
+    ts timestamp(6) time index,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+INSERT INTO TABLE host_micro VALUES
+    (0, 'host1', 1),
+    (0, 'host2', 2),
+    (5000000, 'host1', 3),
+    (5000000, 'host2', 4),
+    (10000000, 'host1', 5),
+    (10000000, 'host2', 6),
+    (15000000, 'host1', 7),
+    (15000000, 'host2', 8);
+
+-- Test on Timestamps of different precisions
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_sec{host="host1"};
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_micro{host="host1"};
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);
+
+DROP TABLE host_sec;
+
+DROP TABLE host_micro;
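Note: the sqlness cases exercise second-precision (`timestamp(0)`) and microsecond-precision (`timestamp(6)`) time indexes, including a binary operation across the two tables. The rows line up because the raw values were chosen to denote the same instants, which the planner's cast maps onto identical millisecond timestamps. A back-of-the-envelope check (mine, not from the test suite):

```rust
// Why the two test tables join cleanly: second- and microsecond-precision
// values for "5 seconds after the epoch" land on the same millisecond tick.
fn main() {
    let sec_value: i64 = 5; // stored in host_sec's timestamp(0) column
    let micro_value: i64 = 5_000_000; // stored in host_micro's timestamp(6) column

    let sec_as_ms = sec_value * 1_000;
    let micro_as_ms = micro_value / 1_000;

    assert_eq!(sec_as_ms, 5_000);
    // Equal timestamps, so `host_sec + host_micro` matches these rows.
    assert_eq!(sec_as_ms, micro_as_ms);
}
```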