diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 00000000..e247d6f2
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,13 @@
+# To get started with Dependabot version updates, you'll need to specify which
+# package ecosystems to update and where the package manifests are located.
+# Please see the documentation for all configuration options:
+# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
+
+version: 2
+updates:
+  - package-ecosystem: "pip" # See documentation for possible values
+    directory: "/" # Location of package manifests
+    schedule:
+      interval: "daily"
+    reviewers:
+      - "@Hochfrequenz/python-developers-review-team"
diff --git a/.gitignore b/.gitignore
index c2d86e13..fc2c0814 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,13 @@ dmypy.json
 
 # vscode settings
 .vscode/
+
+# document folders
+documents/
+backup_documents/
+
+# output files
+*.xlsx
+*.csv
+*.json
+*.7z
diff --git a/README.md b/README.md
index e8f10621..d2d5ecec 100644
--- a/README.md
+++ b/README.md
@@ -1,66 +1,132 @@
-# Python Template Repository including Tox.ini, Unittests, Linting Actions and Coverage Measurements
-
-![Unittests status badge](https://github.com/Hochfrequenz/python_template_repository/workflows/Unittests/badge.svg)
-![Coverage status badge](https://github.com/Hochfrequenz/python_template_repository/workflows/Coverage/badge.svg)
-![Linting status badge](https://github.com/Hochfrequenz/python_template_repository/workflows/Linting/badge.svg)
-![Black status badge](https://github.com/Hochfrequenz/python_template_repository/workflows/Black/badge.svg)
-
-This is a template repository. It doesn't contain any useful code but only a minimal working setup for a Python project including:
-
-+ a basic **project structure** with
-  + tox.ini
-  + requirements.in
-  + and a requirements.txt derived from it
-  + an example class
-  + an example unit test (using pytest)
-+ ready to use **Github Actions** for
-  + [pytest](https://pytest.org)
-  + [code coverage measurement](https://coverage.readthedocs.io) (fails below 80% by default)
-  + [pylint](https://pylint.org/) (only accepts 10/10 code rating by default)
-  + [black](https://github.com/psf/black) code formatter check
-    using [lgeiger/black-action](https://github.com/lgeiger/black-action)
-
-By default it uses Python version 3.9.
-
-## How to use this Repository on Your Machine
-
-This introduction assumes that you have tox installed already (
-see [installation instructions](https://tox.readthedocs.io/en/latest/install.html)) and that a `.toxbase` environment
-has been created.
-
-If this is the case, clone this repository and create the `dev` environment on your machine.
+# AHB Extractor
+![Unittests status badge](https://github.com/Hochfrequenz/AHBExtractor/workflows/Unittests/badge.svg)
+![Coverage status badge](https://github.com/Hochfrequenz/AHBExtractor/workflows/Coverage/badge.svg)
+![Linting status badge](https://github.com/Hochfrequenz/AHBExtractor/workflows/Linting/badge.svg)
+![Black status badge](https://github.com/Hochfrequenz/AHBExtractor/workflows/Black/badge.svg)
+
+This tool generates machine-readable files from AHB documents.
+## Installation
+The AHB Extractor is a Python-based tool. Therefore, you need to make sure that Python is installed on your machine.
+### Without tox
+
+Create a new virtual environment:
+```bash
+python -m venv .venv
+```
+How you activate the virtual environment depends on your operating system.
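+Once it is activated (see the OS-specific commands below), you can check that the interpreter from `.venv` is actually the one in use. This is a general Python hint and not part of this repository's tooling, so treat it only as a suggestion:
+```bash
+python -c "import sys; print(sys.prefix)"  # should print a path inside .venv
+```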
+#### Activate virtual environment under Windows +``` +.venv\Scripts\activate +``` +#### Activate virtual environment under MacOS/Linux +``` +source .venv/bin/activate +``` + +Install the requirements with +``` +pip install -r requirements.txt +``` +Done. +### With tox +If tox is installed system-wide, then you can just run +``` tox -e dev ``` +in the root directory. -### How to use with PyCharm - -1. Create a new project using existing sources with your local working copy of this repository as root directory. Choose - the path `your_repo/.tox/dev/` as path of the "previously configured interpreter". -2. Set the - default [test runner of your project](https://www.jetbrains.com/help/pycharm/choosing-your-testing-framework.html) to - pytest. -3. Set - the [working directory of the unit tests](https://www.jetbrains.com/help/pycharm/creating-run-debug-configuration-for-tests.html) - to the project root (instead of the unittest directory) - -### How to use with VS Code - -1. Open the folder with VS Code. -2. **Select the python interpreter** which is created by tox. Open the command pallett with `CTRL + P` and type `Python: Select Interpreter`. Select the interpreter which is placed in `.tox/dev/Scripts/python.exe` under Windows or `.tox/dev/bin/python` under Linux and macOS. -3. **Setup pytest and pylint**. Therefore we open the file `.vscode/settings.json` which should be automatically generated during the interpreter setup. Insert the following lines into the settings: -```json - "python.testing.unittestEnabled": false, - "python.testing.nosetestsEnabled": false, - "python.testing.pytestEnabled": true, - "pythonTestExplorer.testFramework": "pytest", - "python.testing.pytestArgs": [ - "unittests" - ], - "python.linting.pylintEnabled": true +## Execution + +At the moment you have to define the filename of the AHB you want to extract in [`ahbextractor\ahbextractor.py`](./ahbextractor/ahbextractor.py). + +If the filename is set, you can run the script with +```bash +python -m ahbextractor ``` -4. Enjoy 🤗 +You should see some output like this in your terminal +``` +🔍 Extracting Pruefidentifikatoren: 11039, 11040, 11041 +💥 The Excel file 11039.xlsx is open. Please close this file and try again. +💾 Saved file for Pruefidentifikator 11040 +💾 Saved file for Pruefidentifikator 11041 +``` + +## PDF Documents + +The following sections give a short overview where to find the start and end for the Formate. + +## APERAK (Fehlermeldung) +* Datei: `CONTRL_APERAK_AHB_2_3h_20201016.pdf` +* Start: `4.2 Tabellarische Darstellung der APERAK` +* Ende einschließlich: `4.2 Tabellarische Darstellung der APERAK` + +## IFTSTA (Infomeldung) +* Dateiname: `IFTSTA_AHB_2_0b_20201016.pdf` +* Start: `4.1 Übermittlung des Status des Gesamtvorgangs im Rahmen des MSB-Wechsels 1/2` +* Ende einschließlich: `4.9 Information zur Ablehnung eines Angebots oder einer Anfrage` + +## INSRPT (Prüfbericht) +* Datei: `INSRPT_AHB_1_1f_Lesefassung_20191002.pdf` +* Start: `3.1 Anwendungsfälle: Störungsmeldung, Ablehnung bzw. 
Bestätigung der Störungsmeldung und Ergebnisbericht`
+* Ende einschließlich: `3.3 Anwendungsfälle: Informationsmeldungen des MSB in der Sparte Strom`
+
+## INVOIC (Rechnung)
+* Datei: `INVOIC_REMADV_AHB_2_4_Lesefassung_20200701.pdf`
+* Start: `2.1.1 Abschlags-, NN-, WiM- und MSB-Rechnung`
+* Ende einschließlich: `2.1.4 Kapazitätsrechnung`
+* Start: `3.1 Anwendungsfälle REMADV`
+* Ende einschließlich: `3.1 Anwendungsfälle REMADV`
+## MSCONS (Messwerte und Zählerstände)
+* Datei: `MSCONS_AHB_2_3c_20201001_v2.pdf`
+* Start: `4.2 Anwendungsübersicht Messwert Lastgang`
+* Ende einschließlich: `4.2 Anwendungsübersicht Messwert Lastgang`
+* Start: `4.4 Anwendungsübersicht Messwert Energiemenge`
+* Ende einschließlich: `4.4 Anwendungsübersicht Messwert Energiemenge`
+* Start: `4.6 Anwendungsübersicht Messwert Zählerstand`
+* Ende einschließlich: `4.6 Anwendungsübersicht Messwert Zählerstand`
+* Start: `4.8 Anwendungsübersicht Messwert Storno`
+* Ende einschließlich: `4.8 Anwendungsübersicht Messwert Storno`
+* Start: `4.10Anwendungsübersicht Bilanzkreissummen`
+* Ende einschließlich: `4.10Anwendungsübersicht Bilanzkreissummen`
+* Start: `4.12Anwendungsübersicht Normiertes Profil / Profilschar / Vergangenheitsw. TEP`
+* Ende einschließlich: `4.12Anwendungsübersicht Normiertes Profil / Profilschar / Vergangenheitsw. TEP`
+* Start: `4.14Anwendungsübersicht EEG-Überführungszeitreihen`
+* Ende einschließlich: `4.14Anwendungsübersicht EEG-Überführungszeitreihen`
+* Start: `4.16Anwendungsübersicht Gasbeschaffenheitsdaten`
+* Ende einschließlich: `4.16Anwendungsübersicht Gasbeschaffenheitsdaten`
+* Start: `4.18Anwendungsübersicht Allokationsliste Gas / bilanzierte Menge Strom/Gas`
+* Ende einschließlich: `4.18Anwendungsübersicht Allokationsliste Gas / bilanzierte Menge Strom/Gas`
+* Start: `4.20Anwendungsübersicht Bewegungsdaten im Kalenderjahr vor Lieferbeginn`
+* Ende einschließlich: `4.20Anwendungsübersicht Bewegungsdaten im Kalenderjahr vor Lieferbeginn`
+* Start: `4.22Anwendungsübersicht Energiemenge und Leistungsmaximum`
+* Ende einschließlich: `4.22Anwendungsübersicht Energiemenge und Leistungsmaximum`
+
+## ORDERS (Bestellung)
+* Datei: `REQOTE_QUOTES_ORDERS_ORDRSP_AHB_1_0c_20201001.pdf`
+* Start: `3.1.1 Anfrage zur Übermittlung von Stammdaten im Initialprozess`
+* Ende einschließlich: `3.5 Reklamation von Werten/Lastgängen`
+* Start: `3.6.1 Anforderung eines Geräteübernahmeangebots (REQOTE)`
+* Ende einschließlich: `3.10.4 Bestätigung bzw. Ablehnung der Beendigung der Rechnungsabwicklung des Messstellenbetriebs über den LF (ORDRSP)`
+
+## ORDRSP (Bestellantwort)
+* Datei: `REQOTE_QUOTES_ORDERS_ORDRSP_AHB_1_0c_20201001.pdf`
+* selbe Datei wie `ORDERS (Bestellung)`
+
+## QUOTES (Angebot)
+* Datei: `REQOTE_QUOTES_ORDERS_ORDRSP_AHB_1_0c_20201001.pdf`
+* selbe Datei wie `ORDERS (Bestellung)`
+
+## REQOTE (Anfrage)
+* Datei: `REQOTE_QUOTES_ORDERS_ORDRSP_AHB_1_0c_20201001.pdf`
+* selbe Datei wie `ORDERS (Bestellung)`
+
+## UTILMD (Stammdaten)
+* Datei: `UTILMD_AHB_Stammdatenänderung_1_1b_20201016.pdf`
+* Start: `8.1 Nicht bilanzierungsrelevante Änderung vom LF ohne Abhängigkeiten (jedes Stammdatum kann einzeln übermittelt werden)`
+* Ende einschließlich: `9.10 Anfrage zur Stammdatenänderung von ÜNB`
-## Contribute
-You are very welcome to contribute to this template repository by opening a pull request against the main branch.
+## UTILTS (Berechnungsformel) +* Datei: `UTILTS_AHB_Berechnungsformel_1_0b_20201016.pdf` +* Start: `4 Übermittlung der Berechnungsformel` +* Ende einschließlich: `4 Übermittlung der Berechnungsformel` diff --git a/ahbextractor/__init__.py b/ahbextractor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ahbextractor/__main__.py b/ahbextractor/__main__.py new file mode 100644 index 00000000..7e44740e --- /dev/null +++ b/ahbextractor/__main__.py @@ -0,0 +1,12 @@ +""" +This file makes the package itself executable. +You can run it with: + + python -m ahbextractor + +""" + +from ahbextractor import ahbextractor + +if __name__ == "__main__": + ahbextractor.main() diff --git a/ahbextractor/ahbextractor.py b/ahbextractor/ahbextractor.py new file mode 100644 index 00000000..6eadbba5 --- /dev/null +++ b/ahbextractor/ahbextractor.py @@ -0,0 +1,40 @@ +""" +Main script of the AHB Extractor +""" + +from pathlib import Path + +import docx + +from ahbextractor.helper.read_functions import get_ahb_extract + + +def main(): + """ + Main function of the module ahbextractor. + It reads the docx file and calls the function to extract all Prüfindentifikatoren tables. + """ + + input_directory_path = Path.cwd() / "documents" + ahb_file_name = "UTILMD_AHB_WiM_3_1c_2021_04_01_2021_03_30.docx" + path_to_ahb_file = input_directory_path / ahb_file_name + + output_directory_path = Path.cwd() / "output" + path_to_all_in_one_excel = output_directory_path / "xlsx" / f"{ahb_file_name[:-5]}.xlsx" + + # Remove old "all in one excel file" if it already exists + if path_to_all_in_one_excel.exists(): + path_to_all_in_one_excel.unlink(missing_ok=False) + + try: + doc = docx.Document(path_to_ahb_file) # Creating word reader object. + + except IOError: + print(f"There was an error opening the file {ahb_file_name}!") + return + + get_ahb_extract(document=doc, output_directory_path=output_directory_path, ahb_file_name=ahb_file_name) + + +if __name__ == "__main__": + main() diff --git a/ahbextractor/helper/__init__.py b/ahbextractor/helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ahbextractor/helper/check_row_type.py b/ahbextractor/helper/check_row_type.py new file mode 100644 index 00000000..f54a6884 --- /dev/null +++ b/ahbextractor/helper/check_row_type.py @@ -0,0 +1,192 @@ +""" +This module contains all functions to define the type of a row of the tables in an AHB. +""" + +from enum import Enum + +from docx.shared import RGBColor +from docx.table import _Cell + + +class RowType(Enum): + """All possible row types. + + The RowType is defined by the first cell in each row. + + Example content for each row type: + + SEGMENTNAME: "Nachrichten-Kopfsegment" + SEGMENTGRUPPE: "SG2" + SEGMENT: " UNH" or "SG2 NAD" + DATENELEMENT: " UNH 0062" + HEADER: "EDIFACT Struktur" + EMPTY: "" + + """ + + SEGMENTNAME = 1 + SEGMENTGRUPPE = 2 + SEGMENT = 3 + DATENELEMENT = 4 + HEADER = 5 + EMPTY = 6 + + +def is_row_header(edifact_struktur_cell: _Cell) -> bool: + """Checks if the current row is a header. + + Args: + edifact_struktur_cell (_Cell): Indicator cell + + Returns: + bool: + """ + if edifact_struktur_cell.text == "EDIFACT Struktur": + return True + + return False + + +def is_row_segmentname(edifact_struktur_cell: _Cell) -> bool: + """Checks if the current row contains just a segment name. 
+ Example: "Nachrichten-Kopfsegment" + + Args: + edifact_struktur_cell (_Cell): Indicator cell + + Returns: + bool: + """ + try: + if edifact_struktur_cell.paragraphs[0].runs[0].font.color.rgb == RGBColor(128, 128, 128): # grey + return True + except IndexError: + return False + + return False + + +def is_row_segmentgruppe(edifact_struktur_cell: _Cell, left_indent_position: int) -> bool: + """Checks if the current row is a segmentgruppe. + Example: "SG2" + + Args: + edifact_struktur_cell (_Cell): Indicator cell + left_indent_position (int): Position of the left indent + + Returns: + bool: + """ + if ( + not edifact_struktur_cell.paragraphs[0].paragraph_format.left_indent == left_indent_position + and not "\t" in edifact_struktur_cell.text + and not edifact_struktur_cell.text == "" + ): + return True + + return False + + +def is_row_segment(edifact_struktur_cell: _Cell, left_indent_position: int) -> bool: + """Checks if the current row is a segment. + Example: "UNH", "SG2\tNAD" + + Args: + edifact_struktur_cell (_Cell): Indicator cell + left_indent_position (int): Position of the left indent + + Returns: + bool: + """ + # | UNH | + if ( + edifact_struktur_cell.paragraphs[0].paragraph_format.left_indent == left_indent_position + and not "\t" in edifact_struktur_cell.text + and not edifact_struktur_cell.text == "" + ): + return True + + # | SG2\tNAD | + if ( + not edifact_struktur_cell.paragraphs[0].paragraph_format.left_indent == left_indent_position + and edifact_struktur_cell.text.count("\t") == 1 + ): + return True + + return False + + +def is_row_datenelement(edifact_struktur_cell: _Cell, left_indent_position: int) -> bool: + """Checks if the current row is a datenelement. + Example: "UNH\t00062", "SG2\tNAD\t3035" + + Args: + edifact_struktur_cell (_Cell): Indicator cell + left_indent_position (int): Position of the left indent + + Returns: + bool: + """ + # | UNH\t0062 | + if ( + edifact_struktur_cell.paragraphs[0].paragraph_format.left_indent == left_indent_position + and "\t" in edifact_struktur_cell.text + ): + return True + + # | SG2\tNAD\t3035 | + if ( + not edifact_struktur_cell.paragraphs[0].paragraph_format.left_indent == left_indent_position + and edifact_struktur_cell.text.count("\t") == 2 + ): + return True + + return False + + +def is_row_empty(edifact_struktur_cell: _Cell) -> bool: + """Checks if the current row is empty. + Example: "" + Args: + edifact_struktur_cell (_Cell): Indicator cell + + Returns: + bool: + """ + if edifact_struktur_cell.text == "": + return True + return False + + +def define_row_type(edifact_struktur_cell: _Cell, left_indent_position: int) -> RowType: + """Defines the type of the current row. 
+ + Args: + edifact_struktur_cell (_Cell): Indicator cell + left_indent_position (int): Position of the left indent + + Raises: + NotImplemented: Gets raised if the RowType got not to be defined + + Returns: + RowType: Type of the current row + """ + if is_row_header(edifact_struktur_cell=edifact_struktur_cell): + return RowType.HEADER + + if is_row_segmentname(edifact_struktur_cell=edifact_struktur_cell): + return RowType.SEGMENTNAME + + if is_row_segmentgruppe(edifact_struktur_cell=edifact_struktur_cell, left_indent_position=left_indent_position): + return RowType.SEGMENTGRUPPE + + if is_row_segment(edifact_struktur_cell=edifact_struktur_cell, left_indent_position=left_indent_position): + return RowType.SEGMENT + + if is_row_datenelement(edifact_struktur_cell=edifact_struktur_cell, left_indent_position=left_indent_position): + return RowType.DATENELEMENT + + if is_row_empty(edifact_struktur_cell=edifact_struktur_cell): + return RowType.EMPTY + + raise NotImplementedError(f"Could not define row type of cell with text: {edifact_struktur_cell.text}") diff --git a/ahbextractor/helper/export_functions.py b/ahbextractor/helper/export_functions.py new file mode 100644 index 00000000..b60910a2 --- /dev/null +++ b/ahbextractor/helper/export_functions.py @@ -0,0 +1,132 @@ +"""Collections of functions which are needed to export the created DataFrame into a file. + +The possible file types are: + * csv + * json + * xlsx + +You can save each Prüfidentifikator into a separate file or +save all Prüfidentifikators of one AHB in one Excel file. +""" + +import re +from pathlib import Path + +import pandas as pd + + +def beautify_bedingungen(bedingung: str) -> str: + """Inserts newline characters before each Bedingung key [###] + + Example: + [12] Wenn SG4 + DTM+471 (Ende zum + nächstmöglichem + Termin) nicht vorhanden + + [13] Wenn SG4 + STS+E01++Z01 (Status + der Antwort: Zustimmung + mit Terminänderung) + nicht vorhanden + + -> + + [12] Wenn SG4 DTM+471 (Ende zum nächstmöglichem Termin) nicht vorhanden + [13] Wenn SG4 STS+E01++Z01 (Status der Antwort: Zustimmung mit Terminänderung) nicht vorhanden + + Args: + bedingung (str): Text in a Bedingung cell + + Returns: + str: Beautified text with one Bedingung per line + """ + + if isinstance(bedingung, str): + bedingung = bedingung.replace("\n", " ") + matches = re.findall(r"\[\d*\]", bedingung) + for match in matches[1:]: + index = bedingung.find(match) + bedingung = bedingung[:index] + "\n" + bedingung[index:] + return bedingung + + +# pylint: disable=too-many-locals +def export_single_pruefidentifikator(pruefi: str, df: pd.DataFrame, output_directory_path: Path): + """Exports the current Prüfidentifikator in different file formats: json, csv and xlsx + Each Prüfidentifikator is saved in an extra file. 
+ + Args: + pruefi (str): Current Prüfidentifikator + df (pd.DataFrame): DataFrame which contains all information + output_directory_path (Path): Path to the output directory + """ + + json_output_directory_path = output_directory_path / "json" + csv_output_directory_path = output_directory_path / "csv" + xlsx_output_directory_path = output_directory_path / "xlsx" + + json_output_directory_path.mkdir(parents=True, exist_ok=True) + csv_output_directory_path.mkdir(parents=True, exist_ok=True) + xlsx_output_directory_path.mkdir(parents=True, exist_ok=True) + + # write for each pruefi an extra file + columns_to_export = list(df.columns)[:5] + [pruefi] + columns_to_export.append("Bedingung") + # df["Bedingung"] = df["Bedingung"].apply(beautify_bedingungen) + df_to_export = df[columns_to_export] + df_to_export.to_csv(csv_output_directory_path / f"{pruefi}.csv") + + df_to_export.to_json(json_output_directory_path / f"{pruefi}.json", force_ascii=False, orient="records") + + try: + # https://github.com/PyCQA/pylint/issues/3060 pylint: disable=abstract-class-instantiated + with pd.ExcelWriter(xlsx_output_directory_path / f"{pruefi}.xlsx", engine="xlsxwriter") as writer: + df_to_export.to_excel(writer, sheet_name=f"{pruefi}") + # pylint: disable=no-member + workbook = writer.book + worksheet = writer.sheets[f"{pruefi}"] + wrap_format = workbook.add_format({"text_wrap": True}) + column_letters = ["A", "B", "C", "D", "E", "F", "G", "H"] + column_widths = [3.5, 47, 9, 14, 39, 33, 18, 102] + for column_letter, column_width in zip(column_letters, column_widths): + excel_header = f"{column_letter}:{column_letter}" + worksheet.set_column(excel_header, column_width, wrap_format) + print(f"💾 Saved files for Pruefidentifikator {pruefi}") + except PermissionError: + print(f"💥 The Excel file {pruefi}.xlsx is open. Please close this file and try again.") + + +def export_all_pruefidentifikatoren_in_one_file( + pruefi: str, df: pd.DataFrame, output_directory_path: Path, file_name: str +): + """Exports all Prüfidentifikatoren in one AHB into **one** Excel file + + Args: + pruefi (str): Current Prüfidentifikator + df (pd.DataFrame): DataFrame which contains all information + output_directory_path (Path): Path to the output directory + file_name (str): Name of the read AHB file + """ + + xlsx_output_directory_path = output_directory_path / "xlsx" + xlsx_output_directory_path.mkdir(parents=True, exist_ok=True) + + path_to_all_in_one_excel = xlsx_output_directory_path / f"{file_name[:-5]}.xlsx" + + # write for each pruefi an extra file + # take the first five column header's names and add the current pruefi + columns_to_export = list(df.columns)[:5] + [pruefi] + columns_to_export.append("Bedingung") + df_to_export = df[columns_to_export] + + try: + # https://github.com/PyCQA/pylint/issues/3060 pylint: disable=abstract-class-instantiated + with pd.ExcelWriter(path=path_to_all_in_one_excel, mode="a", engine="openpyxl") as writer: + df_to_export.to_excel(writer, sheet_name=f"{pruefi}") + except FileNotFoundError: + # https://github.com/PyCQA/pylint/issues/3060 pylint: disable=abstract-class-instantiated + with pd.ExcelWriter(path=path_to_all_in_one_excel, mode="w", engine="openpyxl") as writer: + df_to_export.to_excel(writer, sheet_name=f"{pruefi}") + except PermissionError: + print(f"💥 The Excel file {file_name[:-5]}.xlsx is open. 
Please close this file and try again.") diff --git a/ahbextractor/helper/read_functions.py b/ahbextractor/helper/read_functions.py new file mode 100644 index 00000000..2b6e90d7 --- /dev/null +++ b/ahbextractor/helper/read_functions.py @@ -0,0 +1,334 @@ +""" +A collection of functions to get information from AHB tables. +""" + +from pathlib import Path +from typing import List, Tuple, Union + +import pandas as pd +from docx.document import Document +from docx.oxml.table import CT_Tbl +from docx.oxml.text.paragraph import CT_P +from docx.table import Table, _Cell +from docx.text.paragraph import Paragraph + +from ahbextractor.helper.check_row_type import RowType, define_row_type +from ahbextractor.helper.export_functions import ( + export_all_pruefidentifikatoren_in_one_file, + export_single_pruefidentifikator, +) +from ahbextractor.helper.write_functions import write_new_row_in_dataframe + + +def get_all_paragraphs_and_tables(parent): + """ + Yield each paragraph and table child within *parent*, in document order. + Each returned value is an instance of either Table or Paragraph. + *parent* would most commonly be a reference to a main Document object, but + also works for a _Cell object, which itself can contain paragraphs and tables. + """ + # pylint: disable=protected-access + if isinstance(parent, Document): + parent_elm = parent.element.body + elif isinstance(parent, _Cell): + parent_elm = parent._tc + else: + raise ValueError("Passed parent argument must be of type Document or _Cell") + + for child in parent_elm.iterchildren(): + if isinstance(child, CT_P): + yield Paragraph(child, parent) + elif isinstance(child, CT_Tbl): + yield Table(child, parent) + + +def get_tabstop_positions(paragraph: Paragraph) -> List[int]: + """Find all tabstop positions in a given paragraph. + + Mainly the tabstop positions of cells from the middle column are determined + + Args: + paragraph (Paragraph): + + Returns: + List[int]: All tabstop positions in the given paragraph + """ + tabstop_positions: List = [] + # pylint: disable=protected-access + for tabstop in paragraph.paragraph_format.tab_stops._pPr.tabs: + tabstop_positions.append(tabstop.pos) + return tabstop_positions + + +# pylint: disable=too-many-arguments +def read_table( + table: Table, + dataframe: pd.DataFrame, + current_df_row_index: int, + last_two_row_types: List[RowType], + edifact_struktur_cell_left_indent_position: int, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], +) -> Tuple[List[RowType], int]: + """ + Iterates through all rows in a given table and writes all extracted infos in a DataFrame. + + Args: + table (Table): Current table in the docx + dataframe (pd.DataFrame): Contains all infos of the Prüfidentifikators + current_df_row_index (int): Current row of the dataframe + last_two_row_types (List[RowType]): Contains the two last RowType. Is needed for the case of empty rows. 
+ edifact_struktur_cell_left_indent_position (int): Position of the left indent in the + indicator edifact struktur cell + middle_cell_left_indent_position (int): Position of the left indent in the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + + Returns: + Tuple[List[RowType], int]: Last two RowTypes and the new row index for the DataFrame + """ + # pylint: disable=protected-access + if table._column_count == 4: + index_for_middle_column = 2 + else: + index_for_middle_column = 1 + + for row in range(len(table.rows)): + + # initial empty list for the next row in the dataframe + dataframe.loc[current_df_row_index] = (len(dataframe.columns)) * [""] + + row_cell_texts_as_list = [cell.text for cell in table.row_cells(row)] + + # pylint: disable=protected-access + if table._column_count == 4: + # remove redundant information for tables with 4 columns + if ( + row_cell_texts_as_list[0] == row_cell_texts_as_list[1] + and row_cell_texts_as_list[2] == row_cell_texts_as_list[3] + ): + # pylint: disable=line-too-long + # HEADER looks like + # 0:'EDIFACT Struktur' + # 1:'EDIFACT Struktur' + # 2:'Beschreibung\tKündigung\tBestätigung\tAblehnung\tBedingung\n\tMSB \tKündigung\tKündigung\n\tMSB \tMSB \nKommunikation von\tMSBN an\tMSBA an\tMSBA an\n\tMSBA\tMSBN\tMSBN\nPrüfidentifikator\t11039\t11040\t11041' + # 3:'Beschreibung\tKündigung\tBestätigung\tAblehnung\tBedingung\n\tMSB \tKündigung\tKündigung\n\tMSB \tMSB \nKommunikation von\tMSBN an\tMSBA an\tMSBA an\n\tMSBA\tMSBN\tMSBN\nPrüfidentifikator\t11039\t11040\t11041' + # len():4 + del row_cell_texts_as_list[1] + row_cell_texts_as_list[2] = "" + elif row_cell_texts_as_list[1] == row_cell_texts_as_list[2]: + # Dataelement row with header in the table + # 0:'SG2\tNAD\t3035' + # 1:'SG2\tNAD\t3035' + # 2:'MR\tNachrichtenempfänger\tX\tX\tX' + # 3:'' + # len():4 + del row_cell_texts_as_list[1] + elif row_cell_texts_as_list[0] == row_cell_texts_as_list[1]: + del row_cell_texts_as_list[1] + + current_edifact_struktur_cell = table.row_cells(row)[0] + + # check for row type + current_row_type = define_row_type( + edifact_struktur_cell=current_edifact_struktur_cell, + left_indent_position=edifact_struktur_cell_left_indent_position, + ) + + # write actual row into dataframe + if not (current_row_type is RowType.EMPTY and last_two_row_types[0] is RowType.HEADER): + current_df_row_index = write_new_row_in_dataframe( + row_type=current_row_type, + table=table, + row=row, + index_for_middle_column=index_for_middle_column, + dataframe=dataframe, + dataframe_row_index=current_df_row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + else: + current_df_row_index = write_new_row_in_dataframe( + row_type=last_two_row_types[1], + table=table, + row=row, + index_for_middle_column=index_for_middle_column, + dataframe=dataframe, + dataframe_row_index=current_df_row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + # remember last row type for empty cells + last_two_row_types[1] = last_two_row_types[0] + last_two_row_types[0] = current_row_type + + return last_two_row_types, current_df_row_index + + +def initial_setup_for_tables_with_pruefidentifikatoren( + item: Union[Paragraph, Table] +) -> 
Tuple[List[str], pd.DataFrame, int, int, List[int], List[RowType], int]: + """Prepare DataFrame for a new table with new Prüfidentifikatoren + + + + Args: + item (Union[Paragraph, Table]): A paragraph or table from the docx + + Returns: + Tuple[List[str], pd.DataFrame, int, int, List[int], List[RowType], int]: Returns + all detected Prüfidentifikatoren, + prepared DataFrame, + left intend position of the Edifact struktur cell, + left intend position of the middle cell, + list of tabstop positions of the middle cell, + list of the last two RowTypes, + the current row index for the DataFrame + """ + header_cells = [cell.text for cell in item.row_cells(0)] + look_up_term = "Prüfidentifikator" + cutter_index = header_cells[-1].find(look_up_term) + 1 + # +1 cause of \t after Prüfidentifikator + pruefidentifikatoren: List = header_cells[-1][cutter_index + len(look_up_term) :].split("\t") + + # edifact struktur cell + edifact_struktur_indicator_paragraph = item.cell(row_idx=4, col_idx=0).paragraphs[0] + edifact_struktur_left_indent_position = edifact_struktur_indicator_paragraph.paragraph_format.left_indent + + # middle cell + middle_cell_indicator_paragraph = item.cell(row_idx=4, col_idx=1).paragraphs[0] + middle_cell_left_indent_position = middle_cell_indicator_paragraph.paragraph_format.left_indent + tabstop_positions: List = get_tabstop_positions(middle_cell_indicator_paragraph) + + base_columns: List = [ + "Segment Gruppe", + "Segment", + "Datenelement", + "Codes und Qualifier", + "Beschreibung", + ] + columns = base_columns + pruefidentifikatoren + columns.append("Bedingung") + + df = pd.DataFrame( + columns=columns, + dtype="str", + ) + # Initialize help variables + last_two_row_types: List = [RowType.EMPTY, RowType.EMPTY] + current_df_row_index: int = 0 + + return ( + pruefidentifikatoren, + df, + edifact_struktur_left_indent_position, + middle_cell_left_indent_position, + tabstop_positions, + last_two_row_types, + current_df_row_index, + ) + + +# pylint: disable=inconsistent-return-statements +def get_ahb_extract(document: Document, output_directory_path: Path, ahb_file_name: str) -> int: + """Reads a docx file and extracts all information for each Prüfidentifikator. 
+ + Args: + document (Document): AHB which is read by python-docx package + output_directory_path (Path): Location of the output files + ahb_file_name (str): Name of the AHB document + + Returns: + int: Error code, 0 means success + """ + + pruefidentifikatoren: List = [] + + # Iterate through the whole word document + for item in get_all_paragraphs_and_tables(parent=document): + + # Check if there is just a text paragraph, + if isinstance(item, Paragraph) and not "Heading" in item.style.name: + continue + + # Check if the paragraph is a chapter or section title + if isinstance(item, Paragraph) and "Heading" in item.style.name: + current_chapter_title = item.text + + # Stop iterating at the section "Änderungshistorie" + if current_chapter_title == "Änderungshistorie": + # export last pruefidentifikatoren in AHB + for pruefi in pruefidentifikatoren: + + export_single_pruefidentifikator( + pruefi=pruefi, + df=df, + output_directory_path=output_directory_path, + ) + + export_all_pruefidentifikatoren_in_one_file( + pruefi=pruefi, + df=df, + output_directory_path=output_directory_path, + file_name=ahb_file_name, + ) + + # I don't know how to exit the program without a return + return 0 + + # Check if a table comes with new Prüfidentifikatoren + elif isinstance(item, Table) and item.cell(row_idx=0, col_idx=0).text == "EDIFACT Struktur": + # before we go to the next pruefidentifikatoren we save the current ones + # but at the first loop we check if list of pruefidentifikatoren is empty + if pruefidentifikatoren: + for pruefi in pruefidentifikatoren: + + export_single_pruefidentifikator( + pruefi=pruefi, + df=df, + output_directory_path=output_directory_path, + ) + + export_all_pruefidentifikatoren_in_one_file( + pruefi=pruefi, + df=df, + output_directory_path=output_directory_path, + file_name=ahb_file_name, + ) + + # Prepare a DataFrame, get all characteristic postions and initialize help variables + ( + pruefidentifikatoren, + df, + edifact_struktur_left_indent_position, + middle_cell_left_indent_position, + tabstop_positions, + last_two_row_types, + current_df_row_index, + ) = initial_setup_for_tables_with_pruefidentifikatoren(item=item) + + print("\n🔍 Extracting Pruefidentifikatoren:", ", ".join(pruefidentifikatoren)) + + last_two_row_types, current_df_row_index = read_table( + table=item, + dataframe=df, + current_df_row_index=current_df_row_index, + last_two_row_types=last_two_row_types, + edifact_struktur_cell_left_indent_position=edifact_struktur_left_indent_position, + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + elif isinstance(item, Table) and "df" in locals(): + last_two_row_types, current_df_row_index = read_table( + table=item, + dataframe=df, + current_df_row_index=current_df_row_index, + last_two_row_types=last_two_row_types, + edifact_struktur_cell_left_indent_position=edifact_struktur_left_indent_position, + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) diff --git a/ahbextractor/helper/write_functions.py b/ahbextractor/helper/write_functions.py new file mode 100644 index 00000000..bee063ef --- /dev/null +++ b/ahbextractor/helper/write_functions.py @@ -0,0 +1,517 @@ +""" +Collection of functions to write the extracted infos from the AHB tables into a DataFrame. 
+""" + +import re +from typing import List + +import pandas as pd +from docx.table import Table, _Cell +from docx.text.paragraph import Paragraph + +from ahbextractor.helper.check_row_type import RowType + + +def parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph: Paragraph, + dataframe: pd.DataFrame, + row_index: int, + edifact_struktur_cell_left_indent_position: int, +): + """Parses a paragraph in the edifact struktur column and puts the information into the appropriate columns + + Args: + paragraph (Paragraph): Current paragraph in the edifact struktur cell + dataframe (pd.DataFrame): Contains all infos + row_index (int): Current index of the DataFrame + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + """ + + splitted_text_at_tabs = paragraph.text.split("\t") + tab_count = paragraph.text.count("\t") + + # Check if the line starts on the far left + if paragraph.paragraph_format.left_indent != edifact_struktur_cell_left_indent_position: + + if tab_count == 2: + dataframe.at[row_index, "Segment Gruppe"] = splitted_text_at_tabs[0] + dataframe.at[row_index, "Segment"] = splitted_text_at_tabs[1] + dataframe.at[row_index, "Datenelement"] = splitted_text_at_tabs[2] + elif tab_count == 1: + dataframe.at[row_index, "Segment Gruppe"] = splitted_text_at_tabs[0] + dataframe.at[row_index, "Segment"] = splitted_text_at_tabs[1] + elif tab_count == 0 and not paragraph.text == "": + if paragraph.runs[0].bold: + # Segmentgruppe: SG8 + dataframe.at[row_index, "Segment Gruppe"] = splitted_text_at_tabs[0] + else: + # Segmentname: Referenzen auf die ID der\nTranche + if dataframe.at[row_index, "Segment Gruppe"] == "": + # Referenzen auf die ID der + dataframe.at[row_index, "Segment Gruppe"] = splitted_text_at_tabs[0] + else: + # Tranche + dataframe.at[row_index, "Segment Gruppe"] += " " + splitted_text_at_tabs[0] + + # Now the text should start in middle of the EDIFACT Struktur column + else: + + if tab_count == 1: + # Example: "UNH\t0062" + dataframe.at[row_index, "Segment"] = splitted_text_at_tabs[0] + dataframe.at[row_index, "Datenelement"] = splitted_text_at_tabs[1] + + elif tab_count == 0: + # Example: "UNH" + dataframe.at[row_index, "Segment"] = splitted_text_at_tabs[0] + + +def parse_paragraph_in_middle_column_to_dataframe( + paragraph: Paragraph, + dataframe: pd.DataFrame, + row_index: int, + left_indent_position: int, + tabstop_positions: List[int], +): + """Parses a paragraph in the middle column and puts the information into the appropriate columns + + Args: + paragraph (Paragraph): Current paragraph in the edifact struktur cell + dataframe (pd.DataFrame): Contains all infos + row_index (int): Current index of the DataFrame + left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + """ + + splitted_text_at_tabs = paragraph.text.split("\t") + + # Qualifier / Code + # left_indent_position is characteristic for Datenelemente + if paragraph.paragraph_format.left_indent == left_indent_position: + dataframe.at[row_index, "Codes und Qualifier"] += splitted_text_at_tabs.pop(0) + column_indezes = list(range(4, 4 + len(tabstop_positions))) + + else: + if splitted_text_at_tabs[0] == "": + tabstop_positions = tabstop_positions[1:] + del splitted_text_at_tabs[0] + + column_indezes = list(range(5, 5 + len(tabstop_positions))) + + # pylint: disable=protected-access + tab_stops = 
paragraph.paragraph_format.tab_stops._pPr.tabs + + if tab_stops is not None: + for tabstop in tab_stops: + for tabstop_position, column_index in zip(tabstop_positions, column_indezes): + if tabstop.pos == tabstop_position: + dataframe.iat[row_index, column_index] += splitted_text_at_tabs.pop(0) + elif tab_stops is None and splitted_text_at_tabs: + # in splitted_text_at_tabs list must be an entry + dataframe.at[row_index, "Beschreibung"] += splitted_text_at_tabs.pop(0) + elif tab_stops is None: + pass + # Could not figure out a scenario where this error could be raised. + # else: + # raise NotImplementedError(f"Could not parse paragraphe in middle cell with {paragraph.text}") + + +def parse_bedingung_cell(bedingung_cell: _Cell, dataframe: pd.DataFrame, row_index: int): + """Parses a cell in the Bedingung column and puts the information into the in the appropriate column + + Args: + bedingung_cell (_Cell): Cell from the Bedingung column + dataframe (pd.DataFrame): Saves all infos + row_index (int): Current index of the DataFrame + """ + + bedingung = bedingung_cell.text.replace("\n", " ") + matches = re.findall(r"\[\d*\]", bedingung) + for match in matches[1:]: + index = bedingung.find(match) + bedingung = bedingung[:index] + "\n" + bedingung[index:] + + dataframe.at[row_index, "Bedingung"] += bedingung + + +# pylint: disable=too-many-arguments +def write_segment_name_to_dataframe( + dataframe: pd.DataFrame, + row_index: int, + edifact_struktur_cell: _Cell, + edifact_struktur_cell_left_indent_position: int, + middle_cell: _Cell, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], + bedingung_cell: _Cell, +): + """Writes all infos from a segment name row into a DataFrame. + + Args: + dataframe (pd.DataFrame): Saves all infos + row_index (int): Current index of the DataFrame + edifact_struktur_cell (_Cell): Cell from the edifact struktur column + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + middle_cell (_Cell): Cell from the middle column + middle_cell_left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + bedingung_cell (_Cell): Cell from the Bedingung column + """ + + # EDIFACT STRUKTUR COLUMN + for paragraph in edifact_struktur_cell.paragraphs: + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + ) + + # MIDDLE COLUMN + # I do not expect to a multiline Segementgruppe, + # but just in case we loop through all paragraphs + for paragraph in middle_cell.paragraphs: + parse_paragraph_in_middle_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + # BEDINGUNG COLUMN + parse_bedingung_cell(bedingung_cell=bedingung_cell, dataframe=dataframe, row_index=row_index) + + +# pylint: disable=too-many-arguments +def write_segmentgruppe_to_dataframe( + dataframe: pd.DataFrame, + row_index: int, + edifact_struktur_cell: _Cell, + edifact_struktur_cell_left_indent_position: int, + middle_cell: _Cell, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], + bedingung_cell: _Cell, +): + """Writes all infos from a segmentgruppe row into a DataFrame. 
+ + Args: + dataframe (pd.DataFrame): Saves all infos + row_index (int): Current index of the DataFrame + edifact_struktur_cell (_Cell): Cell from the edifact struktur column + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + middle_cell (_Cell): Cell from the middle column + middle_cell_left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + bedingung_cell (_Cell): Cell from the Bedingung column + """ + + # EDIFACT STRUKTUR COLUMN + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=edifact_struktur_cell.paragraphs[0], + dataframe=dataframe, + row_index=row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + ) + + # MIDDLE COLUMN + # I do not expect to a multiline Segementgruppe, + # but just in case we loop through all paragraphs + for paragraph in middle_cell.paragraphs: + parse_paragraph_in_middle_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + # BEDINGUNG COLUMN + parse_bedingung_cell(bedingung_cell=bedingung_cell, dataframe=dataframe, row_index=row_index) + + +# pylint: disable=too-many-arguments +def write_segment_to_dataframe( + dataframe: pd.DataFrame, + row_index: int, + edifact_struktur_cell: _Cell, + edifact_struktur_cell_left_indent_position: int, + middle_cell: _Cell, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], + bedingung_cell: _Cell, +): + """Writes all infos from a segment row into a DataFrame. + + Args: + dataframe (pd.DataFrame): Saves all infos + row_index (int): Current index of the DataFrame + edifact_struktur_cell (_Cell): Cell from the edifact struktur column + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + middle_cell (_Cell): Cell from the middle column + middle_cell_left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + bedingung_cell (_Cell): Cell from the Bedingung column + """ + + # EDIFACT STRUKTUR COLUMN + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=edifact_struktur_cell.paragraphs[0], + dataframe=dataframe, + row_index=row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + ) + + # MIDDLE COLUMN + for paragraph in middle_cell.paragraphs: + parse_paragraph_in_middle_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + # BEDINGUNG COLUMN + parse_bedingung_cell(bedingung_cell=bedingung_cell, dataframe=dataframe, row_index=row_index) + + +def count_matching(condition, condition_argument, seq): + """Returns the amount of items in seq that return true from condition""" + return sum(condition(item, condition_argument) for item in seq) + + +def code_condition(paragraph: Paragraph, pruefi_tabstops: List[int]) -> bool: + """Checks if the paragraph contains a Code by checking for bold style. 
+ + Example for Codes: UTILMD, 11A, UN, + + + Args: + paragraph (Paragraph): Current paragraph + pruefi_tabstops (List[int]): All tabstop positions of the indicator middle cell + + Returns: + [bool]: + """ + try: + # pylint: disable=protected-access + tabstop_positions = [tab_position.pos for tab_position in paragraph.paragraph_format.tab_stops._pPr.tabs] + except TypeError: + return False + + if paragraph.runs[0].bold is True and any(x in tabstop_positions for x in pruefi_tabstops): + return True + return False + + +def has_middle_cell_multiple_codes(paragraphs: List[Paragraph], pruefi_tabstops: List[int]) -> bool: + """Checks if the paragraphs of a middle cell contains more than one Code. + + Args: + paragraphs (List[Paragraph]): All paragraphs in the current middle cell + pruefi_tabstops (List[int]): All tabstop positions of the indicator middle cell + + Returns: + bool: + """ + + if count_matching(condition=code_condition, condition_argument=pruefi_tabstops, seq=paragraphs) > 1: + return True + return False + + +# pylint: disable=too-many-arguments +def write_dataelement_to_dataframe( + dataframe: pd.DataFrame, + row_index: int, + edifact_struktur_cell: _Cell, + edifact_struktur_cell_left_indent_position: int, + middle_cell: _Cell, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], + bedingung_cell: _Cell, +): + """Writes all infos from a dataelement row into a DataFrame. + + Args: + dataframe (pd.DataFrame): Contains all infos + row_index (int): Current index of the DataFrame + edifact_struktur_cell (_Cell): Cell from the edifact struktur column + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + middle_cell (_Cell): Cell from the middle column + middle_cell_left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + bedingung_cell (_Cell): Cell from the Bedingung column + """ + + # EDIFACT STRUKTUR COLUMN + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=edifact_struktur_cell.paragraphs[0], + dataframe=dataframe, + row_index=row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + ) + + # BEDINGUNG COLUMN + # Bedingung have to be parsed before MIDDLE COLUMN + # The cell with multiple codes counts up the row_index + # This will cause then an IndexError for Bedingung + parse_bedingung_cell(bedingung_cell=bedingung_cell, dataframe=dataframe, row_index=row_index) + + # MIDDLE COLUMN + if not has_middle_cell_multiple_codes(paragraphs=middle_cell.paragraphs, pruefi_tabstops=tabstop_positions[1:]): + for paragraph in middle_cell.paragraphs: + parse_paragraph_in_middle_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + row_index = row_index + 1 + + else: + # The middle cell contains multiple Codes + + # here we have to look into the next row to see, if we have to add a new datarow or + # if we have to collect more information in the next row which we have to add to the current row + + create_new_dataframe_row_indicator_list: List = [ + paragraph.runs[0].bold is True for paragraph in middle_cell.paragraphs + ] + + for paragraph, i in zip(middle_cell.paragraphs, range(len(create_new_dataframe_row_indicator_list))): + + # For reasons of good readability the EDIFACT Struktur information 
gets written again + + # EDIFACT STRUKTUR COLUMN + + if edifact_struktur_cell.paragraphs[0].text != "": + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=edifact_struktur_cell.paragraphs[0], + dataframe=dataframe, + row_index=row_index, + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + ) + else: + dataframe.at[row_index, "Segment Gruppe"] = dataframe.loc[row_index - 1, "Segment Gruppe"] + dataframe.at[row_index, "Segment"] = dataframe.loc[row_index - 1, "Segment"] + dataframe.at[row_index, "Datenelement"] = dataframe.loc[row_index - 1, "Datenelement"] + + if paragraph.runs[0].bold: + parse_paragraph_in_middle_column_to_dataframe( + paragraph=paragraph, + dataframe=dataframe, + row_index=row_index, + left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + ) + + elif paragraph.paragraph_format.left_indent == tabstop_positions[0]: + # multi line Beschreibung + dataframe.at[row_index, "Beschreibung"] += " " + paragraph.text + + if len(create_new_dataframe_row_indicator_list) > i + 1: + if create_new_dataframe_row_indicator_list[i + 1]: + row_index = row_index + 1 + dataframe.loc[row_index] = (len(dataframe.columns)) * [""] + else: + row_index = row_index + 1 + + return row_index + + +def write_new_row_in_dataframe( + row_type: RowType, + table: Table, + row: int, + index_for_middle_column: int, + dataframe: pd.DataFrame, + dataframe_row_index: int, + edifact_struktur_cell_left_indent_position: int, + middle_cell_left_indent_position: int, + tabstop_positions: List[int], +) -> int: + """Writes the current row of the current table into the DataFrame depending on the type of the row + + Args: + row_type (RowType): Type of the current row + table (Table): Current table + row (int): Row of the current table + index_for_middle_column (int): Index of the actual middle column + dataframe (pd.DataFrame): Contains all infos + dataframe_row_index (int): Current index of the DataFrame + edifact_struktur_cell_left_indent_position (int): Position of the left indent from the indicator edifact + struktur cell + middle_cell_left_indent_position (int): Position of the left indent from the indicator middle cell + tabstop_positions (List[int]): All tabstop positions of the indicator middle cell + + Returns: + [int]: the next DataFrame row index + """ + + if row_type is RowType.HEADER: + pass + + elif row_type is RowType.SEGMENTNAME: + write_segment_name_to_dataframe( + dataframe=dataframe, + row_index=dataframe_row_index, + edifact_struktur_cell=table.row_cells(row)[0], + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell=table.row_cells(row)[index_for_middle_column], + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + bedingung_cell=table.row_cells(row)[-1], + ) + dataframe_row_index = dataframe_row_index + 1 + + elif row_type is RowType.SEGMENTGRUPPE: + write_segmentgruppe_to_dataframe( + dataframe=dataframe, + row_index=dataframe_row_index, + edifact_struktur_cell=table.row_cells(row)[0], + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell=table.row_cells(row)[index_for_middle_column], + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + bedingung_cell=table.row_cells(row)[-1], + ) + dataframe_row_index = dataframe_row_index + 1 + + elif row_type is RowType.SEGMENT: + write_segment_to_dataframe( + 
dataframe=dataframe, + row_index=dataframe_row_index, + edifact_struktur_cell=table.row_cells(row)[0], + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell=table.row_cells(row)[index_for_middle_column], + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + bedingung_cell=table.row_cells(row)[-1], + ) + dataframe_row_index = dataframe_row_index + 1 + + elif row_type is RowType.DATENELEMENT: + dataframe_row_index = write_dataelement_to_dataframe( + dataframe=dataframe, + row_index=dataframe_row_index, + edifact_struktur_cell=table.row_cells(row)[0], + edifact_struktur_cell_left_indent_position=edifact_struktur_cell_left_indent_position, + middle_cell=table.row_cells(row)[index_for_middle_column], + middle_cell_left_indent_position=middle_cell_left_indent_position, + tabstop_positions=tabstop_positions, + bedingung_cell=table.row_cells(row)[-1], + ) + + elif row_type is RowType.EMPTY: + pass + + return dataframe_row_index diff --git a/mymodule.py b/mymodule.py deleted file mode 100644 index a34c936e..00000000 --- a/mymodule.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -This a docstring for the module. -""" - - -class MyClass: # pylint: disable=too-few-public-methods - """ - This is a docstring for the class. - """ - - def __init__(self): - """ - Initialize for the sake of initializing - """ - self.my_instance_var = "abc" - - def do_something(self) -> str: - """ - Actually does nothing. - :return: the value of an instance variable - """ - # this is a super long line with: 100 < line length <= 120 to demonstrate the purpose of pyproject.toml - return self.my_instance_var diff --git a/pyproject.toml b/pyproject.toml index 6da0fb7c..6a849f46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,6 +3,8 @@ line-length = 120 [tool.isort] line_length = 120 +profile = "black" [tool.pylint."MESSAGES CONTROL"] max-line-length = 120 +good-names=["i", "j","k", "ex", "Run", "_", "df"] diff --git a/requirements.in b/requirements.in index e69de29b..fead543a 100644 --- a/requirements.in +++ b/requirements.in @@ -0,0 +1,4 @@ +openpyxl +pandas +python-docx +XlsxWriter diff --git a/requirements.txt b/requirements.txt index 9b60a9d9..733cb4de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,23 @@ # # pip-compile requirements.in # +et-xmlfile==1.1.0 + # via openpyxl +lxml==4.9.1 + # via python-docx +numpy==1.22.0 + # via pandas +openpyxl==3.0.10 + # via -r requirements.in +pandas==1.4.3 + # via -r requirements.in +python-dateutil==2.8.1 + # via pandas +python-docx==0.8.11 + # via -r requirements.in +pytz==2021.1 + # via pandas +six==1.16.0 + # via python-dateutil +xlsxwriter==3.0.3 + # via -r requirements.in diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..5eb11534 --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +from setuptools import find_packages, setup + +setup( + name="ahbextractor", + version="0.0.0", + author="Kevin Krechan", + author_email="kevin.krechan@hochfrequenz.de", + description="Tool to generate machine readable files from AHB documents.", + packages=find_packages(), + install_requires=["openpyxl", "pandas", "python-docx", "XlsxWriter"], +) diff --git a/tox.ini b/tox.ini index 70e351c3..472419fb 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,8 @@ envlist = linting coverage skip_missing_interpreters = True -skipsdist = True +; isolated_build = True +; skipsdist = True [testenv] commands = python -m pip install --upgrade pip @@ -14,7 +15,7 @@ commands = python -m pip 
install --upgrade pip deps = -rrequirements.txt pytest -commands = python -m pytest --basetemp={envtmpdir} {posargs} +commands = pytest --basetemp={envtmpdir} {posargs} [testenv:linting] # the linting environment is called by the Github Action that runs the linter @@ -23,8 +24,7 @@ deps = pylint # add your fixtures like e.g. pytest_datafiles here commands = - pylint mymodule.py - # add single files (ending with .py) or packages here + pylint ahbextractor [testenv:coverage] # the coverage environment is called by the Github Action that runs the coverage measurement diff --git a/unittests/test_check_row_type.py b/unittests/test_check_row_type.py new file mode 100644 index 00000000..9077ac5e --- /dev/null +++ b/unittests/test_check_row_type.py @@ -0,0 +1,97 @@ +import docx +import pytest +from docx.shared import RGBColor + +from ahbextractor.helper.check_row_type import RowType, define_row_type + + +class TestCheckRowType: + + # this is the touchstone position for the left indent of the edifact struktur cells + # example rows with this left indent position: + # | UNH | + # | UNH\t0062 | + segment_left_indent_position = 364490 + + # example rows with this left indent position: + # | SG2 | + # | SG2\tNAD | + # | SG2\tNAD\t3025 | + segmentgruppe_left_indent_position = 36830 + + @pytest.mark.parametrize( + "text, left_indent_position, font_color, expected", + [ + pytest.param( + "EDIFACT Struktur", segmentgruppe_left_indent_position, RGBColor(0, 0, 0), RowType.HEADER, id="HEADER" + ), + pytest.param( + "Nachrichten-Kopfsegment", + segmentgruppe_left_indent_position, + RGBColor(128, 128, 128), + RowType.SEGMENTNAME, + id="SEGMENTNAME", + ), + pytest.param( + "SG2", segmentgruppe_left_indent_position, RGBColor(0, 0, 0), RowType.SEGMENTGRUPPE, id="SEGMENTGRUPPE" + ), + pytest.param( + "UNH", + segment_left_indent_position, + RGBColor(0, 0, 0), + RowType.SEGMENT, + id="SEGMENT w/o Segmentgruppe", + ), + pytest.param( + "SG2\tNAD", + segmentgruppe_left_indent_position, + RGBColor(0, 0, 0), + RowType.SEGMENT, + id="SEGMENT w Segementgruppe", + ), + pytest.param( + "UNH\t0062", + segment_left_indent_position, + RGBColor(0, 0, 0), + RowType.DATENELEMENT, + id="DATENELEMENT w/o Segmentgruppe", + ), + pytest.param( + "SG2\tNAD\t3035", + segmentgruppe_left_indent_position, + RGBColor(0, 0, 0), + RowType.DATENELEMENT, + id="DATENELEMENT w Segementgruppe", + ), + pytest.param("", 635, RGBColor(0, 0, 0), RowType.EMPTY, id="EMPTY"), + ], # TODO find left indent for empty cell + ) + def test_define_row_type(self, text: str, left_indent_position: int, font_color: RGBColor, expected: RowType): + """Test if all defined row types are identified correctly. + + Args: + text (str): Text content of the test cell + left_indent_position (int): Position of the left indent in arbritrary units + font_color (RGBColor): A class from docx to define colors + expected (RowType): The expected RowType for each test case + """ + # ! Attention: It seems that you can set the left indent only to discret numbers! 
+ # edifact_struktur_left_indent_position = 1270 + + # create table test cell, it contains an empty paragraph by default + test_document = docx.Document() + test_table = test_document.add_table(rows=1, cols=1) + test_cell = test_table.add_row().cells[0] + + # insert text + test_cell.text = text + + # set left indent position + test_cell.paragraphs[0].paragraph_format.left_indent = left_indent_position + # set font color + test_cell.paragraphs[0].runs[0].font.color.rgb = font_color + + result = define_row_type( + edifact_struktur_cell=test_cell, left_indent_position=self.segment_left_indent_position + ) + assert result == expected diff --git a/unittests/test_myclass.py b/unittests/test_myclass.py deleted file mode 100644 index 75810cca..00000000 --- a/unittests/test_myclass.py +++ /dev/null @@ -1,11 +0,0 @@ -from mymodule import MyClass - - -class TestMyClass: - """ - A class with pytest unit tests. - """ - - def test_something(self): - my_class = MyClass() - assert my_class.do_something() == "abc" diff --git a/unittests/test_write_functions.py b/unittests/test_write_functions.py new file mode 100644 index 00000000..ab3f845b --- /dev/null +++ b/unittests/test_write_functions.py @@ -0,0 +1,1078 @@ +from dataclasses import dataclass +from typing import List + +import docx +import pandas as pd +import pytest + +from ahbextractor.helper.write_functions import ( + parse_bedingung_cell, + parse_paragraph_in_edifact_struktur_column_to_dataframe, + parse_paragraph_in_middle_column_to_dataframe, + write_dataelement_to_dataframe, + write_segment_name_to_dataframe, + write_segment_to_dataframe, + write_segmentgruppe_to_dataframe, +) + + +class TestParseFunctions: + + # create table test cell + # it contains an empty paragraph by default + test_document = docx.Document() + test_table = test_document.add_table(rows=1, cols=1) + test_cell = test_table.add_row().cells[0] + + # these left indent and tabstop positions are equal to + # the left indent and tabstop positions of the indicator paragraph + middle_cell_left_indent_position_of_indicator_paragraph = 36830 + middle_cell_tabstop_positions_of_indicator_paragraph = [436245, 1962785, 2578735, 3192780] + + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells = 36830 + edifact_struktur_cell_left_indent_position_of_indicator_paragraph = 364490 + + @pytest.mark.parametrize( + "text_content, left_indent_position, cell_tabstop_positions, expected_df_row", + [ + pytest.param( + "", + None, + None, + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segmentname Nachrichten-Kopfsegment", + ), + pytest.param( + "\tMuss\tMuss\tMuss", + None, + middle_cell_tabstop_positions_of_indicator_paragraph, + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "Muss", + "88888": "Muss", + "99999": "Muss", + "Bedingung": "", + }, + id="First UNH Segment", + ), + pytest.param( + "Nachrichten-Referenznummer\tX\tX\tX", + middle_cell_left_indent_position_of_indicator_paragraph, + middle_cell_tabstop_positions_of_indicator_paragraph[1:], + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "Nachrichten-Referenznummer", + "Beschreibung": "", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + }, + id="Qualifier", + ), + pytest.param( + "UTILM\tNetzanschluss-\tX\tX\tX", +
middle_cell_left_indent_position_of_indicator_paragraph, + middle_cell_tabstop_positions_of_indicator_paragraph, + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "UTILM", + "Beschreibung": "Netzanschluss-", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + }, + id="First Dataelement", + ), + pytest.param( + "D\tStammdaten", + middle_cell_left_indent_position_of_indicator_paragraph, + middle_cell_tabstop_positions_of_indicator_paragraph[0:1], + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "D", + "Beschreibung": "Stammdaten", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Dataelement without conditions", + ), + pytest.param( + "zugrundeliegenden", + middle_cell_tabstop_positions_of_indicator_paragraph[0], + None, + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "zugrundeliegenden", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="only Beschreibung", + ), + pytest.param( + "\tMuss [16] U\n", + None, + middle_cell_tabstop_positions_of_indicator_paragraph[-1:], + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "Muss [16] U\n", + "Bedingung": "", + }, + id="Just one entry for one Prüfidentifikator", + ), + ], + ) + def test_parse_paragraph_in_middle_column( + self, text_content, left_indent_position, cell_tabstop_positions, expected_df_row + ): + + # insert text + self.test_cell.text = text_content + test_paragraph = self.test_cell.paragraphs[0] + + # set left indent position + test_paragraph.paragraph_format.left_indent = left_indent_position + + tab_stops = test_paragraph.paragraph_format.tab_stops + # Length: https://python-docx.readthedocs.io/en/latest/api/shared.html#docx.shared.Length + + if cell_tabstop_positions is not None: + for tabstop_position in cell_tabstop_positions: + tab_stops.add_tab_stop(tabstop_position) + + # Initialize two dataframes ... + df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + expected_df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + row_index = 0 + # ... 
with a row full of empty strings + initial_dataframe_row = (len(df.columns)) * [""] + df.loc[row_index] = initial_dataframe_row + expected_df.loc[row_index] = initial_dataframe_row + + parse_paragraph_in_middle_column_to_dataframe( + paragraph=test_paragraph, + dataframe=df, + row_index=row_index, + left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + ) + + expected_df.loc[row_index] = expected_df_row + + assert expected_df.equals(df) + + # def test_not_implemented_middle_cell_paragraph(self): + # # insert text + # self.test_cell.text = "" + # test_paragraph = self.test_cell.paragraphs[0] + + # # set left indent position + # test_paragraph.paragraph_format.left_indent = None + + # df = pd.DataFrame(dtype="str") + # row_index = 0 + + # with pytest.raises(NotImplementedError) as excinfo: + # parse_paragraph_in_middle_column_to_dataframe( + # paragraph=test_paragraph, + # dataframe=df, + # row_index=row_index, + # left_indent_position=self.left_indent_position_of_indicator_paragraph, + # tabstop_positions=self.tabstop_positions_of_indicator_paragraph, + # ) + + # assert "Could not parse paragraphe in middle cell with " in str(excinfo.value) + + @pytest.mark.parametrize( + "text_content, left_indent_position, expected_df_row", + [ + pytest.param( + "Nachrichten-Kopfsegment", + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + { + "Segment Gruppe": "Nachrichten-Kopfsegment", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segmentname", + ), + pytest.param( + "UNH", + edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + { + "Segment Gruppe": "", + "Segment": "UNH", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segment", + ), + pytest.param( + "UNH\t0062", + edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + { + "Segment Gruppe": "", + "Segment": "UNH", + "Datenelement": "0062", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segment with Dataelement", + ), + pytest.param( + "SG2", + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + { + "Segment Gruppe": "SG2", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segmentgruppe", + ), + pytest.param( + "SG2\tNAD", + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segmentgruppe with Segment", + ), + pytest.param( + "SG2\tNAD\t3035", + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "3035", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Segmentgruppe with Segment and Datenelement", + ), + ], + ) + def test_parse_paragraph_in_edifact_struktur_column_to_dataframe( + self, text_content, left_indent_position, expected_df_row + ): + + # insert text + self.test_cell.text = text_content + test_paragraph
= self.test_cell.paragraphs[0] + + # set left indent position + test_paragraph.paragraph_format.left_indent = left_indent_position + + # Initialize two dataframes ... + df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + expected_df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + row_index = 0 + # ... with a row full of empty strings + initial_dataframe_row = (len(df.columns)) * [""] + df.loc[row_index] = initial_dataframe_row + expected_df.loc[row_index] = initial_dataframe_row + + parse_paragraph_in_edifact_struktur_column_to_dataframe( + paragraph=test_paragraph, + dataframe=df, + row_index=row_index, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + ) + + expected_df.loc[row_index] = expected_df_row + + assert expected_df.equals(df) + + @pytest.mark.parametrize( + "text_content, expected_df_row", + [ + pytest.param( + "", + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + }, + id="Empty Bedingung", + ), + pytest.param( + """[12] Wenn SG4 +DTM+471 (Ende zum +nächstmöglichem +Termin) nicht vorhanden + +[13] Wenn SG4 +STS+E01++Z01 (Status +der Antwort: Zustimmung +mit Terminänderung) +nicht vorhanden +""", + { + "Segment Gruppe": "", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": """[12] Wenn SG4 DTM+471 (Ende zum nächstmöglichem Termin) nicht vorhanden \n[13] Wenn SG4 STS+E01++Z01 (Status der Antwort: Zustimmung mit Terminänderung) nicht vorhanden """, + }, + id="First Bedingung in UTILMD AHB WiM", + ), + ], + ) + def test_parse_bedingung_cell(self, text_content, expected_df_row): + + # insert text + self.test_cell.text = text_content + + # Initialize two dataframes ... + df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + expected_df = pd.DataFrame(columns=expected_df_row.keys(), dtype="str") + row_index = 0 + # ... 
with a row full of empty strings + initial_dataframe_row = (len(df.columns)) * [""] + df.loc[row_index] = initial_dataframe_row + expected_df.loc[row_index] = initial_dataframe_row + + parse_bedingung_cell( + bedingung_cell=self.test_cell, + dataframe=df, + row_index=row_index, + ) + + expected_df.loc[row_index] = expected_df_row + + assert expected_df.equals(df) + + +@dataclass +class _Paragraph: + text: str + tabstop_positions: List[int] + left_indent_position: int + is_bold: bool = False + + +class TestWriteFunctions: + + # these left indent and tabstop positions are equal to + # the left indent and tabstop positions of the indicator paragraph + middle_cell_left_indent_position_of_indicator_paragraph = 36830 + middle_cell_tabstop_positions_of_indicator_paragraph = [436245, 1962785, 2578735, 3192780] + + edifact_struktur_cell_left_indent_position_of_indicator_paragraph = 364490 + edifact_struktur_cell_left_indent_position_of_segmentgroup_cells = 36830 + edifact_struktur_cell_tabstop_positions = [364490, 692150] + + def _prepare_docx_table_row(self, row_cells, expected_df_rows): + # create table test cell + # it contains an empty paragraph by default + test_document = docx.Document() + test_table = test_document.add_table(rows=1, cols=3) + + self.edifact_struktur_cell = test_table.add_row().cells[0] + self.middle_cell = test_table.add_row().cells[1] + self.bedingung_cell = test_table.add_row().cells[2] + + # prepare edifact struktur cell + current_paragraph = self.edifact_struktur_cell.paragraphs[0] + + current_paragraph.text = row_cells["edifact_struktur_cell"][0].text + current_paragraph.paragraph_format.left_indent = row_cells["edifact_struktur_cell"][0].left_indent_position + + if row_cells["edifact_struktur_cell"][0].tabstop_positions is not None: + for tabstop_position in row_cells["edifact_struktur_cell"][0].tabstop_positions: + self.edifact_struktur_cell.paragraphs[0].paragraph_format.tab_stops.add_tab_stop(tabstop_position) + + for _paragraph, i in zip( + row_cells["edifact_struktur_cell"][1:], range(1, len(row_cells["edifact_struktur_cell"])) + ): + self.edifact_struktur_cell.add_paragraph() + current_paragraph = self.edifact_struktur_cell.paragraphs[i] + current_paragraph.text = _paragraph.text + + current_paragraph.paragraph_format.left_indent = _paragraph.left_indent_position + current_tab_stops = current_paragraph.paragraph_format.tab_stops + + if _paragraph.tabstop_positions is not None: + for tabstop_position in _paragraph.tabstop_positions: + current_tab_stops.add_tab_stop(tabstop_position) + + # prepare middle cell + current_paragraph = self.middle_cell.paragraphs[0] + + current_paragraph.text = row_cells["middle_cell"][0].text + current_paragraph.runs[0].bold = row_cells["middle_cell"][0].is_bold + current_paragraph.paragraph_format.left_indent = row_cells["middle_cell"][0].left_indent_position + + if row_cells["middle_cell"][0].tabstop_positions is not None: + for tabstop_position in row_cells["middle_cell"][0].tabstop_positions: + self.middle_cell.paragraphs[0].paragraph_format.tab_stops.add_tab_stop(tabstop_position) + + for _paragraph, i in zip(row_cells["middle_cell"][1:], range(1, len(row_cells["middle_cell"]))): + self.middle_cell.add_paragraph() + current_paragraph = self.middle_cell.paragraphs[i] + current_paragraph.text = _paragraph.text + current_paragraph.runs[0].bold = row_cells["middle_cell"][i].is_bold + + current_paragraph.paragraph_format.left_indent = _paragraph.left_indent_position + current_tab_stops = current_paragraph.paragraph_format.tab_stops + + 
if _paragraph.tabstop_positions is not None: + for tabstop_position in _paragraph.tabstop_positions: + current_tab_stops.add_tab_stop(tabstop_position) + + # prepare bedingung cell + current_paragraph = self.bedingung_cell.paragraphs[0] + current_paragraph.text = row_cells["bedingung_cell"] + + # Initialize two dataframes ... + df = pd.DataFrame(columns=expected_df_rows[0].keys(), dtype="str") + expected_df = pd.DataFrame(columns=expected_df_rows[0].keys(), dtype="str") + row_index = 0 + # ... with a row full of empty strings + initial_dataframe_row = (len(df.columns)) * [""] + df.loc[row_index] = initial_dataframe_row + expected_df.loc[row_index] = initial_dataframe_row + + return row_index, df, expected_df + + @pytest.mark.parametrize( + "row_cells, expected_df_rows", + [ + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="Nachrichten-Kopfsegment", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [_Paragraph(text="", tabstop_positions=None, left_indent_position=None)], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "Nachrichten-Kopfsegment", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + } + ], + id="Segmentname: Nachrichten-Kopfsegment", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="Ende zum", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ), + _Paragraph( + text="(nächstmöglichem Termin)", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ), + ], + "middle_cell": [_Paragraph(text="", tabstop_positions=None, left_indent_position=None)], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "Ende zum (nächstmöglichem Termin)", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "", + "99999": "", + "Bedingung": "", + } + ], + id="Ende zum ...", + ), + ], + ) + def test_write_segment_name_to_dataframe( + self, + row_cells, + expected_df_rows, + ): + row_index, df, expected_df = self._prepare_docx_table_row(row_cells, expected_df_rows) + + write_segment_name_to_dataframe( + dataframe=df, + row_index=row_index, + edifact_struktur_cell=self.edifact_struktur_cell, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + middle_cell=self.middle_cell, + middle_cell_left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + bedingung_cell=self.bedingung_cell, + ) + + expected_df.loc[row_index] = expected_df_rows[0] + + assert expected_df.equals(df) + + @pytest.mark.parametrize( + "row_cells, expected_df_rows", + [ + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG2", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss\tMuss\tMuss", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[1:], + left_indent_position=None, + ) + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "SG2", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "Muss", + "88888": "Muss", + 
"99999": "Muss", + "Bedingung": "", + } + ], + id="SG2", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG6", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss\tMuss", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[2:], + left_indent_position=None, + ) + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "SG6", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "Muss", + "99999": "Muss", + "Bedingung": "", + } + ], + id="SG6 Referenz Vorgangsnummer", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG8", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss [138]\tMuss", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[1:3], + left_indent_position=None, + ) + ], + "bedingung_cell": """[138] Wenn SG5 +LOC+172 (Meldepunkt) +nicht vorhanden""", + }, + [ + { + "Segment Gruppe": "SG8", + "Segment": "", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "Muss [138]", + "88888": "Muss", + "99999": "", + "Bedingung": "[138] Wenn SG5 LOC+172 (Meldepunkt) nicht vorhanden", + } + ], + id="SG8", + ), + ], + ) + def test_write_segmentgruppe_to_dataframe( + self, + row_cells, + expected_df_rows, + ): + row_index, df, expected_df = self._prepare_docx_table_row(row_cells, expected_df_rows) + + write_segmentgruppe_to_dataframe( + dataframe=df, + row_index=row_index, + edifact_struktur_cell=self.edifact_struktur_cell, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + middle_cell=self.middle_cell, + middle_cell_left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + bedingung_cell=self.bedingung_cell, + ) + + expected_df.loc[row_index] = expected_df_rows[0] + + assert expected_df.equals(df) + + @pytest.mark.parametrize( + "row_cells, expected_df_rows", + [ + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="UNH", + tabstop_positions=None, + left_indent_position=edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss\tMuss\tMuss", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[1:], + left_indent_position=None, + ) + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "", + "Segment": "UNH", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "Muss", + "88888": "Muss", + "99999": "Muss", + "Bedingung": "", + } + ], + id="UNH", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG2\tNAD", + tabstop_positions=edifact_struktur_cell_tabstop_positions[:1], + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss\tMuss\tMuss", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[1:], + left_indent_position=None, + ) + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "Muss", + "88888": "Muss", + 
"99999": "Muss", + "Bedingung": "", + } + ], + id="Segmentgroup, Segment and Bedingung", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG4\tSTS", + tabstop_positions=edifact_struktur_cell_tabstop_positions[:1], + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="\tMuss [249]\tMuss [249]", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[2:], + left_indent_position=None, + ) + ], + "bedingung_cell": """[249] Innerhalb eines +SG4 IDE müssen alle +DE1131 der SG4 +STS+E01 den +identischen Wert +enthalten""", + }, + [ + { + "Segment Gruppe": "SG4", + "Segment": "STS", + "Datenelement": "", + "Codes und Qualifier": "", + "Beschreibung": "", + "77777": "", + "88888": "Muss [249]", + "99999": "Muss [249]", + "Bedingung": "[249] Innerhalb eines SG4 IDE müssen alle DE1131 der SG4 STS+E01 den identischen Wert enthalten", + } + ], + id="Segmentgroup and Segment", + ), + ], + ) + def test_write_segment_to_dataframe( + self, + row_cells, + expected_df_rows, + ): + row_index, df, expected_df = self._prepare_docx_table_row(row_cells, expected_df_rows) + + write_segment_to_dataframe( + dataframe=df, + row_index=row_index, + edifact_struktur_cell=self.edifact_struktur_cell, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + middle_cell=self.middle_cell, + middle_cell_left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + bedingung_cell=self.bedingung_cell, + ) + + expected_df.loc[row_index] = expected_df_rows[0] + + assert expected_df.equals(df) + + @pytest.mark.parametrize( + "row_cells, expected_df_rows", + [ + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="UNH\t0062", + tabstop_positions=edifact_struktur_cell_tabstop_positions[1:], + left_indent_position=edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + ) + ], + "middle_cell": [ + _Paragraph( + text="Nachrichten-Referenznummer\tX\tX\tX", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[1:], + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ) + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "", + "Segment": "UNH", + "Datenelement": "0062", + "Codes und Qualifier": "Nachrichten-Referenznummer", + "Beschreibung": "", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + } + ], + id="UNH\t0062", + ), + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="UNH\t0065", + tabstop_positions=edifact_struktur_cell_tabstop_positions[:1], + left_indent_position=edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + ) + ], + "middle_cell": [ + _Paragraph( + text="UTILM\tNetzanschluss-\tX\tX\tX", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph, + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ), + _Paragraph( + text="D\tStammdaten", + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph[:1], + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ), + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "", + "Segment": "UNH", + "Datenelement": "0065", + "Codes und Qualifier": "UTILMD", + "Beschreibung": "Netzanschluss-Stammdaten", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", 
+ } + ], + id="UNH\t0065", + ), + ], + ) + def test_single_line_write_dataelement_to_dataframe( + self, + row_cells, + expected_df_rows, + ): + + row_index, df, expected_df = self._prepare_docx_table_row(row_cells, expected_df_rows) + + write_dataelement_to_dataframe( + dataframe=df, + row_index=row_index, + edifact_struktur_cell=self.edifact_struktur_cell, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + middle_cell=self.middle_cell, + middle_cell_left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + bedingung_cell=self.bedingung_cell, + ) + + expected_df.loc[row_index] = expected_df_rows[0] + + assert expected_df.equals(df) + + @pytest.mark.parametrize( + "row_cells, expected_df_rows", + [ + pytest.param( + { + "edifact_struktur_cell": [ + _Paragraph( + text="SG2\tNAD\t3055", + tabstop_positions=edifact_struktur_cell_tabstop_positions, + left_indent_position=edifact_struktur_cell_left_indent_position_of_segmentgroup_cells, + ) + ], + "middle_cell": [ + _Paragraph( + text="9\tGS1\tX\tX\tX", + is_bold=True, + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph, + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ), + _Paragraph( + text="293\tDE, BDEW\tX\tX\tX", + is_bold=True, + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph, + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ), + _Paragraph( + text="(Bundesverband der", + is_bold=False, + tabstop_positions=None, + left_indent_position=middle_cell_tabstop_positions_of_indicator_paragraph[0], + ), + _Paragraph( + text="Energie- und", + is_bold=False, + tabstop_positions=None, + left_indent_position=middle_cell_tabstop_positions_of_indicator_paragraph[0], + ), + _Paragraph( + text="Wasserwirtschaft e.V.)", + is_bold=False, + tabstop_positions=None, + left_indent_position=middle_cell_tabstop_positions_of_indicator_paragraph[0], + ), + _Paragraph( + text="332\tDE, DVGW Service &\tX\tX\tX", + is_bold=True, + tabstop_positions=middle_cell_tabstop_positions_of_indicator_paragraph, + left_indent_position=middle_cell_left_indent_position_of_indicator_paragraph, + ), + _Paragraph( + text="Consult GmbH", + is_bold=False, + tabstop_positions=None, + left_indent_position=middle_cell_tabstop_positions_of_indicator_paragraph[0], + ), + ], + "bedingung_cell": "", + }, + [ + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "3055", + "Codes und Qualifier": "9", + "Beschreibung": "GS1", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + }, + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "3055", + "Codes und Qualifier": "293", + "Beschreibung": "DE, BDEW (Bundesverband der Energie- und Wasserwirtschaft e.V.)", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + }, + { + "Segment Gruppe": "SG2", + "Segment": "NAD", + "Datenelement": "3055", + "Codes und Qualifier": "332", + "Beschreibung": "DE, DVGW Service & Consult GmbH", + "77777": "X", + "88888": "X", + "99999": "X", + "Bedingung": "", + }, + ], + id="SG2\tNAD\t3055", + ), + ], + ) + def test_multi_line_write_dataelement_to_dataframe( + self, + row_cells, + expected_df_rows, + ): + row_index, df, expected_df = self._prepare_docx_table_row(row_cells, expected_df_rows) + + write_dataelement_to_dataframe( + dataframe=df, + row_index=row_index, + 
edifact_struktur_cell=self.edifact_struktur_cell, + edifact_struktur_cell_left_indent_position=self.edifact_struktur_cell_left_indent_position_of_indicator_paragraph, + middle_cell=self.middle_cell, + middle_cell_left_indent_position=self.middle_cell_left_indent_position_of_indicator_paragraph, + tabstop_positions=self.middle_cell_tabstop_positions_of_indicator_paragraph, + bedingung_cell=self.bedingung_cell, + ) + + for expected_df_row in expected_df_rows: + + expected_df.loc[row_index] = expected_df_row + row_index += 1 + + assert expected_df.equals(df)
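For readers skimming the patch, the flow these new unit tests pin down can be condensed into a minimal sketch. It is not part of the patch itself and only reuses calls that already appear in the tests above; the indent value 364490 and the Pruefidentifikator columns 77777/88888/99999 are the fixture values from the tests, not values taken from a real AHB document.

```python
import docx
import pandas as pd
from docx.shared import RGBColor

from ahbextractor.helper.check_row_type import RowType, define_row_type

# Build a one-cell docx table and mimic a segment row ("UNH"), exactly as the tests do.
document = docx.Document()
cell = document.add_table(rows=1, cols=1).add_row().cells[0]
cell.text = "UNH"
cell.paragraphs[0].paragraph_format.left_indent = 364490  # segment indent used as a test fixture
cell.paragraphs[0].runs[0].font.color.rgb = RGBColor(0, 0, 0)

# define_row_type classifies the row so that the matching write_*_to_dataframe helper can be called.
assert define_row_type(edifact_struktur_cell=cell, left_indent_position=364490) is RowType.SEGMENT

# The write_* helpers then fill rows of a dataframe with this shape
# (the numeric columns stand for the Pruefidentifikatoren of the AHB).
columns = [
    "Segment Gruppe", "Segment", "Datenelement", "Codes und Qualifier",
    "Beschreibung", "77777", "88888", "99999", "Bedingung",
]
df = pd.DataFrame(columns=columns, dtype="str")
df.loc[0] = [""] * len(df.columns)
```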