Skip to content

Commit

Permalink
Feature/alcs 1711 srw parcel ETL (#1470)
Browse files Browse the repository at this point in the history
import SRW parcel
add OATS columns to alcs.notification_parcel
  • Loading branch information
mhuseinov authored Mar 6, 2024
1 parent 23e1887 commit 5218525
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 1 deletion.
3 changes: 3 additions & 0 deletions bin/migrate-oats-data/srw/post_launch/srw_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from ..srw_base_update import update_srw_base_fields
from ..submission.srw_submission_init import init_srw_submissions, clean_srw_submissions
from ..submission.srw_proposal_fields import process_alcs_srw_proposal_fields
from ..submission.parcel.srw_parcel_init import init_srw_parcels, clean_parcels


def process_srw(batch_size):
Expand All @@ -17,8 +18,10 @@ def init_srw(batch_size):
def _process_srw_submission(batch_size):
    """Run the SRW submission import steps in order, passing ``batch_size`` to each.

    The steps are: initialise the submissions, process their proposal
    fields, then import the parcels.
    """
    init_srw_submissions(batch_size)
    process_alcs_srw_proposal_fields(batch_size)
    # The parcel import selects from alcs.notification_submission, so it runs
    # after the submissions have been created.
    init_srw_parcels(batch_size)


def clean_srw():
    """Run the SRW clean-up steps: parcels, then submissions, then the initial SRW data."""
    # Parcels go first: each parcel row stores a notification_submission_uuid
    # (presumably a foreign key to the submission -- confirm against the schema).
    clean_parcels()
    clean_srw_submissions()
    clean_initial_srw()
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Selects the OATS parcel data to import for SRW notification submissions
-- (presumably srw/sql/submission/parcel/srw_parcels_insert.sql -- confirm).
-- NOTE: the Python loader appends
--   WHERE osp.subject_property_id > <last id> ORDER BY osp.subject_property_id;
-- to this text, so the outer alias "osp" must be kept and the statement must
-- not end with its own WHERE clause or a semicolon.
WITH parcels_to_insert AS (
    SELECT nos.uuid,
        osp.subject_property_id
    FROM alcs.notification_submission nos
        JOIN oats.oats_subject_properties osp ON osp.alr_application_id = nos.file_number::bigint
    WHERE osp.alr_application_land_ind = 'Y' -- ensure that only parcels related to application are selected
        AND nos.type_code = 'SRW'
),
-- One row per subject_property_id that has at least one property interest.
grouped_oats_property_interests_ids AS (
    SELECT subject_property_id
    FROM oats.oats_property_interests opi
    GROUP BY opi.subject_property_id
)
-- NOTE(review): no column from gopi is selected and the CTE yields at most one
-- row per subject_property_id, so the LEFT JOIN below does not appear to
-- change the result -- confirm whether it is still needed.
SELECT uuid AS notification_submission_uuid,
    op.civic_address,
    op.legal_description,
    op.area_size,
    op.pid,
    op.pin,
    osp.subject_property_id,
    op.property_id
FROM parcels_to_insert pti
    JOIN oats.oats_subject_properties osp ON osp.subject_property_id = pti.subject_property_id
    JOIN oats.oats_properties op ON op.property_id = osp.property_id
    LEFT JOIN grouped_oats_property_interests_ids gopi ON gopi.subject_property_id = pti.subject_property_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Counts the OATS parcel rows that qualify for import for SRW notification
-- submissions (presumably srw/sql/submission/parcel/srw_parcels_insert_count.sql -- confirm).
-- The CTEs and joins mirror the parcel select query so that both cover the same rows.
WITH parcels_to_insert AS (
    SELECT nos.uuid,
        osp.subject_property_id
    FROM alcs.notification_submission nos
        JOIN oats.oats_subject_properties osp ON osp.alr_application_id = nos.file_number::bigint
    WHERE osp.alr_application_land_ind = 'Y' -- ensure that only parcels related to application are selected
        AND nos.type_code = 'SRW'
),
-- One row per subject_property_id that has at least one property interest.
grouped_oats_property_interests_ids AS (
    SELECT subject_property_id
    FROM oats.oats_property_interests opi
    GROUP BY opi.subject_property_id
)
-- NOTE(review): the LEFT JOIN on gopi matches at most one row per parcel, so
-- it does not appear to affect the count -- confirm whether it is still needed.
SELECT count(*)
FROM parcels_to_insert pti
    JOIN oats.oats_subject_properties osp ON osp.subject_property_id = pti.subject_property_id
    JOIN oats.oats_properties op ON op.property_id = osp.property_id
    LEFT JOIN grouped_oats_property_interests_ids gopi ON gopi.subject_property_id = pti.subject_property_id
1 change: 1 addition & 0 deletions bin/migrate-oats-data/srw/submission/parcel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .srw_parcel_init import init_srw_parcels, clean_parcels
153 changes: 153 additions & 0 deletions bin/migrate-oats-data/srw/submission/parcel/srw_parcel_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from common import (
BATCH_UPLOAD_SIZE,
setup_and_get_logger,
OATS_ETL_USER,
OatsToAlcsOwnershipType,
to_alcs_format,
get_now_with_offset,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor

etl_name = "init_srw_parcels"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def init_srw_parcels(conn=None, batch_size=BATCH_UPLOAD_SIZE):
    """Import SRW parcels from OATS into ``alcs.notification_parcel`` in batches.

    The total number of candidate rows is read from the count query and
    logged. The select query is then re-executed repeatedly, each time
    restricted to ``subject_property_id`` values greater than the last one
    processed, and up to ``batch_size`` rows are inserted per iteration.

    Args:
        conn: Database connection; supplied by ``inject_conn_pool``.
        batch_size: Maximum number of rows fetched and inserted per batch.

    A failing batch is logged and rolled back; the import then continues
    from the next ``subject_property_id``. Exceptions are not propagated.
    """
    logger.info(f"Start {etl_name}")

    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Read the SQL text and close the file straight away; the file
        # handle is not needed while the query runs.
        with open(
            "srw/sql/submission/parcel/srw_parcels_insert_count.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            count_query = sql_file.read()
        cursor.execute(count_query)
        count_total = dict(cursor.fetchone())["count"]
        logger.info(f"Total Notification Parcels to insert: {count_total}")

        successful_inserts_count = 0
        last_subject_property_id = 0
        batch_failed = False

        with open(
            "srw/sql/submission/parcel/srw_parcels_insert.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            application_sql = sql_file.read()

        while True:
            # The select query has no WHERE clause of its own at the outer
            # level, so the paging condition is appended here.
            cursor.execute(
                f"""
                    {application_sql}
                    WHERE osp.subject_property_id > {last_subject_property_id} ORDER BY osp.subject_property_id;
                """
            )

            rows = cursor.fetchmany(batch_size)

            if not rows:
                break
            try:
                records_to_be_inserted_count = len(rows)

                _insert_records(conn, cursor, rows, successful_inserts_count)

                successful_inserts_count = (
                    successful_inserts_count + records_to_be_inserted_count
                )

                last_subject_property_id = dict(rows[-1])["subject_property_id"]

                logger.debug(
                    f"inserted items count: {records_to_be_inserted_count}; total successfully inserted Notification Parcels so far {successful_inserts_count}; last inserted subject_property_id: {last_subject_property_id}"
                )
            except Exception as err:
                logger.exception(err)
                conn.rollback()
                batch_failed = True
                # Move the lower bound forward by one id so the next query
                # does not return exactly the same failing batch.
                last_subject_property_id = last_subject_property_id + 1

    # Compute the failure count once, after the loop. Computing it inside the
    # except branch left a stale value whenever later batches succeeded.
    failed_inserts = count_total - successful_inserts_count if batch_failed else 0

    logger.info(
        f"Finished {etl_name}: total amount of successful inserts {successful_inserts_count}, total failed inserts {failed_inserts}"
    )


def _insert_records(conn, cursor, rows, insert_index):
    """Insert ``rows`` with a single multi-row INSERT and commit.

    Does nothing when ``rows`` is empty. ``insert_index`` is forwarded to the
    row preparation step.
    """
    row_count = len(rows)
    if row_count <= 0:
        return

    statement = _compile_parcel_insert_query(row_count)
    values = _prepare_data_to_insert(rows, insert_index)
    cursor.execute(statement, values)
    conn.commit()


def _prepare_data_to_insert(rows, insert_index):
    """Map every source row to a tuple of insert values.

    The n-th row (counting from zero) is mapped with ``insert_index + n``.
    Returns the tuples as a list, in the same order as ``rows``.
    """
    return [
        tuple(_map_data(source_row, insert_index + position).values())
        for position, source_row in enumerate(rows)
    ]


def _map_data(row, insert_index):
    """Build the ``alcs.notification_parcel`` values for one source row.

    The key order matches the column list of the insert statement, because
    callers turn the dict values into a positional tuple.
    """
    return dict(
        notification_submission_uuid=row["notification_submission_uuid"],
        audit_created_by=OATS_ETL_USER,
        audit_created_at=to_alcs_format(get_now_with_offset(insert_index)),
        civic_address=row["civic_address"],
        legal_description=row["legal_description"],
        map_area_hectares=row["area_size"],
        ownership_type_code=_map_ownership_type_code(row),
        pid=row["pid"],
        pin=row["pin"],
        oats_subject_property_id=row["subject_property_id"],
        oats_property_id=row["property_id"],
    )


def _map_ownership_type_code(data):
    """Return the CROWN ownership code when the row has a truthy ``pin``, else FEE."""
    if not data["pin"]:
        return OatsToAlcsOwnershipType.FEE.value
    return OatsToAlcsOwnershipType.CROWN.value


def _compile_parcel_insert_query(number_of_rows_to_insert):
parcels_to_insert = ",".join(["%s"] * number_of_rows_to_insert)
return f"""
INSERT INTO alcs.notification_parcel
(
notification_submission_uuid,
audit_created_by,
audit_created_at,
civic_address,
legal_description,
map_area_hectares,
ownership_type_code,
pid,
pin,
oats_subject_property_id,
oats_property_id
)
VALUES{parcels_to_insert}
ON CONFLICT DO NOTHING
"""


@inject_conn_pool
def clean_parcels(conn=None):
    """Delete notification parcels created by the ETL user and never updated since.

    Logs the number of deleted rows and commits the deletion.
    """
    delete_statement = "DELETE FROM alcs.notification_parcel nop WHERE nop.audit_created_by = 'oats_etl' AND nop.audit_updated_by IS NULL"

    logger.info("Start notification parcel cleaning")
    with conn.cursor() as cursor:
        cursor.execute(delete_statement)
        deleted_count = cursor.rowcount
        logger.info(f"Deleted items count = {deleted_count}")
    conn.commit()
    logger.info("Done notification parcel cleaning")
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from common import BATCH_UPLOAD_SIZE, NO_DATA_IN_OATS, setup_and_get_logger
from common import BATCH_UPLOAD_SIZE, setup_and_get_logger
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor, execute_batch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,22 @@ export class NotificationParcel extends Base {
@AutoMap()
@Column()
notificationSubmissionUuid: string;

@Column({
select: false,
nullable: true,
type: 'int8',
comment:
'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_subject_properties to alcs.notification_parcel.',
})
oatsSubjectPropertyId: number;

@Column({
select: false,
nullable: true,
type: 'int8',
comment:
'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_properties to alcs.notification_parcel.',
})
oatsPropertyId: number;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

/**
 * Adds the OATS back-reference columns `oats_subject_property_id` and
 * `oats_property_id` (both bigint, with column comments) to
 * `alcs.notification_parcel`, and drops them again on revert.
 */
export class NotificationOatsFields1709578449660 implements MigrationInterface {
  name = 'NotificationOatsFields1709578449660';

  /** Adds both columns and sets their comments, one statement at a time. */
  public async up(queryRunner: QueryRunner): Promise<void> {
    const statements = [
      `ALTER TABLE "alcs"."notification_parcel" ADD "oats_subject_property_id" bigint`,
      `COMMENT ON COLUMN "alcs"."notification_parcel"."oats_subject_property_id" IS 'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_subject_properties to alcs.notification_parcel.'`,
      `ALTER TABLE "alcs"."notification_parcel" ADD "oats_property_id" bigint`,
      `COMMENT ON COLUMN "alcs"."notification_parcel"."oats_property_id" IS 'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_properties to alcs.notification_parcel.'`,
    ];

    // Run strictly in order: each comment targets the column added just before it.
    for (const statement of statements) {
      await queryRunner.query(statement);
    }
  }

  /** Re-applies each column comment and then drops the column, in reverse order of `up`. */
  public async down(queryRunner: QueryRunner): Promise<void> {
    const statements = [
      `COMMENT ON COLUMN "alcs"."notification_parcel"."oats_property_id" IS 'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_properties to alcs.notification_parcel.'`,
      `ALTER TABLE "alcs"."notification_parcel" DROP COLUMN "oats_property_id"`,
      `COMMENT ON COLUMN "alcs"."notification_parcel"."oats_subject_property_id" IS 'This column is NOT related to any functionality in ALCS. It is only used for ETL and backtracking of imported data from OATS. It links oats.oats_subject_properties to alcs.notification_parcel.'`,
      `ALTER TABLE "alcs"."notification_parcel" DROP COLUMN "oats_subject_property_id"`,
    ];

    for (const statement of statements) {
      await queryRunner.query(statement);
    }
  }
}

0 comments on commit 5218525

Please sign in to comment.