-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Some tools for working with spreadsheets (C4-1088) #275
Changes from 6 commits
4c84f0b
7b73a67
3d4573f
f4e5cfa
e9d2465
6e9060f
df12c91
6a39c8a
eedb5c6
a6b68fe
682c95a
3a103ee
56f702a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
import copy | ||
|
||
from openpyxl import load_workbook | ||
from openpyxl.worksheet.worksheet import Worksheet | ||
from openpyxl.workbook.workbook import Workbook | ||
from typing import Any, Dict, List, Optional, Union | ||
|
||
|
||
class WorkbookManager: | ||
|
||
@classmethod | ||
def load_workbook(cls, filename: str): | ||
wb = cls(filename) | ||
return wb.load_content() | ||
|
||
def __init__(self, filename: str): | ||
self.filename: str = filename | ||
self.workbook: Optional[Workbook] = None | ||
self.headers_by_sheetname: Dict[List[str]] = {} | ||
self.content_by_sheetname: Dict[List[Any]] = {} | ||
|
||
def sheet_headers(self, sheet: Worksheet) -> List[str]: | ||
return self.headers_by_sheetname[sheet.title] | ||
|
||
def sheet_content(self, sheet: Worksheet) -> List[Any]: | ||
return self.content_by_sheetname[sheet.title] | ||
|
||
@classmethod | ||
def all_rows(cls, sheet: Worksheet): | ||
row_max = sheet.max_row | ||
for row in range(2, row_max + 1): | ||
yield row | ||
|
||
@classmethod | ||
def all_cols(cls, sheet: Worksheet): | ||
col_max = sheet.max_column | ||
for col in range(1, col_max + 1): | ||
yield col | ||
|
||
def load_headers(self, sheet: Worksheet): | ||
headers: List[str] = [str(sheet.cell(row=1, column=col).value) | ||
for col in self.all_cols(sheet)] | ||
self.headers_by_sheetname[sheet.title] = headers | ||
|
||
def load_content(self): | ||
workbook: Workbook = load_workbook(self.filename) | ||
self.workbook = workbook | ||
for sheetname in workbook.sheetnames: | ||
sheet: Worksheet = workbook[sheetname] | ||
self.load_headers(sheet) | ||
content = [] | ||
for row in self.all_rows(sheet): | ||
row_dict = self.load_row(sheet=sheet, row=row) | ||
content.append(row_dict) | ||
self.content_by_sheetname[sheetname] = content | ||
return self.content_by_sheetname | ||
|
||
def load_row(self, *, sheet: Worksheet, row: int): | ||
headers = self.sheet_headers(sheet) | ||
row_dict: Dict[str, Any] = {headers[col-1]: sheet.cell(row=row, column=col).value | ||
for col in self.all_cols(sheet)} | ||
return row_dict | ||
|
||
|
||
class ItemTools: | ||
|
||
@classmethod | ||
def compute_patch_prototype(cls, parsed_headers): | ||
prototype = {} | ||
for parsed_header in parsed_headers: | ||
parsed_header0 = parsed_header[0] | ||
if isinstance(parsed_header0, int): | ||
raise ValueError(f"A header cannot begin with a numeric ref: {parsed_header0}") | ||
cls.assure_patch_prototype_shape(parent=prototype, keys=parsed_header) | ||
return prototype | ||
|
||
@classmethod | ||
def assure_patch_prototype_shape(cls, *, parent: Union[Dict, List], keys: List[Union[int, str]]): | ||
[key0, *more_keys] = keys | ||
key1 = more_keys[0] if more_keys else None | ||
if isinstance(key1, int): | ||
placeholder = [] | ||
elif isinstance(key1, str): | ||
placeholder = {} | ||
else: | ||
placeholder = None | ||
if isinstance(key0, int): | ||
n = len(parent) | ||
if key0 == n: | ||
parent.append(placeholder) | ||
elif key0 > n: | ||
raise Exception("Numeric items must occur sequentially.") | ||
Comment on lines
+81
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually,
Comment on lines
+83
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When parsing these spreadsheets, we generally won't want to raise exceptions outright that won't be caught quickly because we want to accumulate all errors across the spreadsheet before reporting them back to the user for fixing. This could certainly be caught downstream, but may be worth considering here from the get-go. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, there might be some finessing to do on this. I had certainly been trying to minimize errors because indeed raising errors at all means you only get one bit of feedback instead of many and there's no obvious handler anyway, but some of these things I didn't work all the way through and figured I can come back to. |
||
elif isinstance(key0, str): | ||
if key0 not in parent: | ||
parent[key0] = placeholder | ||
if key1 is not None: | ||
cls.assure_patch_prototype_shape(parent=parent[key0], keys=more_keys) | ||
return parent | ||
|
||
@classmethod | ||
def parse_sheet_headers(cls, headers): | ||
return [cls.parse_sheet_header(header) | ||
for header in headers] | ||
|
||
@classmethod | ||
def parse_sheet_header(cls, header) -> List[Union[int, str]]: | ||
result = [] | ||
token = "" | ||
for i in range(len(header)): | ||
ch = header[i] | ||
if ch == '.' or ch == '#': | ||
if token: | ||
result.append(int(token) if token.isdigit() else token) | ||
token = "" | ||
else: | ||
token += ch | ||
if token: | ||
result.append(int(token) if token.isdigit() else token) | ||
return result | ||
|
||
@classmethod | ||
def set_path_value(cls, datum, path, value, force=False): | ||
if (value is None or value == '') and not force: | ||
return | ||
[key, *more_path] = path | ||
if not more_path: | ||
datum[key] = value | ||
else: | ||
cls.set_path_value(datum[key], more_path, value) | ||
|
||
@classmethod | ||
def parse_value(cls, value): | ||
if isinstance(value, str): | ||
lvalue = value.lower() | ||
# TODO: We could consult a schema to make this less heuristic, but this may do for now | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the purpose of this module is parsing spreadsheets purely for generating items to validate against schema, I think the schema parsing is required here. Otherwise, we will likely be casting to one type here only to check the schema later and cast back, for example if the field is a string per the schema and alphanumeric (including only numeric) inputs are accepted. If we want this module for parsing spreadsheets more generically, then no issues with this at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was just talking to Will about that today. It's not just validating. I think of it (well, Excel, really, or some other spreadsheet editor like OpenOffice or LibreOffice) as a kind of user interface layer that accepts more flexible syntax and offers more interesting editing capabilities. Your original ask was to tolerate differences in case, etc. That can't be done without schemas, I agree. That part is coming downstream. Stay tuned. Just trying to keep a separation of concerns. |
||
if lvalue == 'true': | ||
return True | ||
elif lvalue == 'false': | ||
return False | ||
elif lvalue == 'null' or lvalue == '': | ||
return None | ||
elif '|' in value: | ||
return [cls.parse_value(subvalue) for subvalue in value.split('|')] | ||
else: | ||
ch0 = value[0] | ||
if ch0 == '+' or ch0 == '-' or ch0.isdigit(): | ||
try: | ||
return int(value) | ||
except Exception: | ||
pass | ||
try: | ||
return float(value) | ||
except Exception: | ||
pass | ||
return value | ||
else: # probably a number | ||
return value | ||
|
||
|
||
class ItemManager(ItemTools, WorkbookManager): | ||
|
||
def __init__(self, filename: str): | ||
super().__init__(filename=filename) | ||
self.patch_prototypes_by_sheetname: Dict[Dict] = {} | ||
self.parsed_headers_by_sheetname: Dict[List[List[Union[int, str]]]] = {} | ||
|
||
def sheet_patch_prototype(self, sheet: Worksheet) -> Dict: | ||
return self.patch_prototypes_by_sheetname[sheet.title] | ||
|
||
def sheet_parsed_headers(self, sheet: Worksheet) -> List[List[Union[int, str]]]: | ||
return self.parsed_headers_by_sheetname[sheet.title] | ||
|
||
def load_headers(self, sheet: Worksheet): | ||
super().load_headers(sheet) | ||
self.compile_sheet_headers(sheet) | ||
|
||
def compile_sheet_headers(self, sheet: Worksheet): | ||
headers = self.headers_by_sheetname[sheet.title] | ||
parsed_headers = self.parse_sheet_headers(headers) | ||
self.parsed_headers_by_sheetname[sheet.title] = parsed_headers | ||
prototype = self.compute_patch_prototype(parsed_headers) | ||
self.patch_prototypes_by_sheetname[sheet.title] = prototype | ||
|
||
def load_row(self, *, sheet: Worksheet, row: int): | ||
parsed_headers = self.sheet_parsed_headers(sheet) | ||
patch_item = copy.deepcopy(self.sheet_patch_prototype(sheet)) | ||
for col in self.all_cols(sheet): | ||
value = sheet.cell(row=row, column=col).value | ||
parsed_value = self.parse_value(value) | ||
self.set_path_value(patch_item, parsed_headers[col - 1], parsed_value) | ||
return patch_item |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
name,age,mother.name,mother.age,father.name,father.age,friends#0.name,friends#0.age,friends#1.name,friends#1.age | ||
bill,23,mary,58,fred,63,sam,22,arthur,19 | ||
joe,9,estrella,35,anthony,34,anders,9,, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For dictionary-like type annotations, consider annotating the key type as well as the value type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that got fixed later. I don't even think the way I annotated it was legit. I was just asleep while typing.
But this is a really crappy way to notate dictionaries. Newer python (I think in 3.8) has better ways to declare this. Dictionaries are often not uniform in output type and this notation has trouble handling that.