feat: support any precision in PromQL #3933

Merged 2 commits on May 16, 2024

Changes from all commits
8 changes: 6 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
@@ -342,12 +342,16 @@ impl InstantManipulateStream {
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
// TODO(ruihang): maybe the input is not timestamp millisecond array

let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

// field column for staleness check
let field_column = self
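The same hardening recurs in `normalize.rs` and `range_manipulate.rs` below: the hard `unwrap()` on the downcast becomes a recoverable `DataFusionError::Execution`. A minimal standalone sketch of the pattern — `as_millisecond_array` is a hypothetical free function for illustration, not the actual stream method:

```rust
use datafusion::arrow::array::{ArrayRef, TimestampMillisecondArray};
use datafusion::error::{DataFusionError, Result as DataFusionResult};

// Downcast a dynamically typed Arrow column, surfacing an execution error
// instead of panicking when the column is not a millisecond timestamp array.
fn as_millisecond_array(column: &ArrayRef) -> DataFusionResult<&TimestampMillisecondArray> {
    column
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution(
                "Time index Column downcast to TimestampMillisecondArray failed".into(),
            )
        })
}
```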
7 changes: 5 additions & 2 deletions src/promql/src/extension_plan/normalize.rs
@@ -250,12 +250,15 @@ pub struct SeriesNormalizeStream {

impl SeriesNormalizeStream {
pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
// TODO(ruihang): maybe the input is not timestamp millisecond array
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

// bias the timestamp column by offset
let ts_column_biased = if self.offset == 0 {
15 changes: 11 additions & 4 deletions src/promql/src/extension_plan/range_manipulate.rs
@@ -433,7 +433,7 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let (aligned_ts, ranges) = self.calculate_range(&input);
let (aligned_ts, ranges) = self.calculate_range(&input)?;
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
@@ -472,12 +472,19 @@
.map_err(|e| DataFusionError::ArrowError(e, None))
}

fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) {
fn calculate_range(
&self,
input: &RecordBatch,
) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> {
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;

let mut aligned_ts = vec![];
let mut ranges = vec![];
@@ -506,7 +513,7 @@

let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;

(aligned_ts_array, ranges)
Ok((aligned_ts_array, ranges))
}
}

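Because the downcast inside `calculate_range` can now fail, the method itself becomes fallible and its caller forwards the error with `?`. A tiny sketch of that signature change, with placeholder logic standing in for the real range computation:

```rust
use datafusion::error::{DataFusionError, Result as DataFusionResult};

// Before: the helper returned a plain value and panicked on bad input.
// After: it returns a Result and the caller propagates the error with `?`.
fn calculate(v: Option<i64>) -> DataFusionResult<i64> {
    v.ok_or_else(|| DataFusionError::Execution("unexpected column type".into()))
}

fn caller(v: Option<i64>) -> DataFusionResult<i64> {
    let value = calculate(v)?; // was: let value = calculate(v);
    Ok(value + 1)
}
```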
155 changes: 152 additions & 3 deletions src/promql/src/planner.rs
@@ -35,7 +35,8 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_expr::utils::conjunction;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::{
@@ -910,9 +911,62 @@ impl PromPlanner {
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
// Safety: `scan_filters` is not empty.
let result = LogicalPlanBuilder::scan(table_ref, provider, None)

let is_time_index_ms = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.data_type
== ConcreteDataType::timestamp_millisecond_datatype();

let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;

if !is_time_index_ms {
// cast to ms if time_index not in Millisecond precision
let expr: Vec<_> = self
.ctx
.field_columns
.iter()
.map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
.chain(self.create_tag_column_exprs()?)
.chain(Some(DfExpr::Alias(Alias {
expr: Box::new(DfExpr::Cast(Cast {
expr: Box::new(self.create_time_index_column_expr()?),
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
})),
relation: Some(table_ref.clone()),
name: self
.ctx
.time_index_column
.as_ref()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.clone(),
})))
.collect::<Vec<_>>();
scan_plan = LogicalPlanBuilder::from(scan_plan)
.project(expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}

// Safety: `scan_filters` is not empty.
let result = LogicalPlanBuilder::from(scan_plan)
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?
.build()
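This planner change is the core of the feature: it inspects the table's time index and, when the precision is not milliseconds, wraps the scan in a projection that casts the time index to `Timestamp(Millisecond, None)` and aliases it back to its original name, so every downstream PromQL node still sees a millisecond column. A minimal sketch of that cast expression using DataFusion's expression builders, assuming a hypothetical time index column named `ts`:

```rust
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::logical_expr::{cast, col, Expr};

// Cast a non-millisecond time index to millisecond precision and keep the
// original column name, mirroring the projection this hunk builds:
//   CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp
fn ms_time_index() -> Expr {
    cast(col("ts"), DataType::Timestamp(TimeUnit::Millisecond, None)).alias("ts")
}
```

Keeping the original column name means `self.ctx.time_index_column` stays valid for the rest of planning, and millisecond tables skip the projection entirely, so the common path is unchanged.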
@@ -2972,4 +3026,99 @@ mod test {
assert!(plan.is_err(), "query: {:?}", query);
}
}

#[tokio::test]
async fn test_non_ms_precision() {
let catalog_list = MemoryCatalogManager::with_default_setup();
let columns = vec![
ColumnSchema::new(
"tag".to_string(),
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"timestamp".to_string(),
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"field".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
];
let schema = Arc::new(Schema::new(columns));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.value_indices(vec![2])
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name("metrics".to_string())
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
assert!(catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "metrics".to_string(),
table_id: 1024,
table,
})
.is_ok());

let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(catalog_list.clone(), false, QueryContext::arc().as_ref()),
EvalStmt {
expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
},
)
.await
.unwrap();
assert_eq!(plan.display_indent_schema().to_string(),
"Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag DESC NULLS LAST, metrics.timestamp DESC NULLS LAST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}
}
8 changes: 5 additions & 3 deletions src/query/src/range_select/plan.rs
@@ -1116,9 +1116,11 @@ impl RangeSelectStream {
let ts_column_ref = ts_column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;
.ok_or_else(|| {
DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
)
})?;
for i in 0..self.range_exec.len() {
let args = self.evaluate_many(&batch, &self.range_exec[i].args)?;
// use self.modify_map record (hash, align_ts) => [row_nums]
121 changes: 121 additions & 0 deletions tests/cases/standalone/common/promql/precisions.result
@@ -0,0 +1,121 @@
CREATE TABLE host_sec (
ts timestamp(0) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_sec VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5, 'host1', 3),
(5, 'host2', 4),
(10, 'host1', 5),
(10, 'host2', 6),
(15, 'host1', 7),
(15, 'host2', 8);

Affected Rows: 8

CREATE TABLE host_micro (
ts timestamp(6) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_micro VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000000, 'host1', 3),
(5000000, 'host2', 4),
(10000000, 'host1', 5),
(10000000, 'host2', 6),
(15000000, 'host1', 7),
(15000000, 'host2', 8);

Affected Rows: 8

-- Test on Timestamps of different precisions
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_micro{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};

+-------+---------------------+-------------------------------+
| host | ts | host_sec.val + host_micro.val |
+-------+---------------------+-------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 6.0 |
| host1 | 1970-01-01T00:00:10 | 10.0 |
| host1 | 1970-01-01T00:00:15 | 14.0 |
+-------+---------------------+-------------------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);

+-------+---------------------+-----------------------------------------------------------------------------------------+
| host | ts | host_sec.prom_avg_over_time(ts_range,val) + host_micro.prom_avg_over_time(ts_range,val) |
+-------+---------------------+-----------------------------------------------------------------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 4.0 |
| host1 | 1970-01-01T00:00:10 | 8.0 |
| host1 | 1970-01-01T00:00:15 | 12.0 |
+-------+---------------------+-----------------------------------------------------------------------------------------+

DROP TABLE host_sec;

Affected Rows: 0

DROP TABLE host_micro;

Affected Rows: 0
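The fixtures encode the same four instants in each precision: `host_sec` uses `timestamp(0)`, so it stores raw seconds, while `host_micro` uses `timestamp(6)` and stores the same instants scaled by 1,000,000. After the planner's cast both tables align on identical millisecond timestamps, which is why the per-table queries and the cross-table addition all agree. A quick sketch of that scaling arithmetic:

```rust
// The same instants expressed in each precision used by the test tables.
fn main() {
    for s in [0i64, 5, 10, 15] {
        let micros = s * 1_000_000; // values inserted into host_micro (timestamp(6))
        let millis = s * 1_000; // what the planner's cast normalizes both tables to
        println!("{s:>2} s = {micros:>9} us = {millis:>6} ms");
    }
}
```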
