Skip to content

Commit

Permalink
Search using fpstore
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Feb 21, 2024
1 parent 7a955dd commit 9263984
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 13 deletions.
5 changes: 4 additions & 1 deletion acoustid/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def read_env(self, prefix):

class FpstoreConfig(BaseConfig):
def __init__(self) -> None:
    """Set the default fpstore connection settings.

    The host defaults to an empty string; ``is_enabled()`` treats an empty
    host as "fpstore is not configured", so the service is off unless a
    host is supplied.
    """
    # Only the final value is assigned: the earlier "127.0.0.1" default was
    # overwritten on the very next line and never observable.
    self.host = ""
    self.port = 4659

def read_section(self, parser: RawConfigParser, section: str) -> None:
Expand All @@ -260,6 +260,9 @@ def read_env(self, prefix: str) -> None:
read_env_item(self, "host", prefix + "FPSTORE_HOST")
read_env_item(self, "port", prefix + "FPSTORE_PORT", convert=int)

def is_enabled(self) -> bool:
    """Report whether an fpstore host has been configured."""
    # Any falsy host (empty string, None) counts as "not configured".
    return True if self.host else False


class RedisConfig(BaseConfig):
def __init__(self):
Expand Down
71 changes: 60 additions & 11 deletions acoustid/data/fingerprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from acoustid import chromaprint, const
from acoustid import tables as schema
from acoustid.db import FingerprintDB, IngestDB
from acoustid.fpstore import FpstoreClient
from acoustid.indexclient import Index, IndexClientError, IndexClientPool

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,26 +48,38 @@ def decode_fingerprint(fingerprint_string):


class FingerprintSearcher(object):
def __init__(
    self,
    db: FingerprintDB,
    index_pool: IndexClientPool,
    fpstore: Optional[FpstoreClient] = None,
    fast: bool = True,
    timeout: Optional[float] = None,
) -> None:
    """Store the search backends and the matching limits.

    Args:
        db: Connection on which the fingerprint SQL queries are executed.
        index_pool: Index client pool kept for the non-fpstore search path.
        fpstore: Optional fpstore client. When it is not ``None``,
            ``search()`` delegates to ``_search_via_fpstore()``.
        fast: Forwarded to fpstore as ``fast_mode``.
        timeout: Statement timeout in seconds; it is converted to
            milliseconds before being sent to the database. A falsy value
            leaves the timeout untouched.
    """
    # NOTE(review): ``fpstore`` is the third positional parameter, ahead of
    # ``fast``; call sites passing ``fast``/``timeout`` positionally should
    # be checked or switched to keywords.
    self.db = db
    self.index_pool = index_pool
    self.fpstore = fpstore
    self.fast = fast
    self.timeout = timeout
    # Matching thresholds come from the shared constants module.
    self.min_score = const.TRACK_GROUP_MERGE_THRESHOLD
    self.max_length_diff = const.FINGERPRINT_MAX_LENGTH_DIFF
    self.max_offset = const.TRACK_MAX_OFFSET

