diff --git a/src/main.rs b/src/main.rs index f3fb4c0..27f0114 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,11 +5,11 @@ use futures03::future::join_all; use futures03::StreamExt; use hyper_rustls::HttpsConnectorBuilder; use loader::Cursor; -use tracing::{error, info}; use logging::LogConfig; use pb::sf::substreams::v1::Package; +use tracing::{error, info}; use tracing_core::LevelFilter; -use tracing_subscriber::{prelude::*, Registry, EnvFilter}; +use tracing_subscriber::{prelude::*, EnvFilter, Registry}; use url::Url; use prost::Message; @@ -78,10 +78,10 @@ pub enum ElricError { PackageFileError(#[from] std::io::Error), #[error("Could not decode package")] PackageDecodeError(#[from] prost::DecodeError), - #[error("Could not load cursor")] - CursorError, - #[error("Could not load schema")] - LoadSchemaError, + #[error("Could not load cursor: {0}")] + CursorError(anyhow::Error), + #[error("Could not load schema: {0}")] + LoadSchemaError(clickhouse::error::Error), #[error("Could not insert cursor")] InsertCursorError, #[error("Could not insert row")] @@ -129,7 +129,9 @@ async fn main() -> Result<(), Error> { Some(token) => token, None => token.ok_or(ElricError::TokenNotFound)?, }; - let cursor = load_persisted_cursor(&client, &id).await.map_err(|_| ElricError::CursorError)?; + let cursor = load_persisted_cursor(&client, &id) + .await + .map_err(|e| ElricError::CursorError(e))?; let stream = create_stream( cursor, package_file, @@ -138,13 +140,8 @@ async fn main() -> Result<(), Error> { token, start_block, end_block, - )?; - run( - id, - stream, - client, - ) - .await?; + )?; + run(id, stream, client).await?; } } Ok(()) @@ -177,7 +174,6 @@ async fn run( mut stream: SubstreamsStream, client: clickhouse::Client, ) -> Result<(), ElricError> { - let table_info = get_table_information(&client).await?; let dynamic_tables = table_info diff --git a/src/table_info.rs b/src/table_info.rs index 5207632..24df833 100644 --- a/src/table_info.rs +++ b/src/table_info.rs @@ -245,17 +245,15 @@ pub async fn get_table_information(client: &Client) -> Result, El let query = client.query( format!( " - SELECT - table_schema, - table_name - FROM - information_schema.tables - WHERE - table_type = 1 AND - table_schema = '{}' - ORDER BY - table_schema, - table_name + SELECT database AS table_schema, + name AS table_name + FROM system.tables + WHERE NOT is_temporary + AND engine NOT LIKE '%View' + AND engine NOT LIKE 'System%' + AND has_own_data != 0 + AND database = '{}' + ORDER BY database, name ", client.database().unwrap_or("default") ) @@ -264,6 +262,6 @@ pub async fn get_table_information(client: &Client) -> Result, El let result = query .fetch_all() .await - .map_err(|_| ElricError::LoadSchemaError)?; + .map_err(|e| ElricError::LoadSchemaError(e))?; Ok(result) }