feat: support any precision in PromQL (#3933)
* feat: support any precision in PromQL

* chore: add test
Taylor-lagrange authored May 16, 2024
1 parent c915916 commit 9f4a6c6
Showing 7 changed files with 355 additions and 14 deletions.
8 changes: 6 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
@@ -342,12 +342,16 @@ impl InstantManipulateStream {
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
-        // TODO(ruihang): maybe the input is not timestamp millisecond array
+
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;

// field column for staleness check
let field_column = self
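Note: the executor still requires the time index to arrive as a `TimestampMillisecondArray` (the planner change below guarantees it); what changes here is that a failed downcast now surfaces as a query error instead of a panic. A minimal self-contained sketch of the pattern, where `ts_millis` is a hypothetical helper rather than code from this commit:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{Array, ArrayRef, TimestampMillisecondArray};
use datafusion::error::{DataFusionError, Result as DataFusionResult};

// Fallible downcast: return DataFusionError::Execution instead of panicking.
fn ts_millis(column: &ArrayRef) -> DataFusionResult<&TimestampMillisecondArray> {
    column
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution(
                "Time index Column downcast to TimestampMillisecondArray failed".into(),
            )
        })
}

fn main() -> DataFusionResult<()> {
    let col: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0, 5_000, 10_000]));
    assert_eq!(ts_millis(&col)?.value(1), 5_000);
    Ok(())
}
```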
7 changes: 5 additions & 2 deletions src/promql/src/extension_plan/normalize.rs
@@ -250,12 +250,15 @@ pub struct SeriesNormalizeStream {

impl SeriesNormalizeStream {
pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
-        // TODO(ruihang): maybe the input is not timestamp millisecond array
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;

// bias the timestamp column by offset
let ts_column_biased = if self.offset == 0 {
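Same fix as in `instant_manipulate.rs`. For context, the `ts_column_biased` code that follows applies the PromQL `offset` to the (now guaranteed millisecond) timestamps. A rough sketch of that step, assuming the offset is already in milliseconds; `bias_by_offset` is a hypothetical helper, and the sign of the shift depends on the operator's convention:

```rust
use datafusion::arrow::array::TimestampMillisecondArray;

// Shift every non-null timestamp by the query's offset (milliseconds).
fn bias_by_offset(ts: &TimestampMillisecondArray, offset_ms: i64) -> TimestampMillisecondArray {
    ts.iter().map(|v| v.map(|t| t + offset_ms)).collect()
}

fn main() {
    let ts = TimestampMillisecondArray::from(vec![Some(5_000), None, Some(10_000)]);
    let biased = bias_by_offset(&ts, 1_000);
    assert_eq!(biased.value(0), 6_000); // nulls stay null
}
```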
15 changes: 11 additions & 4 deletions src/promql/src/extension_plan/range_manipulate.rs
@@ -433,7 +433,7 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
-        let (aligned_ts, ranges) = self.calculate_range(&input);
+        let (aligned_ts, ranges) = self.calculate_range(&input)?;
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
@@ -472,12 +472,19 @@
.map_err(|e| DataFusionError::ArrowError(e, None))
}

-    fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) {
+    fn calculate_range(
+        &self,
+        input: &RecordBatch,
+    ) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> {
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;

let mut aligned_ts = vec![];
let mut ranges = vec![];
@@ -506,7 +513,7 @@

let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;

-        (aligned_ts_array, ranges)
+        Ok((aligned_ts_array, ranges))
}
}

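Here `calculate_range` becomes fallible so the downcast error can propagate with `?`. For orientation, a rough sketch of the kind of computation it performs: for each aligned output timestamp, record which input rows fall in the lookup window. Assumptions in this sketch (not the commit's code): timestamps are sorted ascending, everything is in milliseconds, and the window is closed as `[t - range, t]`; the engine's exact boundary convention may differ.

```rust
// For each aligned timestamp t in [start, end] stepping by `interval`,
// emit (first_row_index, row_count) for the input rows inside the window.
fn ranges(ts: &[i64], start: i64, end: i64, interval: i64, range: i64) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    let mut t = start;
    while t <= end {
        let lo = ts.partition_point(|&x| x < t - range); // first row in window
        let hi = ts.partition_point(|&x| x <= t); // first row past window
        out.push((lo as u32, (hi - lo) as u32));
        t += interval;
    }
    out
}

fn main() {
    // Rows at 0s, 5s, 10s, 15s; every 5s step looks back 5s.
    assert_eq!(
        ranges(&[0, 5_000, 10_000, 15_000], 0, 15_000, 5_000, 5_000),
        vec![(0, 1), (0, 2), (1, 2), (2, 2)]
    );
}
```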
155 changes: 152 additions & 3 deletions src/promql/src/planner.rs
@@ -35,7 +35,8 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_expr::utils::conjunction;
-use datatypes::arrow::datatypes::DataType as ArrowDataType;
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
+use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::{
@@ -910,9 +911,62 @@ impl PromPlanner {
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
-        // Safety: `scan_filters` is not empty.
-        let result = LogicalPlanBuilder::scan(table_ref, provider, None)
+
+        let is_time_index_ms = provider
+            .as_any()
+            .downcast_ref::<DefaultTableSource>()
+            .context(UnknownTableSnafu)?
+            .table_provider
+            .as_any()
+            .downcast_ref::<DfTableProviderAdapter>()
+            .context(UnknownTableSnafu)?
+            .table()
+            .schema()
+            .timestamp_column()
+            .with_context(|| TimeIndexNotFoundSnafu {
+                table: table_ref.to_quoted_string(),
+            })?
+            .data_type
+            == ConcreteDataType::timestamp_millisecond_datatype();
+
+        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
+        if !is_time_index_ms {
+            // cast to ms if time_index not in Millisecond precision
+            let expr: Vec<_> = self
+                .ctx
+                .field_columns
+                .iter()
+                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
+                .chain(self.create_tag_column_exprs()?)
+                .chain(Some(DfExpr::Alias(Alias {
+                    expr: Box::new(DfExpr::Cast(Cast {
+                        expr: Box::new(self.create_time_index_column_expr()?),
+                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
+                    })),
+                    relation: Some(table_ref.clone()),
+                    name: self
+                        .ctx
+                        .time_index_column
+                        .as_ref()
+                        .with_context(|| TimeIndexNotFoundSnafu {
+                            table: table_ref.to_quoted_string(),
+                        })?
+                        .clone(),
+                })))
+                .collect::<Vec<_>>();
+            scan_plan = LogicalPlanBuilder::from(scan_plan)
+                .project(expr)
+                .context(DataFusionPlanningSnafu)?
+                .build()
+                .context(DataFusionPlanningSnafu)?;
+        }
+
+        // Safety: `scan_filters` is not empty.
+        let result = LogicalPlanBuilder::from(scan_plan)
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?
.build()
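This hunk is the heart of the feature: the planner inspects the table schema, and when the time index is not millisecond precision it wraps the scan in a projection that casts the time index to `Timestamp(Millisecond, None)` and re-aliases it to its original name, so all downstream PromQL operators can keep assuming milliseconds. A standalone sketch of the same idea in plain DataFusion; the column names `tag` and `field` are assumptions, and this is not the commit's exact code:

```rust
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::error::Result;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::{cast, col};

// Wrap `scan` in a projection that casts the time index to milliseconds
// while keeping its original column name.
fn cast_time_index_to_ms(scan: LogicalPlan, time_index: &str) -> Result<LogicalPlan> {
    let ts_ms = cast(
        col(time_index),
        DataType::Timestamp(TimeUnit::Millisecond, None),
    )
    .alias(time_index);
    LogicalPlanBuilder::from(scan)
        .project([col("tag"), col("field"), ts_ms])? // assumed schema
        .build()
}
```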
@@ -2972,4 +3026,99 @@ mod test {
assert!(plan.is_err(), "query: {:?}", query);
}
}

#[tokio::test]
async fn test_non_ms_precision() {
let catalog_list = MemoryCatalogManager::with_default_setup();
let columns = vec![
ColumnSchema::new(
"tag".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"field".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
];
let schema = Arc::new(Schema::new(columns));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.value_indices(vec![2])
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name("metrics".to_string())
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
assert!(catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "metrics".to_string(),
table_id: 1024,
table,
})
.is_ok());

let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}
}
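Both expected plans place the `Projection` with the millisecond `CAST` directly above the `TableScan`, which still reports `Timestamp(Nanosecond, None)`. The millisecond constants in the plan strings check out by simple arithmetic:

```rust
fn main() {
    let end_ms = 100_000_i64 * 1_000; // EvalStmt end = UNIX_EPOCH + 100_000 s
    assert_eq!(end_ms, 100_000_000); // "range=[0..100000000]"
    let lookback_ms = 1_000; // lookback_delta = 1 s
    assert_eq!(0 - lookback_ms, -1_000); // instant query's filter lower bound
    let range_ms = 5_000; // the "[5s]" selector
    assert_eq!(0 - lookback_ms - range_ms, -6_000); // range query's filter lower bound
    assert_eq!(end_ms + lookback_ms, 100_001_000); // both filters' upper bound
}
```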
8 changes: 5 additions & 3 deletions src/query/src/range_select/plan.rs
@@ -1116,9 +1116,11 @@ impl RangeSelectStream {
let ts_column_ref = ts_column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
-            .ok_or(DataFusionError::Execution(
-                "Time index Column downcast to TimestampMillisecondArray failed".into(),
-            ))?;
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
+                )
+            })?;
for i in 0..self.range_exec.len() {
let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
// use self.modify_map record (hash, align_ts) => [row_nums]
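The only change here is `ok_or` to `ok_or_else`: `ok_or(expr)` constructs its error value even on the success path, while `ok_or_else(closure)` defers that work (here, a `String` allocation) to the failure path. A toy illustration, not the commit's code:

```rust
fn first_even(xs: &[i64]) -> Result<i64, String> {
    xs.iter()
        .copied()
        .find(|x| x % 2 == 0)
        // The error String is only allocated when no element matches.
        .ok_or_else(|| format!("no even number in {xs:?}"))
}

fn main() {
    assert_eq!(first_even(&[1, 4, 5]), Ok(4));
    assert!(first_even(&[1, 3, 5]).is_err());
}
```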
121 changes: 121 additions & 0 deletions tests/cases/standalone/common/promql/precisions.result
@@ -0,0 +1,121 @@
CREATE TABLE host_sec (
ts timestamp(0) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_sec VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5, 'host1', 3),
(5, 'host2', 4),
(10, 'host1', 5),
(10, 'host2', 6),
(15, 'host1', 7),
(15, 'host2', 8);

Affected Rows: 8

CREATE TABLE host_micro (
ts timestamp(6) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_micro VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000000, 'host1', 3),
(5000000, 'host2', 4),
(10000000, 'host1', 5),
(10000000, 'host2', 6),
(15000000, 'host1', 7),
(15000000, 'host2', 8);

Affected Rows: 8
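The two tables describe the same series: `ts timestamp(0)` stores seconds and `ts timestamp(6)` microseconds, so `5` in `host_sec` and `5000000` in `host_micro` are the same instant, and both become `5000` once the planner casts to milliseconds. Trivial to confirm:

```rust
fn main() {
    assert_eq!(5 * 1_000_000, 5_000_000); // 5 s in µs (host_micro's unit)
    assert_eq!(5_000_000 / 1_000, 5_000); // and in ms after the planner's cast
}
```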

-- Test on Timestamps of different precisions
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_micro{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};

+-------+---------------------+-------------------------------+
| host | ts | host_sec.val + host_micro.val |
+-------+---------------------+-------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 6.0 |
| host1 | 1970-01-01T00:00:10 | 10.0 |
| host1 | 1970-01-01T00:00:15 | 14.0 |
+-------+---------------------+-------------------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);

+-------+---------------------+-----------------------------------------------------------------------------------------+
| host | ts | host_sec.prom_avg_over_time(ts_range,val) + host_micro.prom_avg_over_time(ts_range,val) |
+-------+---------------------+-----------------------------------------------------------------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 4.0 |
| host1 | 1970-01-01T00:00:10 | 8.0 |
| host1 | 1970-01-01T00:00:15 | 12.0 |
+-------+---------------------+-----------------------------------------------------------------------------------------+

DROP TABLE host_sec;

Affected Rows: 0

DROP TABLE host_micro;

Affected Rows: 0