def _create_search_query(self, fp, length, condition, max_results):
# type: (List[int], int, Any, Optional[int]) -> Any
def _create_search_query(self, length: int, condition: Any, max_results: Optional[int], compare_to: Optional[List[int]] = None) -> Any:
# construct the subquery
f_columns = [
f_columns: List[Any] = [
schema.fingerprint.c.id,
schema.fingerprint.c.track_id,
sql.func.acoustid_compare2(
schema.fingerprint.c.fingerprint, fp, self.max_offset
).label("score"),
]
if compare_to:
f_columns.append(
sql.func.acoustid_compare2(
schema.fingerprint.c.fingerprint, compare_to, self.max_offset
).label("score"),
)
else:
f_columns.append(sql.literal_column("1.0").label("score"))

f_where = sql.and_(
condition,
schema.fingerprint.c.length.between(
Expand Down Expand Up @@ -135,8 +148,44 @@ def _get_max_indexed_fingerprint_id(self, index):
# type: (Index) -> int
return int(index.get_attribute("max_document_id") or "0")

def search(self, fp, length, max_results=None):
# type: (List[int], int, Optional[int]) -> List[FingerprintMatch]
def _search_via_fpstore(
    self, fp: List[int], length: int, max_results: Optional[int] = None
) -> List[FingerprintMatch]:
    """Search with fpstore, then load the matching rows from the database.

    fpstore supplies the candidate fingerprint ids and their scores. The
    database query is only used to fetch the rows for those ids (subject to
    whatever filtering ``_create_search_query`` applies for ``length``).

    Args:
        fp: Query fingerprint passed to fpstore.
        length: Fingerprint length handed to ``_create_search_query``.
        max_results: Maximum number of results; defaults to 100 when
            ``None``.

    Returns:
        Matches carrying the fpstore score, best score first. An empty list
        is returned when fpstore finds nothing or when the database cancels
        the statement because of the statement timeout.

    Raises:
        OperationalError: For database failures other than a statement
            timeout.
    """
    # Invariant check: search() only delegates here when fpstore is set.
    assert self.fpstore is not None

    if max_results is None:
        max_results = 100

    hits = self.fpstore.search(fp, limit=max_results, fast_mode=self.fast)
    if not hits:
        return []

    scores_by_id: Dict[int, float] = {m.fingerprint_id: m.score for m in hits}

    # Hand in_() a concrete list rather than a dict view.
    query = self._create_search_query(
        length,
        schema.fingerprint.c.id.in_(list(scores_by_id)),
        max_results=max_results,
    )
    if self.timeout:
        timeout_ms = int(self.timeout * 1000)
        self.db.execute("SET LOCAL statement_timeout TO {}".format(timeout_ms))
    try:
        results = self.db.execute(query)
    except OperationalError as ex:
        # A timed-out lookup is treated as "no matches", not as an error.
        if "canceling statement due to statement timeout" in str(ex):
            return []
        raise

    matches = []
    for row in results:
        match = FingerprintMatch(*row)
        # The query was built without compare_to, so its score column is the
        # constant 1.0; substitute the real score reported by fpstore.
        matches.append(match._replace(score=scores_by_id[match.fingerprint_id]))

    # The SQL side cannot rank by the constant score, so order here.
    # The sort is stable: equal scores keep the database order.
    matches.sort(key=lambda m: m.score, reverse=True)
    return matches

def search(self, fp: List[int], length: int, max_results: Optional[int] = None) -> List[FingerprintMatch]:
if self.fpstore is not None:
return self._search_via_fpstore(fp, length, max_results)

conditions = []

if self.fast:
Expand Down Expand Up @@ -174,7 +223,7 @@ def search(self, fp, length, max_results=None):
return []

query = self._create_search_query(
fp, length, sql.or_(*conditions), max_results=max_results
length, sql.or_(*conditions), max_results=max_results, compare_to=fp
)
if self.timeout:
timeout_ms = int(self.timeout * 1000)
Expand Down
10 changes: 10 additions & 0 deletions acoustid/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from acoustid._release import GIT_RELEASE
from acoustid.config import Config
from acoustid.db import DatabaseContext
from acoustid.fpstore import FpstoreClient
from acoustid.indexclient import IndexClientPool
from acoustid.utils import LocalSysLogHandler

Expand Down Expand Up @@ -81,6 +82,12 @@ def __init__(self, config_path, tests=False):
retry=redis_retry,
)

self.fpstore = (
FpstoreClient(self.config.fpstore)
if self.config.fpstore.is_enabled()
else None
)

self._console_logging_configured = False
if not tests:
self.setup_logging()
Expand Down Expand Up @@ -171,3 +178,6 @@ def run_script(func, option_cb=None, master_only=False):
engine.dispose()

script.index.dispose()

if script.fpstore:
script.fpstore.close()
4 changes: 3 additions & 1 deletion acoustid/scripts/update_user_agent_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def run_update_user_agent_stats(script: Script, partition: int):
redis.hdel(root_key, key)
else:
if script.config.cluster.role == "master":
update_user_agent_stats(db, application_id, date, user_agent, ip, count)
update_user_agent_stats(
db, application_id, date, user_agent, ip, count
)
else:
call_internal_api(
script.config,
Expand Down

0 comments on commit 9263984

Please sign in to comment.