Skip to content

Commit

Permalink
Fix monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
janheinrichmerker committed Nov 20, 2023
1 parent d3b2288 commit 1004485
Showing 1 changed file with 24 additions and 29 deletions.
53 changes: 24 additions & 29 deletions archive_query_log/monitoring/home.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from typing import NamedTuple, Type

from elasticsearch_dsl.query import Script, Exists
from elasticsearch_dsl.query import Script, Exists, Query, Term
from expiringdict import ExpiringDict
from flask import render_template, Response, make_response

Expand Down Expand Up @@ -30,11 +30,11 @@ class Progress(NamedTuple):
DocumentType = Type[BaseDocument]

_statistics_cache: dict[
tuple[DocumentType, tuple[str, ...]],
tuple[DocumentType, str],
Statistics,
] = ExpiringDict(
max_len=100,
max_age_seconds=30,
max_age_seconds=60 * 3, # 3 minutes
)


Expand All @@ -53,20 +53,17 @@ def _get_statistics(
name: str,
description: str,
document: DocumentType,
required_fields: tuple[str, ...] = (),
filter_query: Query | None = None,
) -> Statistics:
key = (document, required_fields)
key = (document, repr(filter_query))
if key in _statistics_cache:
return _statistics_cache[key]

document.index().refresh(using=config.es.client)
stats = document.index().stats(using=config.es.client)
search = document.search(using=config.es.client)
if len(required_fields) > 0:
query = Exists(field=required_fields[0])
for required_field in required_fields[1:]:
query &= Exists(field=required_field)
search = search.filter(query)
if filter_query is not None:
search = search.filter(filter_query)
total = search.count()
last_modified_response = (
search
Expand All @@ -86,7 +83,7 @@ def _get_statistics(
total=total,
disk_size=(
_convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"])
if len(required_fields) == 0 else ""
if filter_query is None else None
),
last_modified=last_modified,
)
Expand All @@ -95,11 +92,11 @@ def _get_statistics(


_progress_cache: dict[
tuple[DocumentType, tuple[str, ...], str],
tuple[DocumentType, str, str],
Progress,
] = ExpiringDict(
max_len=100,
max_age_seconds=30,
max_age_seconds=60 * 1, # 1 minute
)


Expand All @@ -110,25 +107,22 @@ def _get_processed_progress(
description: str,
document: DocumentType,
timestamp_field: str,
required_fields: tuple[str, ...] = (),
filter_query: Query | None = None,
) -> Progress:
key = (document, required_fields, timestamp_field)
key = (document, repr(filter_query), timestamp_field)
if key in _progress_cache:
return _progress_cache[key]

document.index().refresh(using=config.es.client)
search = document.search(using=config.es.client)
if len(required_fields) > 0:
query = Exists(field=required_fields[0])
for required_field in required_fields[1:]:
query &= Exists(field=required_field)
search = search.filter(query)
if filter_query is not None:
search = search.filter(filter_query)
total = search.count()
search_processed = search.filter(
Exists(field="last_modified") &
Exists(field=timestamp_field) &
Script(
script=f"doc['last_modified'].value.isBefore("
script=f"!doc['last_modified'].value.isAfter("
f"doc['{timestamp_field}'].value)",
)
)
Expand Down Expand Up @@ -187,46 +181,46 @@ def home(config: Config) -> str | Response:
description="SERPs for which the query has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_query",),
filter_query=Exists(field="url_query"),
),
_get_statistics(
config=config,
name="+ URL page",
description="SERPs for which the page has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_page",),
filter_query=Exists(field="url_page"),
),
_get_statistics(
config=config,
name="+ URL offset",
description="SERPs for which the offset has been parsed "
"from the URL.",
document=Serp,
required_fields=("url_offset",),
filter_query=Exists(field="url_offset"),
),
_get_statistics(
config=config,
name="+ WARC",
description="SERPs for which the WARC has been downloaded.",
document=Serp,
required_fields=("warc_location",),
filter_query=Exists(field="warc_location"),
),
_get_statistics(
config=config,
name="+ WARC query",
description="SERPs for which the query has been parsed "
"from the WARC.",
document=Serp,
required_fields=("warc_query",),
filter_query=Exists(field="warc_query"),
),
_get_statistics(
config=config,
name="+ WARC snippets",
description="SERPs for which the snippets have been parsed "
"from the WARC.",
document=Serp,
required_fields=("warc_snippets",),
filter_query=Exists(field="warc_snippets"),
),
_get_statistics(
config=config,
Expand Down Expand Up @@ -310,6 +304,7 @@ def home(config: Config) -> str | Response:
output_name="SERPs",
description="Download WARCs.",
document=Serp,
filter_query=Term(capture__status_code=200),
timestamp_field="warc_downloader.last_downloaded",
),
_get_processed_progress(
Expand All @@ -318,17 +313,17 @@ def home(config: Config) -> str | Response:
output_name="SERPs",
description="Parse query from WARC contents.",
document=Serp,
filter_query=Exists(field="warc_location"),
timestamp_field="warc_query_parser.last_parsed",
required_fields=("warc_location",),
),
_get_processed_progress(
config=config,
input_name="SERPs",
output_name="SERPs",
description="Parse snippets from WARC contents.",
document=Serp,
filter_query=Exists(field="warc_location"),
timestamp_field="warc_snippets_parser.last_parsed",
required_fields=("warc_location",),
),
]

Expand Down

0 comments on commit 1004485

Please sign in to comment.