Skip to content

Commit

Permalink
Improve monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
janheinrichmerker committed Nov 20, 2023
1 parent 07c5c12 commit 9cb9559
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 37 deletions.
135 changes: 109 additions & 26 deletions archive_query_log/monitoring/home.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ class Statistics(NamedTuple):
name: str
description: str
total: str
disk_size: str
disk_size: str | None
last_modified: datetime | None


class Progress(NamedTuple):
name: str
input_name: str
output_name: str
description: str
total: int
current: int


DocumentType = Type[BaseDocument]

_statistics_cache: dict[DocumentType, Statistics] = ExpiringDict(
_statistics_cache: dict[
tuple[DocumentType,
tuple[str, ...]], Statistics,
] = ExpiringDict(
max_len=100,
max_age_seconds=30,
)
Expand All @@ -49,15 +53,23 @@ def _get_statistics(
name: str,
description: str,
document: DocumentType,
required_fields: tuple[str, ...] = (),
) -> Statistics:
key = document
key = (document, required_fields)
if key in _statistics_cache:
return _statistics_cache[key]

document.index().refresh(using=config.es.client)
stats = document.index().stats(using=config.es.client)
search = document.search(using=config.es.client)
if len(required_fields) > 0:
query = Exists(field=required_fields[0])
for required_field in required_fields[1:]:
query &= Exists(field=required_field)
search = search.filter(query)
total = search.count()
last_modified_response = (
document.search(using=config.es.client)
search
.query(Exists(field="last_modified"))
.sort("-last_modified")
.extra(size=1)
Expand All @@ -71,35 +83,47 @@ def _get_statistics(
statistics = Statistics(
name=name,
description=description,
total=stats["_all"]["primaries"]["docs"]["count"],
disk_size=_convert_bytes(
stats["_all"]["total"]["store"]["size_in_bytes"]),
total=total,
disk_size=(
_convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"])
if len(required_fields) == 0 else ""
),
last_modified=last_modified,
)
_statistics_cache[key] = statistics
return statistics


_progress_cache: dict[tuple[DocumentType, str], Progress] = ExpiringDict(
_progress_cache: dict[
tuple[DocumentType, tuple[str, ...], str],
Progress,
] = ExpiringDict(
max_len=100,
max_age_seconds=30,
)


def _get_processed_progress(
config: Config,
name: str,
input_name: str,
output_name: str,
description: str,
document: DocumentType,
timestamp_field: str,
required_fields: tuple[str, ...] = (),
) -> Progress:
key = (document, timestamp_field)
key = (document, required_fields, timestamp_field)
if key in _progress_cache:
return _progress_cache[key]

document.index().refresh(using=config.es.client)
search = document.search(using=config.es.client)
total = search.extra(track_total_hits=True).execute().hits.total.value
if len(required_fields) > 0:
query = Exists(field=required_fields[0])
for required_field in required_fields[1:]:
query &= Exists(field=required_field)
search = search.filter(query)
total = search.count()
search_processed = search.filter(
Exists(field="last_modified") &
Exists(field=timestamp_field) &
Expand All @@ -110,7 +134,8 @@ def _get_processed_progress(
)
total_processed = search_processed.count()
progress = Progress(
name=name,
input_name=input_name,
output_name=output_name,
description=description,
total=total,
current=total_processed,
Expand Down Expand Up @@ -156,6 +181,53 @@ def home(config: Config) -> str | Response:
"identified among the captures.",
document=Serp,
),
_get_statistics(
config=config,
name="+ URL query",
description="SERPs for which the query has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_query",),
),
_get_statistics(
config=config,
name="+ URL page",
description="SERPs for which the page has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_page",),
),
_get_statistics(
config=config,
name="+ URL offset",
description="SERPs for which the offset has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_offset",),
),
_get_statistics(
config=config,
name="+ WARC",
description="SERPs for which the WARC has been downloaded.",
document=Serp,
required_fields=("warc_location",),
),
_get_statistics(
config=config,
name="+ WARC query",
description="SERPs for which the query has been parsed "
"from the WARC.",
document=Serp,
required_fields=("warc_query",),
),
_get_statistics(
config=config,
name="+ WARC snippets",
description="SERPs for which the snippets have been parsed "
"from the WARC.",
document=Serp,
required_fields=("warc_snippets",),
),
_get_statistics(
config=config,
name="Results",
Expand Down Expand Up @@ -185,67 +257,78 @@ def home(config: Config) -> str | Response:
progress_list: list[Progress] = [
_get_processed_progress(
config=config,
name="Archives → Sources",
input_name="Archives",
output_name="Sources",
description="Build sources for all archives.",
document=Archive,
timestamp_field="last_built_sources",
),
_get_processed_progress(
config=config,
name="Providers → Sources",
input_name="Providers",
output_name="Sources",
description="Build sources for all search providers.",
document=Provider,
timestamp_field="last_built_sources",
),
_get_processed_progress(
config=config,
name="Sources → Captures",
input_name="Sources",
output_name="Captures",
description="Fetch CDX captures for all domains and "
"prefixes in the sources.",
document=Source,
timestamp_field="last_fetched_captures",
),
_get_processed_progress(
config=config,
name="Captures → SERPs",
input_name="Captures",
output_name="SERPs",
description="Parse queries from capture URLs.",
document=Capture,
timestamp_field="url_query_parser.last_parsed",
),
_get_processed_progress(
config=config,
name="SERPs → SERPs",
input_name="SERPs",
output_name="SERPs",
description="Parse page from SERP URLs.",
document=Serp,
timestamp_field="url_page_parser.last_parsed",
),
_get_processed_progress(
config=config,
name="SERPs → SERPs",
input_name="SERPs",
output_name="SERPs",
description="Parse offset from SERP URLs.",
document=Serp,
timestamp_field="url_offset_parser.last_parsed",
),
_get_processed_progress(
config=config,
name="SERPs → SERPs",
input_name="SERPs",
output_name="SERPs",
description="Download WARCs.",
document=Serp,
timestamp_field="warc_downloader.last_downloaded",
),
_get_processed_progress(
config=config,
name="SERPs → SERPs",
description="Parse query from SERP contents.",
input_name="SERPs",
output_name="SERPs",
description="Parse query from WARC contents.",
document=Serp,
timestamp_field="serp_query_parser.last_parsed",
timestamp_field="warc_query_parser.last_parsed",
required_fields=("warc_location",),
),
_get_processed_progress(
config=config,
name="SERPs → SERPs",
description="Parse snippets from SERP contents.",
input_name="SERPs",
output_name="SERPs",
description="Parse snippets from WARC contents.",
document=Serp,
timestamp_field="serp_snippets_parser.last_parsed",
timestamp_field="warc_snippets_parser.last_parsed",
required_fields=("warc_location",),
),
]

Expand Down
33 changes: 22 additions & 11 deletions archive_query_log/templates/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ <h1>Archive Query Log</h1>
</header>
<main>
<section>
<h2>Stats</h2>
Monitor the crawling and parsing of the Archive Query Log.
Directly go to the detailed <a href="#statistics">statistics</a> or check the <a href="#progress">progress</a>.
</section>
<section id="statistics">
<h2>Statistics</h2>
<figure>
<table>
<thead>
Expand All @@ -64,13 +68,20 @@ <h2>Stats</h2>
{% for statistics in statistics_list %}
<tr>
<td>{{ statistics.name }}</td>
<td>{% if statistics.description %}{{ statistics.description }}{% else %}—{% endif %}</td>
<td style="font-size: smaller">{% if statistics.description %}{{ statistics.description }}{% else %}—{% endif %}</td>
<td style="text-align: right">{{ "{:,.0f}".format(statistics.total) }}</td>
<td style="text-align: right">{{ statistics.disk_size }}</td>
<td style="text-align: right">
{% if statistics.disk_size %}
{{ statistics.disk_size }}
{% else %}
{% endif %}
</td>
<td>
{% if statistics.last_modified %}
<time datetime="{{ statistics.last_modified.isoformat() }}">
{{ statistics.last_modified.strftime("%c") }}
<time datetime="{{ statistics.last_modified.isoformat() }}"
title="{{ statistics.last_modified.strftime('%c (%Z)') }}">
{{ statistics.last_modified.strftime("%Y-%m-%d %H:%M:%S") }}
</time>
{% else %}
Expand All @@ -82,13 +93,14 @@ <h2>Stats</h2>
</table>
</figure>
</section>
<section>
<section id="progress">
<h2>Progress</h2>
<figure>
<table>
<thead>
<tr>
<th>Stage</th>
<th>Input</th>
<th>Output</th>
<th>Description</th>
<th>Unprocessed</th>
<th>Processed</th>
Expand All @@ -98,10 +110,9 @@ <h2>Progress</h2>
<tbody>
{% for progress in progress_list %}
<tr>
<td>{{ progress.name }}</td>
<td>
{% if progress.description %}{{ progress.description }}{% else %}—{% endif %}
</td>
<td>{{ progress.input_name }}</td>
<td>{{ progress.output_name }}</td>
<td style="font-size: smaller">{{ progress.description }}</td>
<td style="text-align: right">{{ "{:,.0f}".format(progress.total - progress.current) }}</td>
<td style="text-align: right">{{ "{:,.0f}".format(progress.current) }}</td>
<td>
Expand Down

0 comments on commit 9cb9559

Please sign in to comment.