Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add srw-only import #1483

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bin/migrate-files/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The files are uploaded in the format `/migrate/application||issue||planning_revi
- `document_id` is the primary key from the documents table
- `filename` is the filename metadata from the documents table

Note: SRWs are stored in the application folder but imported separately

## Libraries Used

os: used to interact with the file system
Expand Down Expand Up @@ -72,6 +74,7 @@ To run the script, run the following command:
python migrate-files.py application
python migrate-files.py application --start_document_id=500240 --end_document_id=505260 --last_imported_document_id=500475
```
Note: SRWs are stored in the application folder but imported separately

Application document import supports running in multiple terminals at the same time by specifying batches of data to import.

Expand Down Expand Up @@ -117,6 +120,11 @@ python migrate-files.py planning
python migrate-files.py issue
```

```sh
# to start srw document import
python migrate-files.py srw
```

M1:

```sh
Expand All @@ -134,6 +142,11 @@ python3-intel64 migrate-files.py planning
python3-intel64 migrate-files.py issue
```

```sh
# to start srw document import
python3-intel64 migrate-files.py srw
```

The script will start uploading files from the Oracle database to DELL ECS. The upload progress will be displayed in a progress bar. For Planning and Issues documents, the script will also save the last uploaded document id, so the upload process can be resumed from where it left off in case of any interruption. For Application document imports, it is the responsibility of whoever is running the process to specify "last_imported_document_id".

## Windows
Expand Down
1 change: 1 addition & 0 deletions bin/migrate-files/application_docs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .application_docs_import import import_application_docs
from .srw_docs_import import import_srw_docs
232 changes: 232 additions & 0 deletions bin/migrate-files/application_docs/srw_docs_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
from tqdm import tqdm
import cx_Oracle
from common import (
LAST_IMPORTED_APPLICATION_FILE,
DocumentUploadBasePath,
upload_file_to_s3,
get_starting_document_id,
get_max_file_size,
EntityType,
handle_document_processing_error,
fetch_data_from_oracle,
process_results,
log_last_imported_file,
generate_log_file_name,
)

# Name of the log file that records the id of the last document uploaded by
# this module. It is derived from LAST_IMPORTED_APPLICATION_FILE, i.e. the
# application-document constant, not an SRW-specific one.
# NOTE(review): presumably this means SRW and application runs log under the
# same base name - confirm against generate_log_file_name in `common`.
log_file_name = generate_log_file_name(LAST_IMPORTED_APPLICATION_FILE)


def import_srw_docs(
    batch,
    cursor,
    conn,
    s3,
    start_document_id_arg,
    end_document_id_arg,
    last_imported_document_id_arg,
):
    """Upload SRW documents from Oracle to S3, one batch at a time.

    Documents are fetched with ``_document_query`` in ascending document-id
    order and uploaded under the application base path. After every
    successful upload the document id is written to ``log_file_name``.

    Args:
        batch: Value bound to ``:batch_size`` in ``_document_query``.
        cursor: Oracle cursor used for every query issued here.
        conn: Oracle connection; only forwarded to
            ``handle_document_processing_error``.
        s3: Client object forwarded to ``upload_file_to_s3``.
        start_document_id_arg: Lower bound (exclusive) used when counting
            the total number of files and the already-transferred files.
        end_document_id_arg: Upper bound (inclusive) for counting and
            fetching; ``0`` disables the bound in the SQL.
        last_imported_document_id_arg: Id of the last document uploaded by a
            previous run; ``0`` means nothing has been uploaded yet.

    Returns:
        None.
    """
    # SRW documents are reported and stored under the APPLICATION entity.
    entity_label = EntityType.APPLICATION.value

    # Total used for the progress bar and the final summary.
    application_count = _get_total_number_of_files(
        cursor, start_document_id_arg, end_document_id_arg
    )

    # Progress-bar offset: how many files a previous run already moved.
    if last_imported_document_id_arg == 0:
        offset = last_imported_document_id_arg
    else:
        offset = _get_total_number_of_transferred_files(
            cursor, start_document_id_arg, last_imported_document_id_arg
        )
    print(f"{entity_label} count = {application_count} offset = {offset}")

    # NOTE(review): start_document_id_arg only feeds the two count queries
    # above; the fetch loop is seeded from last_imported_document_id_arg.
    # Confirm that get_starting_document_id / the caller account for a
    # non-zero start id when nothing has been imported yet.
    starting_document_id = last_imported_document_id_arg

    # Track progress
    documents_processed = 0
    last_document_id = starting_document_id

    try:
        with tqdm(
            total=application_count,
            initial=offset,
            unit="file",
            desc=f"Uploading {entity_label} files to S3",
        ) as documents_upload_progress_bar:
            max_file_size = get_max_file_size(cursor)

            while True:
                starting_document_id = get_starting_document_id(
                    starting_document_id, last_document_id, entity_label
                )

                query_params = {
                    "starting_document_id": starting_document_id,
                    "end_document_id": end_document_id_arg,
                    "max_file_size": max_file_size,
                    "batch_size": batch,
                }
                rows = fetch_data_from_oracle(_document_query, cursor, query_params)

                # An empty batch means there is nothing left to fetch.
                if not rows:
                    break

                for file_size, document_id, application_id, filename, blob in rows:
                    tqdm.write(f"{application_id}/{document_id}_{filename}")

                    upload_file_to_s3(
                        s3,
                        DocumentUploadBasePath.APPLICATION.value,
                        file_size,
                        document_id,
                        application_id,
                        filename,
                        blob,
                    )

                    documents_upload_progress_bar.update(1)
                    last_document_id = document_id
                    documents_processed += 1
                    # Persist after every file so an interrupted run can be
                    # resumed from the last uploaded document id.
                    log_last_imported_file(last_document_id, log_file_name)

    except Exception as error:
        handle_document_processing_error(
            cursor,
            conn,
            error,
            entity_label,
            documents_processed,
            last_document_id,
            log_file_name,
        )

    # Display results
    process_results(
        entity_label,
        application_count,
        documents_processed,
        last_document_id,
        log_file_name,
    )


# Batch-fetch query for SRW documents.
#
# Bind variables: :starting_document_id, :end_document_id, :max_file_size,
# :batch_size.
#
# * app_docs_srw: ids of documents whose application has a component with
#   alr_change_code = 'SRW'. Because the WHERE clause filters on a column of
#   the LEFT JOINed table, rows without a matching component are discarded,
#   so the join behaves like an inner join. GROUP BY removes duplicate ids.
# * documents_with_cumulative_file_size: non-empty documents with an
#   application id and DOCUMENT_ID > :starting_document_id (and
#   <= :end_document_id unless that bind is 0), numbered by DOCUMENT_ID with a
#   running total of blob sizes.
# * Final SELECT: keeps rows while the running total stays below
#   :max_file_size and row_num < :batch_size, returning
#   (file_size, DOCUMENT_ID, ALR_APPLICATION_ID, FILE_NAME, DOCUMENT_BLOB).
#
# NOTE(review): ROW_NUMBER() starts at 1 and the filter is a strict "<", so a
# fetch returns at most batch_size - 1 rows (none when batch_size is 1).
# NOTE(review): both size and row filters are strict; if the first remaining
# document alone is >= :max_file_size the result is empty - confirm that
# get_max_file_size makes this impossible.
_document_query = """
WITH app_docs_srw AS (

SELECT document_id FROM oats.oats_documents od
LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
WHERE oaac.alr_change_code = 'SRW'
GROUP BY od.document_id

),
documents_with_cumulative_file_size AS (
SELECT
ROW_NUMBER() OVER(
ORDER BY od.DOCUMENT_ID ASC
) row_num,
dbms_lob.getLength(DOCUMENT_BLOB) file_size,
SUM(dbms_lob.getLength(DOCUMENT_BLOB)) OVER (ORDER BY od.DOCUMENT_ID ASC ROWS UNBOUNDED PRECEDING) AS cumulative_file_size,
od.DOCUMENT_ID,
ALR_APPLICATION_ID,
FILE_NAME,
DOCUMENT_BLOB,
DOCUMENT_CODE,
DESCRIPTION,
DOCUMENT_SOURCE_CODE,
UPLOADED_DATE,
WHEN_UPDATED,
REVISION_COUNT
FROM
OATS.OATS_DOCUMENTS od
JOIN app_docs_srw appds ON appds.document_id = od.document_id -- this will filter out all non SRW related documents
WHERE
dbms_lob.getLength(DOCUMENT_BLOB) > 0
AND od.DOCUMENT_ID > :starting_document_id
AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
AND ALR_APPLICATION_ID IS NOT NULL
ORDER BY
DOCUMENT_ID ASC
)
SELECT
file_size,
docwc.DOCUMENT_ID,
ALR_APPLICATION_ID,
FILE_NAME,
DOCUMENT_BLOB
FROM
documents_with_cumulative_file_size docwc
WHERE
cumulative_file_size < :max_file_size
AND row_num < :batch_size
ORDER BY
docwc.DOCUMENT_ID ASC
"""


def _get_total_number_of_files(cursor, start_document_id, end_document_id):
    """Count the SRW documents eligible for import in an id range.

    A document is counted when its blob is non-empty, it has an application
    id, and its application has a component with change code ``SRW``.

    Args:
        cursor: Oracle cursor used to run the count query.
        start_document_id: Exclusive lower bound on DOCUMENT_ID; ``0``
            disables the bound.
        end_document_id: Inclusive upper bound on DOCUMENT_ID; ``0``
            disables the bound.

    Returns:
        The first column of the single COUNT(*) row.

    Raises:
        Exception: Wraps any ``cx_Oracle.Error``; the original error is
            attached as ``__cause__``.
    """
    count_query = """
        WITH app_docs_srw AS (

            SELECT document_id FROM oats.oats_documents od
            LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
            WHERE oaac.alr_change_code = 'SRW'
            GROUP BY od.document_id

        )
        SELECT COUNT(*)
        FROM OATS.OATS_DOCUMENTS od
        JOIN app_docs_srw ON app_docs_srw.document_id = od.document_id
        WHERE dbms_lob.getLength(DOCUMENT_BLOB) > 0
        AND ALR_APPLICATION_ID IS NOT NULL
        AND (:start_document_id = 0 OR od.DOCUMENT_ID > :start_document_id)
        AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
        """
    bind_values = {
        "start_document_id": start_document_id,
        "end_document_id": end_document_id,
    }
    try:
        cursor.execute(count_query, bind_values)
        return cursor.fetchone()[0]
    except cx_Oracle.Error as e:
        # Chain explicitly so the driver error stays attached as the cause.
        raise Exception(f"Oracle Error: {e}") from e


def _get_total_number_of_transferred_files(cursor, start_document_id, end_document_id):
    """Count the SRW documents that fall between two document ids.

    ``import_srw_docs`` calls this with the last imported document id as
    ``end_document_id`` to work out how many files a previous run already
    handled. The eligibility rules match the total-count query: non-empty
    blob, application id present, application has an ``SRW`` component.

    Args:
        cursor: Oracle cursor used to run the count query.
        start_document_id: Exclusive lower bound on DOCUMENT_ID. Unlike the
            total-count query, ``0`` is not special-cased here.
        end_document_id: Inclusive upper bound on DOCUMENT_ID; ``0``
            disables the bound.

    Returns:
        The first column of the single COUNT(*) row.

    Raises:
        Exception: Wraps any ``cx_Oracle.Error``; the original error is
            attached as ``__cause__``.
    """
    count_query = """
        WITH app_docs_srw AS (

            SELECT document_id FROM oats.oats_documents od
            LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
            WHERE oaac.alr_change_code = 'SRW'
            GROUP BY od.document_id

        )
        SELECT COUNT(*)
        FROM OATS.OATS_DOCUMENTS od
        JOIN app_docs_srw ON app_docs_srw.document_id = od.document_id
        WHERE dbms_lob.getLength(DOCUMENT_BLOB) > 0
        AND ALR_APPLICATION_ID IS NOT NULL
        AND od.DOCUMENT_ID > :start_document_id
        AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
        """
    bind_values = {
        "start_document_id": start_document_id,
        "end_document_id": end_document_id,
    }
    try:
        cursor.execute(count_query, bind_values)
        return cursor.fetchone()[0]
    except cx_Oracle.Error as e:
        # Chain explicitly so the driver error stays attached as the cause.
        raise Exception(f"Oracle Error: {e}") from e
14 changes: 12 additions & 2 deletions bin/migrate-files/migrate-files.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ecs_access_key,
ecs_secret_key,
)
from application_docs import import_application_docs
from application_docs import import_application_docs, import_srw_docs
from planning_docs import import_planning_review_docs
from issue_docs import import_issue_docs
import argparse
Expand Down Expand Up @@ -61,6 +61,16 @@ def main(args):
import_planning_review_docs(batch_size, cursor, conn, s3)
elif args.document_type == "issue":
import_issue_docs(batch_size, cursor, conn, s3)
elif args.document_type == "srw":
import_srw_docs(
batch_size,
cursor,
conn,
s3,
start_document_id,
end_document_id,
last_imported_document_id,
)

print("File upload complete, closing connection")

Expand All @@ -73,7 +83,7 @@ def _parse_command_line_args(args):
parser = argparse.ArgumentParser()
parser.add_argument(
"document_type",
choices=["application", "planning", "issue"],
choices=["application", "planning", "issue", "srw"],
help="Document type to be processed",
)
parser.add_argument(
Expand Down