diff --git a/dcicutils/sheet_utils.py b/dcicutils/sheet_utils.py index 9633999d8..db310aeb2 100644 --- a/dcicutils/sheet_utils.py +++ b/dcicutils/sheet_utils.py @@ -1,11 +1,19 @@ import copy +from dcicutils.common import AnyJsonData from openpyxl import load_workbook from openpyxl.worksheet.worksheet import Worksheet from openpyxl.workbook.workbook import Workbook from typing import Any, Dict, List, Optional, Union +Header = str +Headers = List[str] +ParsedHeader = List[Union[str, int]] +ParsedHeaders = List[ParsedHeader] +SheetCellValue = Union[int, float, str] + + class WorkbookManager: @classmethod @@ -16,30 +24,30 @@ def load_workbook(cls, filename: str): def __init__(self, filename: str): self.filename: str = filename self.workbook: Optional[Workbook] = None - self.headers_by_sheetname: Dict[List[str]] = {} - self.content_by_sheetname: Dict[List[Any]] = {} + self.headers_by_sheetname: Dict[str, List[str]] = {} + self.content_by_sheetname: Dict[str, List[Any]] = {} - def sheet_headers(self, sheet: Worksheet) -> List[str]: - return self.headers_by_sheetname[sheet.title] + def sheet_headers(self, sheetname: str) -> List[str]: + return self.headers_by_sheetname[sheetname] - def sheet_content(self, sheet: Worksheet) -> List[Any]: - return self.content_by_sheetname[sheet.title] + def sheet_content(self, sheetname: str) -> List[Any]: + return self.content_by_sheetname[sheetname] @classmethod - def all_rows(cls, sheet: Worksheet): + def _all_rows(cls, sheet: Worksheet): row_max = sheet.max_row for row in range(2, row_max + 1): yield row @classmethod - def all_cols(cls, sheet: Worksheet): + def _all_cols(cls, sheet: Worksheet): col_max = sheet.max_column for col in range(1, col_max + 1): yield col - def load_headers(self, sheet: Worksheet): + def _load_headers(self, sheet: Worksheet): headers: List[str] = [str(sheet.cell(row=1, column=col).value) - for col in self.all_cols(sheet)] + for col in self._all_cols(sheet)] self.headers_by_sheetname[sheet.title] = headers def load_content(self): @@ -47,25 +55,44 @@ def load_content(self): self.workbook = workbook for sheetname in workbook.sheetnames: sheet: Worksheet = workbook[sheetname] - self.load_headers(sheet) + self._load_headers(sheet) content = [] - for row in self.all_rows(sheet): - row_dict = self.load_row(sheet=sheet, row=row) + for row in self._all_rows(sheet): + row_dict = self._load_row(sheet=sheet, row=row) content.append(row_dict) self.content_by_sheetname[sheetname] = content return self.content_by_sheetname - def load_row(self, *, sheet: Worksheet, row: int): - headers = self.sheet_headers(sheet) + def _load_row(self, *, sheet: Worksheet, row: int): + headers = self.sheet_headers(sheet.title) row_dict: Dict[str, Any] = {headers[col-1]: sheet.cell(row=row, column=col).value - for col in self.all_cols(sheet)} + for col in self._all_cols(sheet)} return row_dict class ItemTools: + """ + Implements operations on table-related data without pre-supposing the specific representation of the table. + It is assumed this can be used for data that was obtained from .json, .csv, .tsv, and .xlsx files because + it does not presuppose the source of the data nor where it will be written to. + + For the purpose of this class: + + * a 'header' is a string representing the top of a column. + + * a 'parsed header' is a list of strings and/or ints, after splitting at uses of '#' or '.', so that + "a.b.c" is represented as ["a", "b", "c"], and "x.y#0" is represented as ["x", "y", 0], and representing + each numeric token as an int instead of a string. + + * a 'headers' object is just a list of strings, each of which is a 'header'. + + * a 'parsed headers' object is a non-empty list of lists, each of which is a 'parsed header'. + e..g., the headers ["a.b.c", "x.y#0"] is represented as parsed hearders [["a", "b", "c"], ["x", "y", 0]]. + + """ @classmethod - def compute_patch_prototype(cls, parsed_headers): + def compute_patch_prototype(cls, parsed_headers: ParsedHeaders): prototype = {} for parsed_header in parsed_headers: parsed_header0 = parsed_header[0] @@ -75,7 +102,7 @@ def compute_patch_prototype(cls, parsed_headers): return prototype @classmethod - def assure_patch_prototype_shape(cls, *, parent: Union[Dict, List], keys: List[Union[int, str]]): + def assure_patch_prototype_shape(cls, *, parent: Union[Dict, List], keys: ParsedHeader): [key0, *more_keys] = keys key1 = more_keys[0] if more_keys else None if isinstance(key1, int): @@ -98,12 +125,12 @@ def assure_patch_prototype_shape(cls, *, parent: Union[Dict, List], keys: List[U return parent @classmethod - def parse_sheet_headers(cls, headers): + def parse_sheet_headers(cls, headers: Headers): return [cls.parse_sheet_header(header) for header in headers] @classmethod - def parse_sheet_header(cls, header) -> List[Union[int, str]]: + def parse_sheet_header(cls, header: Header) -> ParsedHeader: result = [] token = "" for i in range(len(header)): @@ -119,7 +146,7 @@ def parse_sheet_header(cls, header) -> List[Union[int, str]]: return result @classmethod - def set_path_value(cls, datum, path, value, force=False): + def set_path_value(cls, datum: Union[List, Dict], path: ParsedHeader, value: Any, force: bool = False): if (value is None or value == '') and not force: return [key, *more_path] = path @@ -129,7 +156,7 @@ def set_path_value(cls, datum, path, value, force=False): cls.set_path_value(datum[key], more_path, value) @classmethod - def parse_value(cls, value): + def parse_value(cls, value: SheetCellValue) -> AnyJsonData: if isinstance(value, str): lvalue = value.lower() # TODO: We could consult a schema to make this less heuristic, but this may do for now @@ -153,7 +180,7 @@ def parse_value(cls, value): except Exception: pass return value - else: # probably a number + else: # presumably a number (int or float) return value @@ -161,30 +188,30 @@ class ItemManager(ItemTools, WorkbookManager): def __init__(self, filename: str): super().__init__(filename=filename) - self.patch_prototypes_by_sheetname: Dict[Dict] = {} - self.parsed_headers_by_sheetname: Dict[List[List[Union[int, str]]]] = {} + self.patch_prototypes_by_sheetname: Dict[str, Dict] = {} + self.parsed_headers_by_sheetname: Dict[str, List[List[Union[int, str]]]] = {} - def sheet_patch_prototype(self, sheet: Worksheet) -> Dict: - return self.patch_prototypes_by_sheetname[sheet.title] + def sheet_patch_prototype(self, sheetname: str) -> Dict: + return self.patch_prototypes_by_sheetname[sheetname] - def sheet_parsed_headers(self, sheet: Worksheet) -> List[List[Union[int, str]]]: - return self.parsed_headers_by_sheetname[sheet.title] + def sheet_parsed_headers(self, sheetname: str) -> List[List[Union[int, str]]]: + return self.parsed_headers_by_sheetname[sheetname] - def load_headers(self, sheet: Worksheet): - super().load_headers(sheet) - self.compile_sheet_headers(sheet) + def _load_headers(self, sheet: Worksheet): + super()._load_headers(sheet) + self._compile_sheet_headers(sheet.title) - def compile_sheet_headers(self, sheet: Worksheet): - headers = self.headers_by_sheetname[sheet.title] + def _compile_sheet_headers(self, sheetname: str): + headers = self.headers_by_sheetname[sheetname] parsed_headers = self.parse_sheet_headers(headers) - self.parsed_headers_by_sheetname[sheet.title] = parsed_headers + self.parsed_headers_by_sheetname[sheetname] = parsed_headers prototype = self.compute_patch_prototype(parsed_headers) - self.patch_prototypes_by_sheetname[sheet.title] = prototype + self.patch_prototypes_by_sheetname[sheetname] = prototype - def load_row(self, *, sheet: Worksheet, row: int): - parsed_headers = self.sheet_parsed_headers(sheet) - patch_item = copy.deepcopy(self.sheet_patch_prototype(sheet)) - for col in self.all_cols(sheet): + def _load_row(self, *, sheet: Worksheet, row: int): + parsed_headers = self.sheet_parsed_headers(sheet.title) + patch_item = copy.deepcopy(self.sheet_patch_prototype(sheet.title)) + for col in self._all_cols(sheet): value = sheet.cell(row=row, column=col).value parsed_value = self.parse_value(value) self.set_path_value(patch_item, parsed_headers[col - 1], parsed_value)