From 9cb9559dd53cf2e6182dad61788917c11da46b60 Mon Sep 17 00:00:00 2001 From: Jan Heinrich Reimer Date: Mon, 20 Nov 2023 17:00:57 +0100 Subject: [PATCH] Improve monitoring --- archive_query_log/monitoring/home.py | 135 +++++++++++++++++++++----- archive_query_log/templates/home.html | 33 ++++--- 2 files changed, 131 insertions(+), 37 deletions(-) diff --git a/archive_query_log/monitoring/home.py b/archive_query_log/monitoring/home.py index 8ec40dc8..68e5f01b 100644 --- a/archive_query_log/monitoring/home.py +++ b/archive_query_log/monitoring/home.py @@ -15,12 +15,13 @@ class Statistics(NamedTuple): name: str description: str total: str - disk_size: str + disk_size: str | None last_modified: datetime | None class Progress(NamedTuple): - name: str + input_name: str + output_name: str description: str total: int current: int @@ -28,7 +29,10 @@ class Progress(NamedTuple): DocumentType = Type[BaseDocument] -_statistics_cache: dict[DocumentType, Statistics] = ExpiringDict( +_statistics_cache: dict[ + tuple[DocumentType, + tuple[str, ...]], Statistics, +] = ExpiringDict( max_len=100, max_age_seconds=30, ) @@ -49,15 +53,23 @@ def _get_statistics( name: str, description: str, document: DocumentType, + required_fields: tuple[str, ...] = (), ) -> Statistics: - key = document + key = (document, required_fields) if key in _statistics_cache: return _statistics_cache[key] document.index().refresh(using=config.es.client) stats = document.index().stats(using=config.es.client) + search = document.search(using=config.es.client) + if len(required_fields) > 0: + query = Exists(field=required_fields[0]) + for required_field in required_fields[1:]: + query &= Exists(field=required_field) + search = search.filter(query) + total = search.count() last_modified_response = ( - document.search(using=config.es.client) + search .query(Exists(field="last_modified")) .sort("-last_modified") .extra(size=1) @@ -71,16 +83,21 @@ def _get_statistics( statistics = Statistics( name=name, description=description, - total=stats["_all"]["primaries"]["docs"]["count"], - disk_size=_convert_bytes( - stats["_all"]["total"]["store"]["size_in_bytes"]), + total=total, + disk_size=( + _convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"]) + if len(required_fields) == 0 else "" + ), last_modified=last_modified, ) _statistics_cache[key] = statistics return statistics -_progress_cache: dict[tuple[DocumentType, str], Progress] = ExpiringDict( +_progress_cache: dict[ + tuple[DocumentType, tuple[str, ...], str], + Progress, +] = ExpiringDict( max_len=100, max_age_seconds=30, ) @@ -88,18 +105,25 @@ def _get_statistics( def _get_processed_progress( config: Config, - name: str, + input_name: str, + output_name: str, description: str, document: DocumentType, timestamp_field: str, + required_fields: tuple[str, ...] = (), ) -> Progress: - key = (document, timestamp_field) + key = (document, required_fields, timestamp_field) if key in _progress_cache: return _progress_cache[key] document.index().refresh(using=config.es.client) search = document.search(using=config.es.client) - total = search.extra(track_total_hits=True).execute().hits.total.value + if len(required_fields) > 0: + query = Exists(field=required_fields[0]) + for required_field in required_fields[1:]: + query &= Exists(field=required_field) + search = search.filter(query) + total = search.count() search_processed = search.filter( Exists(field="last_modified") & Exists(field=timestamp_field) & @@ -110,7 +134,8 @@ def _get_processed_progress( ) total_processed = search_processed.count() progress = Progress( - name=name, + input_name=input_name, + output_name=output_name, description=description, total=total, current=total_processed, @@ -156,6 +181,53 @@ def home(config: Config) -> str | Response: "identified among the captures.", document=Serp, ), + _get_statistics( + config=config, + name="+ URL query", + description="SERPs for which the query has been parsed " + "from the URL.", + document=Serp, + required_fields=("url_query",), + ), + _get_statistics( + config=config, + name="+ URL page", + description="SERPs for which the page has been parsed " + "from the URL.", + document=Serp, + required_fields=("url_page",), + ), + _get_statistics( + config=config, + name="+ URL offset", + description="SERPs for which the offset has been parsed " + "from the URL.", + document=Serp, + required_fields=("url_offset",), + ), + _get_statistics( + config=config, + name="+ WARC", + description="SERPs for which the WARC has been downloaded.", + document=Serp, + required_fields=("warc_location",), + ), + _get_statistics( + config=config, + name="+ WARC query", + description="SERPs for which the query has been parsed " + "from the WARC.", + document=Serp, + required_fields=("warc_query",), + ), + _get_statistics( + config=config, + name="+ WARC snippets", + description="SERPs for which the snippets have been parsed " + "from the WARC.", + document=Serp, + required_fields=("warc_snippets",), + ), _get_statistics( config=config, name="Results", @@ -185,21 +257,24 @@ def home(config: Config) -> str | Response: progress_list: list[Progress] = [ _get_processed_progress( config=config, - name="Archives → Sources", + input_name="Archives", + output_name="Sources", description="Build sources for all archives.", document=Archive, timestamp_field="last_built_sources", ), _get_processed_progress( config=config, - name="Providers → Sources", + input_name="Providers", + output_name="Sources", description="Build sources for all search providers.", document=Provider, timestamp_field="last_built_sources", ), _get_processed_progress( config=config, - name="Sources → Captures", + input_name="Sources", + output_name="Captures", description="Fetch CDX captures for all domains and " "prefixes in the sources.", document=Source, @@ -207,45 +282,53 @@ def home(config: Config) -> str | Response: ), _get_processed_progress( config=config, - name="Captures → SERPs", + input_name="Captures", + output_name="SERPs", description="Parse queries from capture URLs.", document=Capture, timestamp_field="url_query_parser.last_parsed", ), _get_processed_progress( config=config, - name="SERPs → SERPs", + input_name="SERPs", + output_name="SERPs", description="Parse page from SERP URLs.", document=Serp, timestamp_field="url_page_parser.last_parsed", ), _get_processed_progress( config=config, - name="SERPs → SERPs", + input_name="SERPs", + output_name="SERPs", description="Parse offset from SERP URLs.", document=Serp, timestamp_field="url_offset_parser.last_parsed", ), _get_processed_progress( config=config, - name="SERPs → SERPs", + input_name="SERPs", + output_name="SERPs", description="Download WARCs.", document=Serp, timestamp_field="warc_downloader.last_downloaded", ), _get_processed_progress( config=config, - name="SERPs → SERPs", - description="Parse query from SERP contents.", + input_name="SERPs", + output_name="SERPs", + description="Parse query from WARC contents.", document=Serp, - timestamp_field="serp_query_parser.last_parsed", + timestamp_field="warc_query_parser.last_parsed", + required_fields=("warc_location",), ), _get_processed_progress( config=config, - name="SERPs → SERPs", - description="Parse snippets from SERP contents.", + input_name="SERPs", + output_name="SERPs", + description="Parse snippets from WARC contents.", document=Serp, - timestamp_field="serp_snippets_parser.last_parsed", + timestamp_field="warc_snippets_parser.last_parsed", + required_fields=("warc_location",), ), ] diff --git a/archive_query_log/templates/home.html b/archive_query_log/templates/home.html index 2ec4f96d..47a41543 100644 --- a/archive_query_log/templates/home.html +++ b/archive_query_log/templates/home.html @@ -48,7 +48,11 @@

Archive Query Log

-

Stats

+ Monitor the crawling and parsing of the Archive Query Log. + Directly go to the detailed statistics or check the progress. +
+
+

Statistics

@@ -64,13 +68,20 @@

Stats

{% for statistics in statistics_list %} - + - +
{{ statistics.name }}{% if statistics.description %}{{ statistics.description }}{% else %}—{% endif %}{% if statistics.description %}{{ statistics.description }}{% else %}—{% endif %} {{ "{:,.0f}".format(statistics.total) }}{{ statistics.disk_size }} + {% if statistics.disk_size %} + {{ statistics.disk_size }} + {% else %} + — + {% endif %} + {% if statistics.last_modified %} -
-
+

Progress

- + + @@ -98,10 +110,9 @@

Progress

{% for progress in progress_list %} - - + + +
StageInputOutput Description Unprocessed Processed
{{ progress.name }} - {% if progress.description %}{{ progress.description }}{% else %}—{% endif %} - {{ progress.input_name }}{{ progress.output_name }}{{ progress.description }} {{ "{:,.0f}".format(progress.total - progress.current) }} {{ "{:,.0f}".format(progress.current) }}