From f0669dfbed27d2d30939f5747a1a576442b22d4b Mon Sep 17 00:00:00 2001 From: Johnil Quezada Date: Sat, 15 Apr 2023 18:16:00 -0400 Subject: [PATCH 1/2] add option to keep sequences --- src/main.rs | 100 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index b1b4ab7..ded2c77 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,13 @@ struct Args { /// and pg_parcel.toml. #[clap(long = "no-feature", value_delimiter = ',', display_order = 6)] skipped_features: Option>, + + /// Retain the current value of sequence number generators. + /// This will ensure that any tables that use sequences in any + /// of their columns will continue generating values starting from + /// the last known value + #[clap(long, default_value_t = false, display_order = 7)] + keep_sequences: bool, } /// Options here is a combination of command line arguments and contents of the slicefile. @@ -79,6 +86,7 @@ struct Options { estimate_only: bool, truncate: bool, features: HashSet, + keep_sequences: bool, } impl Options { @@ -121,6 +129,7 @@ impl Options { estimate_only: args.estimate_only, truncate: args.truncate, features, + keep_sequences: args.keep_sequences, }; Ok(options) } @@ -265,19 +274,38 @@ fn main() -> Result<(), Box> { )?; } + let mut stdout = std::io::stdout(); + // Dump table data. for table in tables.iter() { - let query = table.copy_out_query(&options); - // let query = format!("{query} LIMIT 10"); // TESTING ONLY - let copy_statement = format!("COPY ({}) TO stdout;", query); - pb.set_message(table.name.to_owned()); + { + let query = table.copy_out_query(&options); + // let query = format!("{query} LIMIT 10"); // TESTING ONLY + let copy_statement = format!("COPY ({}) TO stdout;", query); + pb.set_message(table.name.to_owned()); + + writeln!(stdout, "{};", table.copy_in_query())?; + let mut reader = client.copy_out(©_statement)?; + let size = std::io::copy(&mut reader, &mut stdout)?; + sizes.push((table.name.clone(), size)); + writeln!(stdout, "\\.")?; + } - let mut stdout = std::io::stdout(); - writeln!(stdout, "{};", table.copy_in_query())?; - let mut reader = client.copy_out(©_statement)?; - let size = std::io::copy(&mut reader, &mut stdout)?; - sizes.push((table.name.clone(), size)); - writeln!(stdout, "\\.")?; + { + if options.keep_sequences { + for sequence in table.sequences.iter() { + let current_sequence_value = client + .query_one(&table.current_sequence_value_query(sequence), &[])? + .get::(0) + .parse::()?; + writeln!( + stdout, + "{};", + table.restore_sequences_query(sequence, current_sequence_value) + )?; + } + } + } pb.inc(1); } @@ -314,6 +342,7 @@ struct Table { schema: String, size: u64, // Bytes. rows: u64, // Estimate. + sequences: Vec, } impl Table { @@ -324,6 +353,23 @@ impl Table { self.name.sql_identifier() ) } + fn current_sequence_value_query(&self, sequence: &Sequence) -> String { + format!( + r#"SELECT COALESCE(MAX("{column_name}"), 1)::text FROM {schema}."{table_name}""#, + schema = &self.schema, + column_name = sequence.column_name, + table_name = &self.name, + ) + } + fn restore_sequences_query(&self, sequence: &Sequence, current_sequence_value: u64) -> String { + format!( + r#"SELECT SETVAL('{schema}."{sequence_name}"', {current_sequence_value}) FROM {schema}."{table_name}""#, + schema = &self.schema, + sequence_name = sequence.sequence_name, + current_sequence_value = current_sequence_value, + table_name = &self.name, + ) + } fn copy_out_query(&self, options: &Options) -> String { let column_values = options.column_values.iter().map(|s| s.sql_value()); let column_values = intersperse(column_values, ",".to_string()).collect::(); @@ -378,6 +424,12 @@ struct Column { pub is_nullable: bool, } +#[derive(Debug, Clone)] +struct Sequence { + pub column_name: String, + pub sequence_name: String, +} + fn get_tables(options: &Options) -> Result, Box> { let mut client = pg_client(options)?; let query = format!( @@ -387,7 +439,8 @@ fn get_tables(options: &Options) -> Result, Box> { pg_total_relation_size(pg_class.oid)::text as table_size, max(pg_class.reltuples::int8)::text as table_rows, -- https://wiki.postgresql.org/wiki/Count_estimate array_agg(columns.column_name::text order by columns.ordinal_position) as column_names, - array_agg(columns.is_nullable = 'YES' order by columns.ordinal_position) as column_nullables + array_agg(columns.is_nullable = 'YES' order by columns.ordinal_position) as column_nullables, + array_agg(pg_attribute.attname::text || '::' || seqs.relname::text) FILTER (WHERE seqs.relname IS NOT NULL) AS sequences from information_schema.tables join information_schema.columns on ( columns.table_catalog = tables.table_catalog @@ -400,6 +453,14 @@ fn get_tables(options: &Options) -> Result, Box> { join pg_class on ( pg_class.relnamespace = pg_namespace.oid and pg_class.relname = tables.table_name) + join pg_attribute ON pg_attribute.attname = columns. "column_name" + and pg_class.oid = pg_attribute.attrelid + left join ( + select sequences.*, pg_depend.refobjid tbl_id, pg_depend.refobjsubid col_id + from pg_class sequences + join pg_depend ON pg_depend.objid = sequences.oid + where sequences.relkind = 'S' + ) seqs ON seqs.tbl_id = pg_class.oid AND seqs.col_id = pg_attribute.attnum where tables.table_schema = {schema} and tables.table_type = 'BASE TABLE' group by tables.table_name, pg_class.oid @@ -426,12 +487,29 @@ fn get_tables(options: &Options) -> Result, Box> { .zip(column_nullables) .map(|(name, is_nullable)| Column { name, is_nullable }) .collect(); + let sequences: Vec = row + .get::<&str, Option>>("sequences") + .unwrap_or(Vec::with_capacity(0)) + .iter() + .map(|seq| -> Sequence { + let mut parts = seq.split("::"); + let column_name = parts.next().unwrap().to_string(); + let sequence_name = parts.next().unwrap().to_string(); + + Sequence { + column_name, + sequence_name, + } + }) + .collect(); + Some(Table { name: table_name, columns, schema: options.schema.clone(), size: table_size, rows: table_rows, + sequences, }) } }) From 9748bc3609095d01e896ec139a5e8fc9c078a936 Mon Sep 17 00:00:00 2001 From: Johnil Quezada Date: Sun, 23 Apr 2023 16:56:12 -0400 Subject: [PATCH 2/2] use sql_string trait methods + unwrap_or_default --- src/main.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index ded2c77..2d680e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -355,19 +355,23 @@ impl Table { } fn current_sequence_value_query(&self, sequence: &Sequence) -> String { format!( - r#"SELECT COALESCE(MAX("{column_name}"), 1)::text FROM {schema}."{table_name}""#, - schema = &self.schema, - column_name = sequence.column_name, - table_name = &self.name, + r#"SELECT COALESCE(MAX({column_name}), 1)::text FROM {qualified_table_name}"#, + column_name = sequence.column_name.sql_identifier(), + qualified_table_name = &self.sql_identifier(), ) } fn restore_sequences_query(&self, sequence: &Sequence, current_sequence_value: u64) -> String { + let qualified_sequence_name = format!( + "{}.{}", + self.schema.sql_identifier(), + sequence.sequence_name.sql_identifier() + ); + format!( - r#"SELECT SETVAL('{schema}."{sequence_name}"', {current_sequence_value}) FROM {schema}."{table_name}""#, - schema = &self.schema, - sequence_name = sequence.sequence_name, + r#"SELECT SETVAL({qualified_sequence_name}, {current_sequence_value}) FROM {qualified_table_name}"#, + qualified_sequence_name = qualified_sequence_name.sql_value(), current_sequence_value = current_sequence_value, - table_name = &self.name, + qualified_table_name = &self.sql_identifier(), ) } fn copy_out_query(&self, options: &Options) -> String { @@ -489,7 +493,7 @@ fn get_tables(options: &Options) -> Result, Box> { .collect(); let sequences: Vec = row .get::<&str, Option>>("sequences") - .unwrap_or(Vec::with_capacity(0)) + .unwrap_or_default() .iter() .map(|seq| -> Sequence { let mut parts = seq.split("::");