Update applicant on NOI and Application (#1364)
update audit_created_at on parcel and owner for application and noi so the dates differ from each other and allow sorting
update applicant on application_submission
update applicant on noi_submission
mhuseinov authored Jan 31, 2024
1 parent d9e2b1a commit 4606bd8
Showing 15 changed files with 448 additions and 43 deletions.
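The parcel and owner mappers in this commit thread a running insert index into audit_created_at, so rows created within one batch get strictly increasing timestamps instead of an identical "now". A minimal sketch of that idea, using stand-ins for the helpers imported from common (the real to_alcs_format and get_now_with_offset are not shown in this diff; their behavior below is assumed from the call sites):

from datetime import datetime, timedelta, timezone

def get_now_with_offset(index):
    # assumed behavior: "now" shifted by the running insert index,
    # giving every mapped row a distinct timestamp
    return datetime.now(timezone.utc) + timedelta(seconds=index)

def to_alcs_format(dt):
    # assumed behavior: render the timestamp as a string the way
    # the ALCS audit columns store it
    return dt.isoformat()

# each mapped row advances the index, so parcels and owners inserted in the
# same batch no longer share one audit_created_at and can be sorted reliably
rows = [{"oats_property_id": i} for i in (101, 102, 103)]
for insert_index, row in enumerate(rows):
    row["audit_created_at"] = to_alcs_format(get_now_with_offset(insert_index))
    print(row["oats_property_id"], row["audit_created_at"])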
2 changes: 2 additions & 0 deletions bin/migrate-oats-data/applications/migrate_application.py
@@ -11,6 +11,7 @@
    insert_application_submission_review,
    clean_reviews,
    update_application_submissions,
+    process_application_applicant_on_submissions
)
from .base_applications import process_applications, clean_applications
from .app_prep import process_alcs_application_prep_fields
@@ -67,6 +68,7 @@ def process_application_etl(batch_size):
    process_application_statuses(batch_size)
    process_application_parcels(batch_size)
    process_application_owners(batch_size)
+    process_application_applicant_on_submissions(batch_size)
    process_app_staff_journal(batch_size)
    process_application_decisions(batch_size)
    set_application_visibility()
3 changes: 2 additions & 1 deletion bin/migrate-oats-data/applications/submissions/__init__.py
@@ -13,4 +13,5 @@
    process_application_parcels,
)
from .migrate_application_owners import process_application_owners
-from .review import insert_application_submission_review, clean_reviews
+from .review import insert_application_submission_review, clean_reviews
+from .application_applicant_on_submissions import process_application_applicant_on_submissions
109 changes: 109 additions & 0 deletions bin/migrate-oats-data/applications/submissions/application_applicant_on_submissions.py
@@ -0,0 +1,109 @@
from common import (
    setup_and_get_logger,
    BATCH_UPLOAD_SIZE,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor, execute_batch

etl_name = "process_application_applicant_on_submissions"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def process_application_applicant_on_submissions(
    conn=None, batch_size=BATCH_UPLOAD_SIZE
):
    logger.info(f"Start {etl_name}")
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        with open(
            "applications/submissions/sql/applicant/application_applicant_count.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            count_query = sql_file.read()
            cursor.execute(count_query)
            count_total = dict(cursor.fetchone())["count"]
        logger.info(f"Total Applications data to update: {count_total}")

        failed_updates_count = 0
        successful_updates_count = 0
        last_updated_id = "00000000-0000-0000-0000-000000000000"

        with open(
            "applications/submissions/sql/applicant/application_applicant.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            submission_query = sql_file.read()
            while True:
                cursor.execute(
                    f"{submission_query} AND submission_uuid > '{last_updated_id}' ORDER BY submission_uuid;"
                )

                rows = cursor.fetchmany(batch_size)

                if not rows:
                    break
                try:
                    records_to_be_updated_count = len(rows)

                    _update_records(conn, batch_size, cursor, rows)

                    successful_updates_count = (
                        successful_updates_count + records_to_be_updated_count
                    )
                    last_updated_id = dict(rows[-1])["submission_uuid"]

                    logger.debug(
                        f"retrieved/updated items count: {records_to_be_updated_count}; total successfully updated applications so far {successful_updates_count}; last updated submission_uuid: {last_updated_id}"
                    )
                except Exception as err:
                    # NOT caused by an actual data update failure: this branch only
                    # runs on a code error or a lost DB connection
                    logger.exception(err)
                    conn.rollback()
                    failed_updates_count = count_total - successful_updates_count
                    # submission_uuid is a UUID string, so it cannot be incremented;
                    # advance past the failed batch instead of retrying it forever
                    last_updated_id = dict(rows[-1])["submission_uuid"]

    logger.info(
        f"Finished {etl_name}: total amount of successful updates {successful_updates_count}, total failed updates {failed_updates_count}"
    )


def _update_records(conn, batch_size, cursor, rows):
    parsed_data_list = _prepare_data(rows)

    if len(rows) > 0:
        execute_batch(
            cursor,
            _update_query,
            parsed_data_list,
            page_size=batch_size,
        )

        conn.commit()


def _prepare_data(rows):
    data_list = []
    for row in rows:
        data_list.append(_map_data(row))

    return data_list


def _map_data(row):
    applicant = (
        row.get("last_name") if row.get("last_name") else row.get("organization_name")
    )

    if applicant and row.get("owner_count_extension"):
        applicant = f"{applicant} {row.get('owner_count_extension')}"

    return {"applicant": applicant, "submission_uuid": row["submission_uuid"]}


_update_query = """
    UPDATE alcs.application_submission
    SET applicant = %(applicant)s
    WHERE "uuid" = %(submission_uuid)s;
"""
@@ -4,6 +4,8 @@
    OATS_ETL_USER,
    add_timezone_and_keep_date_part,
    OatsToAlcsOwnershipType,
+    to_alcs_format,
+    get_now_with_offset,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor
@@ -32,7 +34,7 @@ def init_application_parcels(conn=None, batch_size=BATCH_UPLOAD_SIZE):
        last_subject_property_id = 0

        with open(
            "applications/submissions/sql/parcels/application_parcels_insert.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
@@ -49,7 +51,7 @@
            try:
                records_to_be_inserted_count = len(rows)

-                _insert_records(conn, cursor, rows)
+                _insert_records(conn, cursor, rows, successful_inserts_count)

                successful_inserts_count = (
                    successful_inserts_count + records_to_be_inserted_count
@@ -71,26 +73,27 @@
    )


-def _insert_records(conn, cursor, rows):
+def _insert_records(conn, cursor, rows, insert_index):
    number_of_rows_to_insert = len(rows)

    if number_of_rows_to_insert > 0:
        insert_query = _compile_application_insert_query(number_of_rows_to_insert)
-        rows_to_insert = _prepare_data_to_insert(rows)
+        rows_to_insert = _prepare_data_to_insert(rows, insert_index)
        cursor.execute(insert_query, rows_to_insert)
        conn.commit()


-def _prepare_data_to_insert(rows):
+def _prepare_data_to_insert(rows, insert_index):
    row_without_last_element = []
    for row in rows:
-        mapped_row = _map_data(row)
+        mapped_row = _map_data(row, insert_index)
        row_without_last_element.append(tuple(mapped_row.values()))
+        insert_index += 1

    return row_without_last_element


-def _map_data(row):
+def _map_data(row, insert_index):
    return {
        "application_submission_uuid": row["application_submission_uuid"],
        "audit_created_by": OATS_ETL_USER,
@@ -108,6 +111,7 @@ def _map_data(row):
        "purchased_date": _map_purchased_date(row["purchase_date"]),
        "oats_subject_property_id": row["subject_property_id"],
        "oats_property_id": row["property_id"],
+        "audit_created_at": to_alcs_format(get_now_with_offset(insert_index)),
    }


@@ -145,7 +149,8 @@ def _compile_application_insert_query(number_of_rows_to_insert):
            pin,
            purchased_date,
            oats_subject_property_id,
-            oats_property_id
+            oats_property_id,
+            audit_created_at
        )
        VALUES{parcels_to_insert}
        ON CONFLICT DO NOTHING
@@ -4,6 +4,8 @@
    OATS_ETL_USER,
    ALCSOwnershipType,
    ALCSOwnerType,
+    to_alcs_format,
+    get_now_with_offset,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor
@@ -27,7 +29,7 @@ def init_application_parcel_owners(conn=None, batch_size=BATCH_UPLOAD_SIZE):
        logger.info(f"Total applications data to insert: {count_total}")

        failed_inserts = 0
-        successful_updates_count = 0
+        successful_insert_count = 0
        last_subject_property = 0
        last_person_organization_id = 0

@@ -49,36 +51,36 @@
            try:
                records_to_be_inserted_count = len(rows)

-                _insert_records(conn, cursor, rows)
+                _insert_records(conn, cursor, rows, successful_insert_count)

-                successful_updates_count = (
-                    successful_updates_count + records_to_be_inserted_count
+                successful_insert_count = (
+                    successful_insert_count + records_to_be_inserted_count
                )

                last_record = dict(rows[-1])
                last_subject_property = last_record["subject_property_id"]
                last_person_organization_id = last_record["person_organization_id"]

                logger.debug(
-                    f"retrieved/updated items count: {records_to_be_inserted_count}; total successfully insert applications owners so far {successful_updates_count}; last updated {last_subject_property} {last_person_organization_id}"
+                    f"retrieved/updated items count: {records_to_be_inserted_count}; total successfully inserted application owners so far {successful_insert_count}; last updated {last_subject_property} {last_person_organization_id}"
                )
            except Exception as err:
                logger.exception(err)
                conn.rollback()
-                failed_inserts = count_total - successful_updates_count
+                failed_inserts = count_total - successful_insert_count
                last_person_organization_id = last_person_organization_id + 1

    logger.info(
-        f"Finished {etl_name}: total amount of successful inserts {successful_updates_count}, total failed inserts {failed_inserts}"
+        f"Finished {etl_name}: total amount of successful inserts {successful_insert_count}, total failed inserts {failed_inserts}"
    )


-def _insert_records(conn, cursor, rows):
+def _insert_records(conn, cursor, rows, insert_index):
    number_of_rows_to_insert = len(rows)

    if number_of_rows_to_insert > 0:
        insert_query = _compile_owner_insert_query(number_of_rows_to_insert)
-        rows_to_insert = _prepare_data_to_insert(rows)
+        rows_to_insert = _prepare_data_to_insert(rows, insert_index)
        cursor.execute(insert_query, rows_to_insert)
        conn.commit()

@@ -96,23 +98,25 @@ def _compile_owner_insert_query(number_of_rows_to_insert):
            type_code,
            oats_person_organization_id,
            oats_property_interest_id,
-            audit_created_by
+            audit_created_by,
+            audit_created_at
        )
        VALUES{owners_to_insert}
        ON CONFLICT DO NOTHING;
    """


-def _prepare_data_to_insert(rows):
+def _prepare_data_to_insert(rows, insert_index):
    row_without_last_element = []
    for row in rows:
-        mapped_row = _map_data(row)
+        mapped_row = _map_data(row, insert_index)
        row_without_last_element.append(tuple(mapped_row.values()))
+        insert_index += 1

    return row_without_last_element


-def _map_data(row):
+def _map_data(row, insert_index):
    return {
        "first_name": _get_name(row),
        "last_name": row["last_name"],
@@ -124,6 +128,7 @@ def _map_data(row):
        "oats_person_organization_id": row["person_organization_id"],
        "oats_property_interest_id": row["property_interest_id"],
        "audit_created_by": OATS_ETL_USER,
+        "audit_created_at": to_alcs_format(get_now_with_offset(insert_index)),
    }


43 changes: 43 additions & 0 deletions bin/migrate-oats-data/applications/submissions/sql/applicant/application_applicant.sql
@@ -0,0 +1,43 @@
WITH ranked_parcels AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY application_submission_uuid
            ORDER BY audit_created_at
        ) AS rn
    FROM alcs.application_parcel
),
first_parcel_per_submission AS (
    SELECT uuid,
        audit_created_at,
        application_submission_uuid
    FROM ranked_parcels
    WHERE rn = 1
),
ranked_owners AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY app_own.application_parcel_uuid
            ORDER BY appo.audit_created_at
        ) AS rn,
        fp.application_submission_uuid as submission_uuid
    FROM alcs.application_owner appo
        JOIN alcs.application_parcel_owners_application_owner app_own ON app_own.application_owner_uuid = appo.uuid
        JOIN first_parcel_per_submission AS fp ON fp.uuid = app_own.application_parcel_uuid
        JOIN alcs.application_submission nois ON nois.uuid = fp.application_submission_uuid
)
SELECT last_name,
    organization_name,
    (
        CASE
            WHEN (
                SELECT count(*)
                FROM alcs.application_parcel_owners_application_owner
                WHERE application_parcel_uuid = ranked_owners.application_parcel_uuid
            ) > 1 THEN 'et al.'
            ELSE ''
        END
    ) as owner_count_extension,
    submission_uuid
FROM ranked_owners
WHERE rn = 1
    AND applicant ILIKE 'unknown'
31 changes: 31 additions & 0 deletions bin/migrate-oats-data/applications/submissions/sql/applicant/application_applicant_count.sql
@@ -0,0 +1,31 @@
WITH ranked_parcels AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY application_submission_uuid
            ORDER BY audit_created_at
        ) AS rn
    FROM alcs.application_parcel
),
first_parcel_per_submission AS (
    SELECT uuid,
        audit_created_at,
        application_submission_uuid
    FROM ranked_parcels
    WHERE rn = 1
),
ranked_owners AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY app_own.application_parcel_uuid
            ORDER BY appo.audit_created_at
        ) AS rn,
        fp.application_submission_uuid as submission_uuid
    FROM alcs.application_owner appo
        JOIN alcs.application_parcel_owners_application_owner app_own ON app_own.application_owner_uuid = appo.uuid
        JOIN first_parcel_per_submission AS fp ON fp.uuid = app_own.application_parcel_uuid
        JOIN alcs.application_submission nois ON nois.uuid = fp.application_submission_uuid
)
SELECT count(*)
FROM ranked_owners
WHERE rn = 1
    AND applicant ILIKE 'unknown'