From 7fa87f13546a81f2d9187688e4d66c26c4438c0e Mon Sep 17 00:00:00 2001 From: Liam Stoddard Date: Thu, 7 Mar 2024 09:24:51 -0800 Subject: [PATCH 1/3] add srw to staff journals --- bin/migrate-oats-data/srw/__init__.py | 1 + .../srw/post_launch/srw_migration.py | 3 + .../srw/sql/srw_staff_journal.sql | 3 + .../srw/sql/srw_staff_journal_count.sql | 3 + .../srw/srw_staff_journal.py | 155 ++++++++++++++++++ 5 files changed, 165 insertions(+) create mode 100644 bin/migrate-oats-data/srw/sql/srw_staff_journal.sql create mode 100644 bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql create mode 100644 bin/migrate-oats-data/srw/srw_staff_journal.py diff --git a/bin/migrate-oats-data/srw/__init__.py b/bin/migrate-oats-data/srw/__init__.py index e2ce0f6054..13737d032e 100644 --- a/bin/migrate-oats-data/srw/__init__.py +++ b/bin/migrate-oats-data/srw/__init__.py @@ -1,2 +1,3 @@ from .srw_base import init_srw_base, clean_initial_srw from .srw_base_update import update_srw_base_fields +from .srw_staff_journal import process_srw_staff_journal, clean_srw_staff_journal diff --git a/bin/migrate-oats-data/srw/post_launch/srw_migration.py b/bin/migrate-oats-data/srw/post_launch/srw_migration.py index 90c054fab0..c89f2fb56b 100644 --- a/bin/migrate-oats-data/srw/post_launch/srw_migration.py +++ b/bin/migrate-oats-data/srw/post_launch/srw_migration.py @@ -8,6 +8,7 @@ clean_transferees, ) from ..applicant.srw_process_applicant import update_srw_base_applicant +from ..srw_staff_journal import process_srw_staff_journal, clean_srw_staff_journal def process_srw(batch_size): @@ -19,6 +20,7 @@ def init_srw(batch_size): update_srw_base_fields(batch_size) _process_srw_submission(batch_size) update_srw_base_applicant(batch_size) + process_srw_staff_journal(batch_size) def _process_srw_submission(batch_size): @@ -29,6 +31,7 @@ def _process_srw_submission(batch_size): def clean_srw(): + clean_srw_staff_journal() clean_transferees() clean_parcels() clean_srw_submissions() diff --git a/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql b/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql new file mode 100644 index 0000000000..adf2d01a83 --- /dev/null +++ b/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql @@ -0,0 +1,3 @@ +SELECT osj.journal_date, osj.journal_text, osj.revision_count, osj.staff_journal_entry_id, an."uuid" +FROM oats.oats_staff_journal_entries osj +JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT \ No newline at end of file diff --git a/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql b/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql new file mode 100644 index 0000000000..d74c31c409 --- /dev/null +++ b/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql @@ -0,0 +1,3 @@ +SELECT count (*) +FROM oats.oats_staff_journal_entries osj +JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT \ No newline at end of file diff --git a/bin/migrate-oats-data/srw/srw_staff_journal.py b/bin/migrate-oats-data/srw/srw_staff_journal.py new file mode 100644 index 0000000000..a79d24fa2a --- /dev/null +++ b/bin/migrate-oats-data/srw/srw_staff_journal.py @@ -0,0 +1,155 @@ +from common import ( + BATCH_UPLOAD_SIZE, + OATS_ETL_USER, + setup_and_get_logger, + add_timezone_and_keep_date_part, +) +from db import inject_conn_pool +from psycopg2.extras import RealDictCursor, execute_batch + +etl_name = "srw_staff_journal" +logger = setup_and_get_logger(etl_name) + + +@inject_conn_pool +def process_srw_staff_journal(conn=None, batch_size=BATCH_UPLOAD_SIZE): + """ + This function is responsible for initializing entries for notifications in staff_journal table in ALCS. + + Args: + conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator. + batch_size (int): The number of items to process at once. Defaults to BATCH_UPLOAD_SIZE. + """ + + logger.info(f"Start {etl_name}") + with conn.cursor(cursor_factory=RealDictCursor) as cursor: + with open( + "srw/sql/srw_staff_journal_count.sql", + "r", + encoding="utf-8", + ) as sql_file: + count_query = sql_file.read() + cursor.execute(count_query) + count_total = dict(cursor.fetchone())["count"] + logger.info(f"Total staff journal entry data to insert: {count_total}") + + failed_inserts_count = 0 + successful_inserts_count = 0 + last_entry_id = 0 + with open( + "srw/sql/srw_staff_journal.sql", + "r", + encoding="utf-8", + ) as sql_file: + submission_sql = sql_file.read() + while True: + cursor.execute( + f"{submission_sql} WHERE osj.staff_journal_entry_id > '{last_entry_id}' ORDER BY osj.staff_journal_entry_id;" + ) + + rows = cursor.fetchmany(batch_size) + + if not rows: + break + try: + users_to_be_inserted_count = len(rows) + + _insert_entries(conn, batch_size, cursor, rows) + + successful_inserts_count = ( + successful_inserts_count + users_to_be_inserted_count + ) + last_entry_id = dict(rows[-1])["staff_journal_entry_id"] + + logger.debug( + f"retrieved/inserted items count: {users_to_be_inserted_count}; total successfully inserted entries so far {successful_inserts_count}; last inserted journal_id: {last_entry_id}" + ) + except Exception as err: + logger.exception("") + conn.rollback() + failed_inserts_count = count_total - successful_inserts_count + last_entry_id = last_entry_id + 1 + + logger.info( + f"Finished {etl_name}: total amount of successful inserts {successful_inserts_count}, total failed inserts {failed_inserts_count}" + ) + + +def _insert_entries(conn, batch_size, cursor, rows): + query = _get_insert_query() + parsed_data_list = _prepare_journal_data(rows) + + if len(parsed_data_list) > 0: + execute_batch(cursor, query, parsed_data_list, page_size=batch_size) + + conn.commit() + + +def _get_insert_query(): + query = f""" + INSERT INTO alcs.staff_journal ( + body, + edited, + notification_uuid, + created_at, + author_uuid, + audit_created_by + ) + VALUES ( + %(journal_text)s, + %(edit)s, + %(uuid)s, + %(journal_date)s, + %(user)s, + '{OATS_ETL_USER}' + ) + ON CONFLICT DO NOTHING; + """ + return query + + +def _prepare_journal_data(row_data_list): + data_list = [] + for row in row_data_list: + data = dict(row) + data = _map_revision(data) + data = _map_timezone(data) + data = _add_user(data) + data_list.append(dict(data)) + return data_list + + +def _map_revision(data): + revision = data.get("revision_count", "") + # check if edited + if revision == 0: + data["edit"] = False + else: + data["edit"] = True + return data + + +def _map_timezone(data): + date = data.get("journal_date", "") + journal_date = add_timezone_and_keep_date_part(date) + data["journal_date"] = journal_date + return data + + +def _add_user(data): + user_uuid = "ca8e91dc-cfb0-45c3-a443-8e47e44591df" + data["user"] = user_uuid + return data + + +@inject_conn_pool +def clean_srw_staff_journal(conn=None): + logger.info("Start staff journal cleaning") + # Only clean applications + with conn.cursor() as cursor: + cursor.execute( + f"DELETE FROM alcs.staff_journal asj WHERE asj.audit_created_by = '{OATS_ETL_USER}' AND asj.notification_uuid IS NOT NULL" + ) + logger.info(f"Deleted items count = {cursor.rowcount}") + + conn.commit() From f7641f047b6daa5ab6d8fcd8f36f409860a8fa8c Mon Sep 17 00:00:00 2001 From: Liam Stoddard Date: Thu, 7 Mar 2024 09:43:46 -0800 Subject: [PATCH 2/3] simplify user uuid --- bin/migrate-oats-data/srw/srw_staff_journal.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/bin/migrate-oats-data/srw/srw_staff_journal.py b/bin/migrate-oats-data/srw/srw_staff_journal.py index a79d24fa2a..2fdb7fddd0 100644 --- a/bin/migrate-oats-data/srw/srw_staff_journal.py +++ b/bin/migrate-oats-data/srw/srw_staff_journal.py @@ -114,7 +114,7 @@ def _prepare_journal_data(row_data_list): data = dict(row) data = _map_revision(data) data = _map_timezone(data) - data = _add_user(data) + data["user"] = "ca8e91dc-cfb0-45c3-a443-8e47e44591df" data_list.append(dict(data)) return data_list @@ -136,12 +136,6 @@ def _map_timezone(data): return data -def _add_user(data): - user_uuid = "ca8e91dc-cfb0-45c3-a443-8e47e44591df" - data["user"] = user_uuid - return data - - @inject_conn_pool def clean_srw_staff_journal(conn=None): logger.info("Start staff journal cleaning") From b4dd02a7baeb08f444ed9ec398e5a3c5b883c38f Mon Sep 17 00:00:00 2001 From: Liam Stoddard Date: Thu, 7 Mar 2024 10:43:58 -0800 Subject: [PATCH 3/3] add SRW type clause --- bin/migrate-oats-data/srw/sql/srw_staff_journal.sql | 3 ++- bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql | 3 ++- bin/migrate-oats-data/srw/srw_staff_journal.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql b/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql index adf2d01a83..7d765e5b17 100644 --- a/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql +++ b/bin/migrate-oats-data/srw/sql/srw_staff_journal.sql @@ -1,3 +1,4 @@ SELECT osj.journal_date, osj.journal_text, osj.revision_count, osj.staff_journal_entry_id, an."uuid" FROM oats.oats_staff_journal_entries osj -JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT \ No newline at end of file +JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT +WHERE an.type_code = 'SRW' \ No newline at end of file diff --git a/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql b/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql index d74c31c409..692ed2a0a7 100644 --- a/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql +++ b/bin/migrate-oats-data/srw/sql/srw_staff_journal_count.sql @@ -1,3 +1,4 @@ SELECT count (*) FROM oats.oats_staff_journal_entries osj -JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT \ No newline at end of file +JOIN alcs.notification an ON an.file_number = osj.alr_application_id::TEXT +WHERE an.type_code = 'SRW'; \ No newline at end of file diff --git a/bin/migrate-oats-data/srw/srw_staff_journal.py b/bin/migrate-oats-data/srw/srw_staff_journal.py index 2fdb7fddd0..cb47afacfb 100644 --- a/bin/migrate-oats-data/srw/srw_staff_journal.py +++ b/bin/migrate-oats-data/srw/srw_staff_journal.py @@ -44,7 +44,7 @@ def process_srw_staff_journal(conn=None, batch_size=BATCH_UPLOAD_SIZE): submission_sql = sql_file.read() while True: cursor.execute( - f"{submission_sql} WHERE osj.staff_journal_entry_id > '{last_entry_id}' ORDER BY osj.staff_journal_entry_id;" + f"{submission_sql} AND osj.staff_journal_entry_id > '{last_entry_id}' ORDER BY osj.staff_journal_entry_id;" ) rows = cursor.fetchmany(batch_size)