diff --git a/archive_query_log/monitoring/home.py b/archive_query_log/monitoring/home.py index 2fadecc5..d2cd3c72 100644 --- a/archive_query_log/monitoring/home.py +++ b/archive_query_log/monitoring/home.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import NamedTuple, Type -from elasticsearch_dsl.query import Script, Exists +from elasticsearch_dsl.query import Script, Exists, Query, Term from expiringdict import ExpiringDict from flask import render_template, Response, make_response @@ -30,11 +30,11 @@ class Progress(NamedTuple): DocumentType = Type[BaseDocument] _statistics_cache: dict[ - tuple[DocumentType, tuple[str, ...]], + tuple[DocumentType, str], Statistics, ] = ExpiringDict( max_len=100, - max_age_seconds=30, + max_age_seconds=60 * 3, # 3 minutes ) @@ -53,20 +53,17 @@ def _get_statistics( name: str, description: str, document: DocumentType, - required_fields: tuple[str, ...] = (), + filter_query: Query | None = None, ) -> Statistics: - key = (document, required_fields) + key = (document, repr(filter_query)) if key in _statistics_cache: return _statistics_cache[key] document.index().refresh(using=config.es.client) stats = document.index().stats(using=config.es.client) search = document.search(using=config.es.client) - if len(required_fields) > 0: - query = Exists(field=required_fields[0]) - for required_field in required_fields[1:]: - query &= Exists(field=required_field) - search = search.filter(query) + if filter_query is not None: + search = search.filter(filter_query) total = search.count() last_modified_response = ( search @@ -86,7 +83,7 @@ def _get_statistics( total=total, disk_size=( _convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"]) - if len(required_fields) == 0 else "" + if filter_query is None else None ), last_modified=last_modified, ) @@ -95,11 +92,11 @@ def _get_statistics( _progress_cache: dict[ - tuple[DocumentType, tuple[str, ...], str], + tuple[DocumentType, str, str], Progress, ] = ExpiringDict( max_len=100, - max_age_seconds=30, + max_age_seconds=60 * 1, # 1 minute ) @@ -110,25 +107,22 @@ def _get_processed_progress( description: str, document: DocumentType, timestamp_field: str, - required_fields: tuple[str, ...] = (), + filter_query: Query | None = None, ) -> Progress: - key = (document, required_fields, timestamp_field) + key = (document, repr(filter_query), timestamp_field) if key in _progress_cache: return _progress_cache[key] document.index().refresh(using=config.es.client) search = document.search(using=config.es.client) - if len(required_fields) > 0: - query = Exists(field=required_fields[0]) - for required_field in required_fields[1:]: - query &= Exists(field=required_field) - search = search.filter(query) + if filter_query is not None: + search = search.filter(filter_query) total = search.count() search_processed = search.filter( Exists(field="last_modified") & Exists(field=timestamp_field) & Script( - script=f"doc['last_modified'].value.isBefore(" + script=f"!doc['last_modified'].value.isAfter(" f"doc['{timestamp_field}'].value)", ) ) @@ -187,7 +181,7 @@ def home(config: Config) -> str | Response: description="SERPs for which the query has been parsed " "from the URL.", document=Serp, - required_fields=("url_query",), + filter_query=Exists(field="url_query"), ), _get_statistics( config=config, @@ -195,7 +189,7 @@ def home(config: Config) -> str | Response: description="SERPs for which the page has been parsed " "from the URL.", document=Serp, - required_fields=("url_page",), + filter_query=Exists(field="url_page"), ), _get_statistics( config=config, @@ -203,14 +197,14 @@ def home(config: Config) -> str | Response: description="SERPs for which the offset has been parsed " "from the URL.", document=Serp, - required_fields=("url_offset",), + filter_query=Exists(field="url_offset"), ), _get_statistics( config=config, name="+ WARC", description="SERPs for which the WARC has been downloaded.", document=Serp, - required_fields=("warc_location",), + filter_query=Exists(field="warc_location"), ), _get_statistics( config=config, @@ -218,7 +212,7 @@ def home(config: Config) -> str | Response: description="SERPs for which the query has been parsed " "from the WARC.", document=Serp, - required_fields=("warc_query",), + filter_query=Exists(field="warc_query"), ), _get_statistics( config=config, @@ -226,7 +220,7 @@ def home(config: Config) -> str | Response: description="SERPs for which the snippets have been parsed " "from the WARC.", document=Serp, - required_fields=("warc_snippets",), + filter_query=Exists(field="warc_snippets"), ), _get_statistics( config=config, @@ -310,6 +304,7 @@ def home(config: Config) -> str | Response: output_name="SERPs", description="Download WARCs.", document=Serp, + filter_query=Term(capture__status_code=200), timestamp_field="warc_downloader.last_downloaded", ), _get_processed_progress( @@ -318,8 +313,8 @@ def home(config: Config) -> str | Response: output_name="SERPs", description="Parse query from WARC contents.", document=Serp, + filter_query=Exists(field="warc_location"), timestamp_field="warc_query_parser.last_parsed", - required_fields=("warc_location",), ), _get_processed_progress( config=config, @@ -327,8 +322,8 @@ def home(config: Config) -> str | Response: output_name="SERPs", description="Parse snippets from WARC contents.", document=Serp, + filter_query=Exists(field="warc_location"), timestamp_field="warc_snippets_parser.last_parsed", - required_fields=("warc_location",), ), ]