-
Notifications
You must be signed in to change notification settings - Fork 2
/
feed_ursus.py
executable file
·407 lines (326 loc) · 14.6 KB
/
feed_ursus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Convert UCLA Library CSV files for Ursus, our Blacklight installation."""
import csv
from collections import defaultdict
import json
import os
import re
import typing
import yaml
import click
from pysolr import Solr # type: ignore
import requests
import rich.progress
import mapper
import year_parser
import date_parser
# Custom Types
DLCSRecord = typing.Dict[str, typing.Any]
UrsusRecord = typing.Dict[str, typing.Any]
@click.command()
@click.argument("filename")
@click.option(
    "--solr_url",
    default=None,
    help="URL of a solr instance, e.g. http://localhost:6983/solr/californica",
)
def load_csv(filename: str, solr_url: typing.Optional[str]):
    """Load data from a csv.

    Maps each non-child row of the CSV and either submits the batch to Solr
    (when --solr_url is given) or prints the mapped records as JSON to stdout.

    Args:
        filename: A CSV file.
        solr_url: API endpoint for a solr instance.
    """
    solr_client = Solr(solr_url, always_commit=True) if solr_url else Solr("")

    # Close the CSV file deterministically instead of relying on GC.
    with open(filename, "r") as csv_file:
        csv_data = {row["Item ARK"]: row for row in csv.DictReader(csv_file)}

    config = {
        "collection_names": {
            row["Item ARK"]: row["Title"]
            for row in csv_data.values()
            if row["Object Type"] == "Collection"
        },
        "controlled_fields": load_field_config("./fields"),
        "child_works": collate_child_works(csv_data),
    }

    mapped_records = []
    for row in rich.progress.track(
        csv_data.values(), description=f"Importing {filename}..."
    ):
        # ChildWork/Page rows are folded into their parent works
        # (thumbnails), not indexed as standalone records.
        if row["Object Type"] not in ("ChildWork", "Page"):
            mapped_records.append(map_record(row, solr_client, config=config))

    if solr_url:
        solr_client.add(mapped_records)
    else:
        print(json.dumps(mapped_records))
def collate_child_works(csv_data: typing.Dict) -> typing.Dict:
    """Group ChildWork/Page rows under the ARK of their parent work.

    Note: the parameter was previously annotated ``csv.DictReader``, but the
    body calls ``.values()`` and the caller passes a dict of rows keyed by ARK.

    Args:
        csv_data: Mapping of "Item ARK" -> CSV row dict.

    Returns:
        A defaultdict mapping each "Parent ARK" to a list of its child rows.
    """
    child_works: typing.DefaultDict[str, typing.List] = defaultdict(list)
    for row in csv_data.values():
        if row["Object Type"] in ("ChildWork", "Page"):
            child_works[row["Parent ARK"]].append(row)
    return child_works
def load_field_config(base_path: str = "./fields") -> typing.Dict:
    """Load configuration of controlled metadata fields.

    Walks base_path, parses every [field].yml file found, and re-indexes each
    file's "terms" list of {id, term} entries into an id -> term lookup dict.

    Args:
        base_path: Path to a directory containing [field].yml files.

    Returns:
        A dict with field configuration, keyed by field name.
    """
    config: typing.Dict = {}
    for dir_path, _, file_names in os.walk(base_path):
        for name in file_names:
            field, _ = os.path.splitext(name)
            with open(os.path.join(dir_path, name), "r") as yml_file:
                config[field] = yaml.safe_load(yml_file)
                # Convert the list of {id, term} entries into a lookup table.
                config[field]["terms"] = {
                    entry["id"]: entry["term"] for entry in config[field]["terms"]
                }
    return config
# pylint: disable=bad-continuation
def map_field_value(
    row: DLCSRecord, field_name: str, config: typing.Dict
) -> typing.Any:
    """Map value from a CSV cell to an object that will be passed to solr.

    Mapping logic is defined by the FIELD_MAPPING dict, defined in mapper.py.
    Keys of FIELD_MAPPING are output field names as used in Ursus. Values can
    vary, and the behavior of map_field_value() will depend on that value.

    If FIELD_MAPPING[field_name] is a string, then it will be interpreted as
    the title of a CSV column to map. The value of that column will be split
    using the MARC delimiter '|~|', and a list of one or more strings will be
    returned (or an empty list, if the CSV column was empty).

    If FIELD_MAPPING[field_name] is a list of strings, then they will all be
    interpreted as CSV column names to be mapped. Each column will be processed
    as above, and the resulting lists will be concatenated.

    Finally, FIELD_MAPPING[field_name] can be a function, most likely defined
    in mapper.py. If this is the case, that function will be called with the
    input row (as a dict) as its only argument. That function should return a
    type that matches the type of the solr field. This is the only way to
    map to types other than lists of strings.

    Args:
        row: An input row containing a DLCS record.
        field_name: The name of the Ursus/Solr field to map.
        config: Config dict; "controlled_fields" drives term translation.

    Returns:
        A value to be submitted to solr. By default this is a list of strings,
        however map_[SOLR_FIELD_NAME] functions can return other types.

    Raises:
        TypeError: If the mapping is neither None, callable, a string, nor a
            collection of column names.
    """
    mapping: mapper.MappigDictValue = mapper.FIELD_MAPPING[field_name]

    if mapping is None:
        return None

    if callable(mapping):
        return mapping(row)

    if isinstance(mapping, str):
        mapping = [mapping]

    if not isinstance(mapping, typing.Collection):
        # Name the offending field so the bad FIELD_MAPPING entry is findable.
        raise TypeError(
            f"FIELD_MAPPING[{field_name!r}] must be iterable, unless it is None, Callable, or a string."
        )

    output: typing.List[str] = []
    for csv_field in mapping:
        input_value = row.get(csv_field)
        if input_value:
            if isinstance(input_value, str):
                # '|~|' is the MARC multi-value delimiter used in the CSVs.
                output.extend(input_value.split("|~|"))
            else:
                output.append(input_value)

    # Translate controlled-vocabulary IDs into human-readable terms,
    # leaving unknown values unchanged.
    bare_field_name = get_bare_field_name(field_name)
    if bare_field_name in config.get("controlled_fields", {}):
        terms = config["controlled_fields"][bare_field_name]["terms"]
        output = [terms.get(value, value) for value in output]

    return [value for value in output if value]  # remove untruthy values like ''
def get_bare_field_name(field_name: str) -> str:
    """Strips the solr suffix and initial 'human_readable_' from a field name."""
    without_suffix = re.sub(r"_[^_]+$", "", field_name)
    return without_suffix.replace("human_readable_", "")
def solr_transformed_dates(solr_client: Solr, parsed_dates: typing.List):
    """Convert each parsed date into the string form pysolr submits to Solr."""
    # pylint: disable=protected-access
    to_solr = solr_client._from_python
    return [to_solr(parsed_date) for parsed_date in parsed_dates]
# pylint: disable=bad-continuation
def map_record(row: DLCSRecord, solr_client: Solr, config: typing.Dict) -> UrsusRecord:  # pylint: disable=too-many-statements
    """Maps a metadata record from CSV to Ursus Solr.

    Args:
        row: A mapping representing the CSV record.
        solr_client: pysolr client, used only to format dates for Solr.
        config: Dict with "collection_names", "controlled_fields", and
            "child_works" entries (see load_csv).

    Returns:
        A mapping representing the record to submit to Solr.
    """
    # Start from the declarative FIELD_MAPPING; everything below post-processes
    # the resulting dict.
    record: UrsusRecord = {
        field_name: map_field_value(row, field_name, config=config)
        for field_name in mapper.FIELD_MAPPING
    }

    # THUMBNAIL: prefer an explicitly mapped URL, then a child row's thumbnail,
    # then one scraped from the IIIF manifest.
    record["thumbnail_url_ss"] = (
        record.get("thumbnail_url_ss")
        or thumbnail_from_child(record, config=config)
        or thumbnail_from_manifest(record)
    )

    # COLLECTION NAME: resolved via the parent ARK when the parent is a
    # Collection row in the same CSV.
    if "Parent ARK" in row and row["Parent ARK"] in config["collection_names"]:
        dlcs_collection_name = config["collection_names"][row["Parent ARK"]]
        record["dlcs_collection_name_tesim"] = [dlcs_collection_name]

    # FIELDS: copy *_tesim (text, stored) values into *_sim (string facet)
    # fields so they are facetable in Blacklight.
    record["uniform_title_sim"] = record.get("uniform_title_tesim")
    record["architect_sim"] = record.get("architect_tesim")
    record["author_sim"] = record.get("author_tesim")
    record["illuminator_sim"] = record.get("illuminator_tesim")
    record["scribe_sim"] = record.get("scribe_tesim")
    record["rubricator_sim"] = record.get("rubricator_tesim")
    record["commentator_sim"] = record.get("commentator_tesim")
    record["translator_sim"] = record.get("translator_tesim")
    record["lyricist_sim"] = record.get("lyricist_tesim")
    record["composer_sim"] = record.get("composer_tesim")
    record["illustrator_sim"] = record.get("illustrator_tesim")
    record["editor_sim"] = record.get("editor_tesim")
    record["calligrapher_sim"] = record.get("calligrapher_tesim")
    record["engraver_sim"] = record.get("engraver_tesim")
    record["printmaker_sim"] = record.get("printmaker_tesim")
    record["human_readable_language_sim"] = record.get("human_readable_language_tesim")
    record["names_sim"] = name_fields(record)
    record["keywords_sim"] = keywords_fields(record)
    record["collection_sim"] = record.get("collection_ssi")
    # explicit
    record["features_sim"] = record.get("features_tesim")
    # incipit
    # inscription
    record["script_sim"] = record.get("script_tesim")
    record["writing_system_sim"] = record.get("writing_system_tesim")
    record["year_isim"] = year_parser.integer_years(record.get("normalized_date_tesim"))
    record["date_dtsim"] = solr_transformed_dates(
        solr_client, (date_parser.get_dates(record.get("normalized_date_tesim")))
    )
    record["place_of_origin_sim"] = record.get("place_of_origin_tesim")
    record["associated_name_sim"] = record.get("associated_name_tesim")
    record["form_sim"] = record.get("form_tesim")
    record["support_sim"] = record.get("support_tesim")
    record["genre_sim"] = record.get("genre_tesim")
    record["subject_sim"] = record.get("subject_tesim")
    record["location_sim"] = record.get("location_tesim")
    record["named_subject_sim"] = record.get("named_subject_tesim")
    record["human_readable_resource_type_sim"] = record.get("resource_type_tesim")
    record["member_of_collections_ssim"] = record.get("dlcs_collection_name_tesim")

    # SINAI INDEX: combined display fields for the Sinai index/item pages.
    record["header_index_tesim"] = header_fields(record)
    record["name_fields_index_tesim"] = name_fields_index(record)

    # SORT FIELDS: first title sorts the record alphabetically.
    titles = record.get("title_tesim")
    if isinstance(titles, typing.Sequence) and len(titles) >= 1:
        record["sort_title_ssort"] = titles[0]

    # used a solr copyfield for shelfmark sorting
    # shelfmark = record.get("shelfmark_ssi")
    # print(shelfmarks)
    # if isinstance(shelfmarks, typing.Sequence) and len(shelfmarks) >= 1:
    # print(shelfmarks[0])
    # record["shelfmark_aplha_numeric_ssort"] = shelfmarks[0]
    # -----------------------------------------------------------------------

    # Earliest year and first parsed date drive chronological sorting.
    years = record.get("year_isim")
    if isinstance(years, typing.Sequence) and len(years) >= 1:
        record["sort_year_isi"] = min(years)
    dates = record.get("date_dtsim")
    if isinstance(dates, typing.Sequence) and len(dates) >= 1:
        record["date_dtsort"] = dates[0]
    return record
def name_fields(record):
    """Combine author, scribe, associated-name, and translator fields for the
    names facet.

    Fixes a bug where ``names_sim`` was pre-seeded with ``author_tesim`` and
    then ``author_tesim`` was appended again, duplicating every author.

    Args:
        record: The partially mapped Ursus record (mutated in place).

    Returns:
        The combined list of names, or None if no name field is present.
    """
    names = None
    for key in (
        "author_tesim",
        "scribe_tesim",
        "associated_name_tesim",
        "translator_tesim",
    ):
        values = record.get(key)
        if values is not None:
            names = values if names is None else names + values
    record["names_sim"] = names
    return names
# Sinai Index Page
# record.get falls back to an empty list when a field is absent
# combine fields for the header value
def header_fields(record):
    """Header: shelfmark_ssi: 'Shelfmark' && extent_tesim: 'Format'"""
    return record.get("shelfmark_ssi", []) + record.get("extent_tesim", [])
# Sinai Item Page
# record.get falls back to an empty list when a field is absent
# combine fields for the keywords value
def keywords_fields(record):
    """Keywords: genre_tesim: 'Genre' && features_tesim: 'Features' &&
    place_of_origin_tesim: 'Place of Origin' && support_tesim: 'Support' &&
    form_ssi: 'Form'
    """
    combined = []
    for key in (
        "genre_tesim",
        "features_tesim",
        "place_of_origin_tesim",
        "support_tesim",
        "form_ssi",
    ):
        combined = combined + record.get(key, [])
    record["keywords_tesim"] = combined
    return combined
# TITLE: uniform_title_one | uniform_title_two | descriptive_title_one | descriptive_title_two
# combine fields for the names value in the Name facet & for the index page
# Name: author_tesim && associated_name_tesim && scribe_tesim
def name_fields_index(record):
    """NAME: author_one| author_two | associated_one | associated_two | scribe_one"""
    return (
        record.get("author_tesim", [])
        + record.get("associated_name_tesim", [])
        + record.get("scribe_tesim", [])
    )
def thumbnail_from_child(
    record: UrsusRecord, config: typing.Dict
) -> typing.Optional[str]:
    """Picks a thumbnail by looking for child rows in the CSV.

    Tries the following strategies in order, returning the first that succeeds:
    - Thumbnail of a child record titled "f. 001r"
    - Thumbnail of the first child record
    - None

    Args:
        record: A mapping representing the CSV record.
        config: A config object; its "child_works" maps parent ARK -> child rows.

    Returns:
        A string containing the thumbnail URL, or None.
    """
    if "child_works" not in config:
        return None

    ark = record["ark_ssi"]
    children: list = config["child_works"][ark]

    def sort_key(row: dict) -> str:
        # Prefer records titled like "f. 001r", in alphanumeric order;
        # everything else sorts after them.
        prefix = "a" if row["Title"].startswith("f. ") else "z"
        return prefix + row["Title"]

    children.sort(key=sort_key)
    for row in children:
        # Removed a leftover debug print here: it corrupted the JSON output
        # that load_csv writes to stdout when no solr_url is given.
        thumb = mapper.thumbnail_url(row)
        if thumb:
            return thumb
    return None
def thumbnail_from_manifest(record: UrsusRecord) -> typing.Optional[str]:
    """Picks a thumbnail by downloading the IIIF manifest.

    Best-effort: any failure (missing URL, network error, unexpected manifest
    shape) yields None rather than aborting the import.

    Args:
        record: A mapping representing the CSV record.

    Returns:
        A string containing the thumbnail URL, or None.
    """
    try:
        manifest_url = record.get("iiif_manifest_url_ssi")
        if not isinstance(manifest_url, str):
            return None

        # Timeout so one unresponsive IIIF server cannot stall the whole run.
        response = requests.get(manifest_url, timeout=30)
        manifest = response.json()

        canvases = {
            c["label"]: c["images"][0]["resource"]["service"]["@id"]
            for seq in manifest["sequences"]
            for c in seq["canvases"]
        }

        return (
            canvases.get("f. 001r") or list(canvases.values())[0]
        ) + "/full/!200,200/0/default.jpg"

    # 'except Exception' (not bare 'except:') so KeyboardInterrupt/SystemExit
    # still propagate while the best-effort behavior is preserved.
    except Exception:  # pylint: disable=broad-except
        return None
if __name__ == "__main__":
    # Click parses sys.argv itself, so no arguments are passed explicitly.
    load_csv()  # pylint: disable=no-value-for-parameter