Skip to content

Commit

Permalink
feat: support EXPLAIN ANALYZE to profile queries (#849)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>

Inspired by DuckDB's [EXPLAIN
ANALYZE](https://duckdb.org/docs/guides/meta/explain_analyze.html).

<details>
  <summary>Example of TPCH Q6</summary>

```sql
explain analyze select
    sum(l_extendedprice * l_discount) as revenue
from
    lineitem
where
    l_shipdate >= date '1994-01-01'
    and l_shipdate < date '1994-01-01' + interval '1' year
    and l_discount between 0.08 - 0.01 and 0.08 + 0.01
    and l_quantity < 24;
```

```text
Projection
├── exprs:ref
│   └── sum
│       └── * { lhs: l_discount, rhs: l_extendedprice }
├── rows: 1
├── time: 91.042µs
└── Agg
    ├── aggs:sum
    │   └── * { lhs: l_discount, rhs: l_extendedprice }
    ├── rows: 1
    ├── time: 4.525204ms
    └── Filter
        ├── cond: and { lhs: >= { lhs: 0.09, rhs: l_discount }, rhs: >= { lhs: l_discount, rhs: 0.07 } }
        ├── rows: 113920
        ├── time: 14.407315ms
        └── Projection { exprs: [ l_extendedprice, l_discount ], rows: 417809, time: 2.277959ms }
            └── Filter
                ├── cond:and
                │   ├── lhs: > { lhs: 24, rhs: l_quantity }
                │   └── rhs:and
                │       ├── lhs: > { lhs: 1995-01-01, rhs: l_shipdate }
                │       └── rhs: >= { lhs: l_shipdate, rhs: 1994-01-01 }
                ├── rows: 417809
                ├── time: 69.414093ms
                └── Scan
                    ├── table: lineitem
                    ├── list: [ l_quantity, l_extendedprice, l_discount, l_shipdate ]
                    ├── filter: true
                    ├── rows: 6001215
                    └── time: 289.72746ms
```
</details>

---------

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Jun 17, 2024
1 parent 50b2556 commit 8b19a09
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ parking_lot = "0.12"
parse-display = "0.9"
paste = "1"
pgwire = "0.20"
pin-project = "1"
pretty-xmlish = "0.1"
prost = "0.12"
pyo3 = { version = "0.21", features = ["extension-module"], optional = true }
Expand Down
11 changes: 8 additions & 3 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ impl Binder {
..
} => self.bind_copy(source, to, target, &options),
Statement::Query(query) => self.bind_query(*query).map(|(id, _)| id),
Statement::Explain { statement, .. } => self.bind_explain(*statement),
Statement::Explain {
statement, analyze, ..
} => self.bind_explain(*statement, analyze),
Statement::ShowVariable { .. }
| Statement::ShowCreate { .. }
| Statement::ShowColumns { .. } => Err(BindError::NotSupportedTSQL),
Expand Down Expand Up @@ -447,9 +449,12 @@ impl Binder {
self.catalog.clone()
}

fn bind_explain(&mut self, query: Statement) -> Result {
fn bind_explain(&mut self, query: Statement, analyze: bool) -> Result {
let id = self.bind_stmt(query)?;
let id = self.egraph.add(Node::Explain(id));
let id = self.egraph.add(match analyze {
false => Node::Explain(id),
true => Node::Analyze(id),
});
Ok(id)
}
}
Expand Down
93 changes: 93 additions & 0 deletions src/executor/analyze.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.

use std::sync::atomic::{AtomicU64, Ordering};

use pretty_xmlish::PrettyConfig;

use super::*;
use crate::array::{ArrayImpl, StringArray};
use crate::planner::Explain;

/// Executor that runs a query to completion and then reports its plan
/// together with the profiling metrics gathered during the run.
pub struct AnalyzeExecutor {
    /// The plan that is rendered in the output.
    pub plan: RecExpr,
    /// Catalog handed to the explainer.
    pub catalog: RootCatalogRef,
    /// Per-node metrics that are looked up while rendering the plan.
    pub metrics: Metrics,
}

impl AnalyzeExecutor {
    /// Drains `child`, discarding every chunk it produces (the first error is
    /// propagated), then yields one chunk containing a single string: the
    /// pretty-printed plan, with `rows` and `time` supplied as node metadata.
    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
    pub async fn execute(self, child: BoxedExecutor) {
        // Run the query to the end; its output itself is not needed here.
        #[for_await]
        for output in child {
            drop(output?);
        }

        // Metadata provider handed to the explainer: row count first, then time.
        let node_metadata = |id| {
            let rows = self.metrics.get_rows(id).to_string();
            let time = format!("{:?}", self.metrics.get_time(id));
            vec![("rows", rows), ("time", time)]
        };
        let explain_obj = Explain::of(&self.plan)
            .with_catalog(&self.catalog)
            .with_metadata(&node_metadata);
        let pretty = explain_obj.pretty();

        // Render the plan into a pre-sized buffer.
        let mut text = String::with_capacity(4096);
        let mut config = PrettyConfig {
            need_boundaries: false,
            ..PrettyConfig::default()
        };
        config.unicode(&mut text, &pretty);

        // One string column holding one value: the rendered plan.
        let column = ArrayImpl::new_string(StringArray::from_iter([Some(text)]));
        yield DataChunk::from_iter([column]);
    }
}

/// A collection of profiling information for a query, keyed by node id.
#[derive(Default)]
pub struct Metrics {
    /// Time span registered for each node.
    spans: HashMap<Id, TimeSpan>,
    /// Output row counter registered for each node.
    rows: HashMap<Id, Counter>,
}

impl Metrics {
    /// Register metrics for a node.
    ///
    /// A later registration for the same `id` replaces the earlier one.
    pub fn register(&mut self, id: Id, span: TimeSpan, rows: Counter) {
        self.spans.insert(id, span);
        self.rows.insert(id, rows);
    }

    /// Get the running time for a node.
    ///
    /// Returns a zero duration when no span was registered for `id`, so that
    /// rendering a plan never panics on a node without metrics.
    pub fn get_time(&self, id: Id) -> Duration {
        self.spans
            .get(&id)
            .map(|span| span.busy_time())
            .unwrap_or_default()
    }

    /// Get the number of rows produced by a node.
    ///
    /// Returns `0` when no counter was registered for `id`, so that rendering
    /// a plan never panics on a node without metrics.
    pub fn get_rows(&self, id: Id) -> u64 {
        self.rows
            .get(&id)
            .map(|rows| rows.get())
            .unwrap_or_default()
    }
}

/// A shared counter: cloning it yields another handle to the same value.
#[derive(Default, Clone)]
pub struct Counter {
    count: Arc<AtomicU64>,
}

impl Counter {
    /// Adds `value` to the counter.
    pub fn inc(&self, value: u64) {
        AtomicU64::fetch_add(&self.count, value, Ordering::Relaxed);
    }

    /// Returns the value currently held by the counter.
    pub fn get(&self) -> u64 {
        AtomicU64::load(&self.count, Ordering::Relaxed)
    }
}
11 changes: 8 additions & 3 deletions src/executor/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ impl ExplainExecutor {
pub fn execute(self) -> BoxedExecutor {
let costs = self.optimizer.costs(&self.plan);
let rows = self.optimizer.rows(&self.plan);
let get_metadata = |id| {
vec![
("cost", costs[usize::from(id)].to_string()),
("rows", rows[usize::from(id)].to_string()),
]
};
let explain_obj = Explain::of(&self.plan)
.with_costs(&costs)
.with_rows(&rows)
.with_catalog(self.optimizer.catalog());
.with_catalog(self.optimizer.catalog())
.with_metadata(&get_metadata);
let explainer = explain_obj.pretty();
let mut explain = String::with_capacity(4096);
let mut config = PrettyConfig {
Expand Down
83 changes: 57 additions & 26 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use egg::{Id, Language};
use futures::stream::{BoxStream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use tracing::Instrument;

// use minitrace::prelude::*;
use self::analyze::*;
use self::copy_from_file::*;
use self::copy_to_file::*;
use self::create_function::*;
Expand Down Expand Up @@ -55,7 +58,9 @@ use crate::catalog::{RootCatalog, RootCatalogRef, TableRefId};
use crate::planner::{Expr, ExprAnalysis, Optimizer, RecExpr, TypeSchemaAnalysis};
use crate::storage::Storage;
use crate::types::{ColumnIndex, DataType};
use crate::utils::timed::{FutureExt as _, Span as TimeSpan};

mod analyze;
mod copy_from_file;
mod copy_to_file;
mod create_function;
Expand Down Expand Up @@ -108,6 +113,7 @@ struct Builder<S: Storage> {
/// For scans on views, we prebuild their executors and store them here.
/// Multiple scans on the same view will share the same executor.
views: HashMap<TableRefId, StreamSubscriber>,
metrics: Metrics,
}

impl<S: Storage> Builder<S> {
Expand Down Expand Up @@ -136,6 +142,7 @@ impl<S: Storage> Builder<S> {
egraph,
root,
views,
metrics: Metrics::default(),
}
}

Expand Down Expand Up @@ -189,22 +196,22 @@ impl<S: Storage> Builder<S> {
}

/// Builds the executor.
fn build(self) -> BoxedExecutor {
fn build(mut self) -> BoxedExecutor {
self.build_id(self.root)
}

/// Builds the executor and returns its subscriber.
fn build_subscriber(self) -> StreamSubscriber {
fn build_subscriber(mut self) -> StreamSubscriber {
self.build_id_subscriber(self.root)
}

/// Builds the executor for the given id.
fn build_id(&self, id: Id) -> BoxedExecutor {
fn build_id(&mut self, id: Id) -> BoxedExecutor {
self.build_id_subscriber(id).subscribe()
}

/// Builds the executor for the given id and returns its subscriber.
fn build_id_subscriber(&self, id: Id) -> StreamSubscriber {
fn build_id_subscriber(&mut self, id: Id) -> StreamSubscriber {
use Expr::*;
let stream = match self.node(id).clone() {
Scan([table, list, filter]) => {
Expand Down Expand Up @@ -439,14 +446,24 @@ impl<S: Storage> Builder<S> {
}
.execute(),

Analyze(child) => {
let stream = self.build_id(child);
AnalyzeExecutor {
plan: self.recexpr(child),
catalog: self.optimizer.catalog().clone(),
metrics: std::mem::take(&mut self.metrics),
}
.execute(stream)
}

Empty(_) => futures::stream::empty().boxed(),

node => panic!("not a plan: {node:?}"),
};
spawn(&self.node(id).to_string(), stream)
self.spawn(id, stream)
}

fn build_hashjoin<const T: JoinType>(&self, args: [Id; 6]) -> BoxedExecutor {
fn build_hashjoin<const T: JoinType>(&mut self, args: [Id; 6]) -> BoxedExecutor {
let [_, cond, lkeys, rkeys, left, right] = args;
assert_eq!(self.node(cond), &Expr::true_());
HashJoinExecutor::<T> {
Expand All @@ -458,7 +475,7 @@ impl<S: Storage> Builder<S> {
.execute(self.build_id(left), self.build_id(right))
}

fn build_hashsemijoin(&self, args: [Id; 6], anti: bool) -> BoxedExecutor {
fn build_hashsemijoin(&mut self, args: [Id; 6], anti: bool) -> BoxedExecutor {
let [_, cond, lkeys, rkeys, left, right] = args;
if self.node(cond) == &Expr::true_() {
HashSemiJoinExecutor {
Expand All @@ -481,7 +498,7 @@ impl<S: Storage> Builder<S> {
}
}

fn build_mergejoin<const T: JoinType>(&self, args: [Id; 6]) -> BoxedExecutor {
fn build_mergejoin<const T: JoinType>(&mut self, args: [Id; 6]) -> BoxedExecutor {
let [_, cond, lkeys, rkeys, left, right] = args;
assert_eq!(self.node(cond), &Expr::true_());
MergeJoinExecutor::<T> {
Expand All @@ -492,26 +509,40 @@ impl<S: Storage> Builder<S> {
}
.execute(self.build_id(left), self.build_id(right))
}
}

/// Spawn a new task to execute the given stream.
fn spawn(name: &str, mut stream: BoxedExecutor) -> StreamSubscriber {
let (tx, rx) = async_broadcast::broadcast(16);
let handle = tokio::task::Builder::default()
.name(name)
.spawn(async move {
while let Some(item) = stream.next().await {
if tx.broadcast(item).await.is_err() {
// all receivers are dropped, stop the task.
return;
/// Spawn a new task to execute the given stream.
fn spawn(&mut self, id: Id, mut stream: BoxedExecutor) -> StreamSubscriber {
let name = self.node(id).to_string();
let span = TimeSpan::default();
let output_row_counter = Counter::default();

self.metrics
.register(id, span.clone(), output_row_counter.clone());

let (tx, rx) = async_broadcast::broadcast(16);
let handle = tokio::task::Builder::default()
.name(&format!("{id}.{name}"))
.spawn(
async move {
while let Some(item) = stream.next().await {
if let Ok(chunk) = &item {
output_row_counter.inc(chunk.cardinality() as _);
}
if tx.broadcast(item).await.is_err() {
// all receivers are dropped, stop the task.
return;
}
}
}
}
})
.expect("failed to spawn task");

StreamSubscriber {
rx: rx.deactivate(),
handle: Arc::new(AbortOnDropHandle(handle)),
.instrument(tracing::info_span!("executor", id = usize::from(id), name))
.timed(span),
)
.expect("failed to spawn task");

StreamSubscriber {
rx: rx.deactivate(),
handle: Arc::new(AbortOnDropHandle(handle)),
}
}
}

Expand Down
Loading

0 comments on commit 8b19a09

Please sign in to comment.