diff --git a/setup.py b/setup.py index 5730379..c694778 100644 --- a/setup.py +++ b/setup.py @@ -182,7 +182,7 @@ class EagerWriter(Writer): __all__ = ["MATCHERS"] from typing import Tuple, List -from .core import UserAgentMatcher, OSMatcher, DeviceMatcher +from .matchers import UserAgentMatcher, OSMatcher, DeviceMatcher MATCHERS: Tuple[List[UserAgentMatcher], List[OSMatcher], List[DeviceMatcher]] = ([ """ diff --git a/src/ua_parser/__init__.py b/src/ua_parser/__init__.py index dcc06b5..2c6121d 100644 --- a/src/ua_parser/__init__.py +++ b/src/ua_parser/__init__.py @@ -16,29 +16,25 @@ This way importing anything but the top-level package should not be necessary unless you want to *implement* a parser. """ +from __future__ import annotations __all__ = [ - "BasicParser", - "CachingParser", + "BasicResolver", + "CachingResolver", "Clearing", "DefaultedParseResult", "Device", - "DeviceMatcher", "Domain", "LRU", "Locking", "Matchers", "OS", - "OSMatcher", "ParseResult", - "Parser", + "Resolver", "PartialParseResult", "UserAgent", - "UserAgentMatcher", "load_builtins", "load_lazy_builtins", - "load_data", - "load_yaml", "parse", "parse_device", "parse_os", @@ -48,43 +44,89 @@ import contextlib from typing import Callable, Optional -from .basic import Parser as BasicParser -from .caching import CachingParser, Clearing, Locking, LRU +from .basic import Resolver as BasicResolver +from .caching import CachingResolver, Clearing, Locking, LRU from .core import ( DefaultedParseResult, Device, - DeviceMatcher, Domain, Matchers, OS, - OSMatcher, - Parser, ParseResult, PartialParseResult, + Resolver, UserAgent, - UserAgentMatcher, ) -from .loaders import load_builtins, load_data, load_lazy_builtins, load_yaml +from .loaders import load_builtins, load_lazy_builtins -Re2Parser: Optional[Callable[[Matchers], Parser]] = None +Re2Resolver: Optional[Callable[[Matchers], Resolver]] = None with contextlib.suppress(ImportError): - from .re2 import Parser as Re2Parser + from .re2 import 
Resolver as Re2Resolver VERSION = (1, 0, 0) + + +class Parser: + @classmethod + def from_matchers(cls, m: Matchers, /) -> Parser: + if Re2Resolver is not None: + return cls(Re2Resolver(m)) + else: + return cls( + CachingResolver( + BasicResolver(m), + Locking(LRU(200)), + ) + ) + + def __init__(self, resolver: Resolver) -> None: + self.resolver = resolver + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + """Parses the ``ua`` string, returning a parse result with *at least* + the requested :class:`domains <Domain>` resolved (whether to success or + failure). + + A parser may resolve more :class:`domains <Domain>` than + requested, but it *must not* resolve less. + """ + return self.resolver(ua, domains) + + def parse(self, ua: str) -> ParseResult: + """Convenience method for parsing all domains, and falling back to + default values for all failures. + """ + return self(ua, Domain.ALL).complete() + + def parse_user_agent(self, ua: str) -> Optional[UserAgent]: + """Convenience method for parsing the :class:`UserAgent` domain, + returning ``None`` in case of failure. + """ + return self(ua, Domain.USER_AGENT).user_agent + + def parse_os(self, ua: str) -> Optional[OS]: + """Convenience method for parsing the :class:`OS` domain, returning + ``None`` in case of failure. + """ + return self(ua, Domain.OS).os + + def parse_device(self, ua: str) -> Optional[Device]: + """Convenience method for parsing the :class:`Device` domain, returning + ``None`` in case of failure.
+ """ + return self(ua, Domain.DEVICE).device + + parser: Parser def __getattr__(name: str) -> Parser: global parser if name == "parser": - if Re2Parser is not None: - parser = Re2Parser(load_lazy_builtins()) - else: - parser = CachingParser( - BasicParser(load_builtins()), - Locking(LRU(200)), - ) + parser = Parser.from_matchers( + load_builtins() if Re2Resolver is None else load_lazy_builtins() + ) return parser raise AttributeError(f"module {__name__!r} has no attribute {name!r}") @@ -105,7 +147,7 @@ def parse(ua: str) -> ParseResult: # parser, a `global` access fails to and we get a NameError from . import parser - return parser.parse(ua) + return parser(ua, Domain.ALL).complete() def parse_user_agent(ua: str) -> Optional[UserAgent]: @@ -114,7 +156,7 @@ def parse_user_agent(ua: str) -> Optional[UserAgent]: """ from . import parser - return parser.parse_user_agent(ua) + return parser(ua, Domain.USER_AGENT).user_agent def parse_os(ua: str) -> Optional[OS]: @@ -123,7 +165,7 @@ def parse_os(ua: str) -> Optional[OS]: """ from . import parser - return parser.parse_os(ua) + return parser(ua, Domain.OS).os def parse_device(ua: str) -> Optional[Device]: @@ -132,4 +174,4 @@ def parse_device(ua: str) -> Optional[Device]: """ from . 
import parser - return parser.parse_device(ua) + return parser(ua, Domain.DEVICE).device diff --git a/src/ua_parser/_matchers.pyi b/src/ua_parser/_matchers.pyi index 7c4388a..2269fb4 100644 --- a/src/ua_parser/_matchers.pyi +++ b/src/ua_parser/_matchers.pyi @@ -2,7 +2,7 @@ __all__ = ["MATCHERS"] from typing import List, Tuple -from .core import DeviceMatcher, OSMatcher, UserAgentMatcher +from .matchers import DeviceMatcher, OSMatcher, UserAgentMatcher MATCHERS: Tuple[ List[UserAgentMatcher], diff --git a/src/ua_parser/basic.py b/src/ua_parser/basic.py index 58b4f6d..4575ac0 100644 --- a/src/ua_parser/basic.py +++ b/src/ua_parser/basic.py @@ -1,3 +1,5 @@ +__all__ = ["Resolver"] + from operator import methodcaller from typing import List @@ -7,13 +9,12 @@ Matcher, Matchers, OS, - Parser as AbstractParser, PartialParseResult, UserAgent, ) -class Parser(AbstractParser): +class Resolver: """A simple pure-python parser based around trying a numer of regular expressions in sequence for each domain, and returning a result when one matches. @@ -27,9 +28,7 @@ def __init__( self, matchers: Matchers, ) -> None: - self.user_agent_matchers = matchers[0] - self.os_matchers = matchers[1] - self.device_matchers = matchers[2] + self.user_agent_matchers, self.os_matchers, self.device_matchers = matchers def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: parse = methodcaller("__call__", ua) diff --git a/src/ua_parser/bench.py b/src/ua_parser/bench.py index b84bba3..e63ed2a 100644 --- a/src/ua_parser/bench.py +++ b/src/ua_parser/bench.py @@ -7,18 +7,18 @@ from typing import Any, Callable, Iterable, List, Optional from . 
import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Locking, LRU, Matchers, Parser, - load_builtins, - load_yaml, + Resolver, ) from .caching import Cache -from .re2 import Parser as Re2Parser +from .loaders import load_builtins, load_yaml +from .re2 import Resolver as Re2Resolver from .user_agent_parser import Parse CACHEABLE = { @@ -222,19 +222,19 @@ def run_csv(args: argparse.Namespace) -> None: def get_parser( parser: str, cache: str, cachesize: int, rules: Matchers ) -> Callable[[str], Any]: - p: Parser + r: Resolver if parser == "legacy": return Parse elif parser == "basic": - p = BasicParser(rules) + r = BasicResolver(rules) elif parser == "re2": - p = Re2Parser(rules) + r = Re2Resolver(rules) else: sys.exit(f"unknown parser {parser!r}") c: Callable[[int], Cache] if cache == "none": - return p.parse + return Parser(r).parse elif cache == "clearing": c = Clearing elif cache == "lru": @@ -244,7 +244,7 @@ def get_parser( else: sys.exit(f"unknown cache algorithm {cache!r}") - return CachingParser(p, c(cachesize)).parse + return Parser(CachingResolver(r, c(cachesize))).parse def run( diff --git a/src/ua_parser/caching.py b/src/ua_parser/caching.py index 358b574..f5667f4 100644 --- a/src/ua_parser/caching.py +++ b/src/ua_parser/caching.py @@ -1,12 +1,12 @@ import abc import threading from collections import OrderedDict -from typing import Dict, Optional +from typing import Dict, Optional, Protocol -from .core import Domain, Parser, PartialParseResult +from .core import Domain, PartialParseResult, Resolver __all__ = [ - "CachingParser", + "CachingResolver", "Cache", "Clearing", "Locking", @@ -14,7 +14,7 @@ ] -class Cache(abc.ABC): +class Cache(Protocol): """Cache abstract protocol. The :class:`CachingParser` will look values up, merge what was returned (possibly nothing) with what it got from its actual parser, and *re-set the result*. @@ -33,7 +33,7 @@ def __getitem__(self, key: str) -> Optional[PartialParseResult]: ... 
-class Clearing(Cache): +class Clearing: """A clearing cache, if the cache is full, just remove all the entries and re-fill from scratch. @@ -62,7 +62,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache[key] = value -class LRU(Cache): +class LRU: """Cache following a least-recently used replacement policy: when there is no more room in the cache, whichever entry was last seen the least recently is removed. @@ -103,7 +103,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache.popitem(last=False) -class Locking(Cache): +class Locking: """Locking cache decorator. Takes a non-thread-safe cache and ensures retrieving and setting entries is protected by a mutex. @@ -122,7 +122,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache[key] = value -class CachingParser(Parser): +class CachingResolver: """A wrapping parser which takes an underlying concrete :class:`Cache` for the actual caching and cache strategy. @@ -134,8 +134,8 @@ class CachingParser(Parser): really, they're immutable). 
""" - def __init__(self, parser: Parser, cache: Cache): - self.parser: Parser = parser + def __init__(self, parser: Resolver, cache: Cache): + self.parser: Resolver = parser self.cache: Cache = cache def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: diff --git a/src/ua_parser/core.py b/src/ua_parser/core.py index 54837e4..ca2fc75 100644 --- a/src/ua_parser/core.py +++ b/src/ua_parser/core.py @@ -1,22 +1,18 @@ import abc -import re from dataclasses import dataclass from enum import Flag, auto -from typing import Generic, List, Literal, Match, Optional, Pattern, Tuple, TypeVar +from typing import Callable, Generic, List, Optional, Tuple, TypeVar __all__ = [ "DefaultedParseResult", "Device", - "DeviceMatcher", "Domain", "Matchers", "OS", - "OSMatcher", "ParseResult", - "Parser", "PartialParseResult", + "Resolver", "UserAgent", - "UserAgentMatcher", ] @@ -155,70 +151,7 @@ def complete(self) -> ParseResult: ) -class Parser(abc.ABC): - @abc.abstractmethod - def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: - """Parses the ``ua`` string, returning a parse result with *at least* - the requested :class:`domains ` resolved (whether to success or - failure). - - A parser may resolve more :class:`domains ` than - requested, but it *must not* resolve less. - """ - ... - - def parse(self, ua: str) -> ParseResult: - """Convenience method for parsing all domains, and falling back to - default values for all failures. - """ - return self(ua, Domain.ALL).complete() - - def parse_user_agent(self, ua: str) -> Optional[UserAgent]: - """Convenience method for parsing the :class:`UserAgent` domain, - falling back to the default value in case of failure. - """ - return self(ua, Domain.USER_AGENT).user_agent - - def parse_os(self, ua: str) -> Optional[OS]: - """Convenience method for parsing the :class:`OS` domain, falling back - to the default value in case of failure. 
- """ - return self(ua, Domain.OS).os - - def parse_device(self, ua: str) -> Optional[Device]: - """Convenience method for parsing the :class:`Device` domain, falling - back to the default value in case of failure. - """ - return self(ua, Domain.DEVICE).device - - -def _get(m: Match[str], idx: int) -> Optional[str]: - return (m[idx] or None) if 0 < idx <= m.re.groups else None - - -def _replacer(repl: str, m: Match[str]) -> Optional[str]: - """The replacement rules are frustratingly subtle and innimical to - standard python fallback semantics: - - - if there is a non-null replacement pattern, then it must be used with - match groups as template parameters (at indices 1+) - - the result is stripped - - if it is an empty string, then it's replaced by a null - - otherwise fallback to a (possibly optional) match group - - or null (device brand has no fallback) - - Replacement rules only apply to OS and Device matchers, the UA - matcher has bespoke replacement semantics for the family (just - $1), and no replacement for the other fields, either there is a - static replacement or it falls back to the corresponding - (optional) match group. 
- - """ - if not repl: - return None - - return re.sub(r"\$(\d)", lambda n: _get(m, int(n[1])) or "", repl).strip() or None - +Resolver = Callable[[str, Domain], PartialParseResult] T = TypeVar("T") @@ -238,168 +171,6 @@ def flags(self) -> int: return 0 -class UserAgentMatcher(Matcher[UserAgent]): - regex: Pattern[str] - family: str - major: Optional[str] - minor: Optional[str] - patch: Optional[str] - patch_minor: Optional[str] - - def __init__( - self, - regex: str, - family: Optional[str] = None, - major: Optional[str] = None, - minor: Optional[str] = None, - patch: Optional[str] = None, - patch_minor: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex) - self.family = family or "$1" - self.major = major - self.minor = minor - self.patch = patch - self.patch_minor = patch_minor - - def __call__(self, ua: str) -> Optional[UserAgent]: - if m := self.regex.search(ua): - return UserAgent( - family=( - self.family.replace("$1", m[1]) - if "$1" in self.family - else self.family - ), - major=self.major or _get(m, 2), - minor=self.minor or _get(m, 3), - patch=self.patch or _get(m, 4), - patch_minor=self.patch_minor or _get(m, 5), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - def __repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("major", self.major), - ("minor", self.minor), - ("patch", self.patch), - ("patch_minor", self.patch_minor), - ] - args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"UserAgentMatcher({self.pattern!r}{args})" - - -class OSMatcher(Matcher[OS]): - regex: Pattern[str] - family: str - major: str - minor: str - patch: str - patch_minor: str - - def __init__( - self, - regex: str, - family: Optional[str] = None, - major: Optional[str] = None, - minor: Optional[str] = None, - patch: Optional[str] = None, - patch_minor: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex) - self.family = family or "$1" 
- self.major = major or "$2" - self.minor = minor or "$3" - self.patch = patch or "$4" - self.patch_minor = patch_minor or "$5" - - def __call__(self, ua: str) -> Optional[OS]: - if m := self.regex.search(ua): - family = _replacer(self.family, m) - if family is None: - raise ValueError(f"Unable to find OS family in {ua}") - return OS( - family=family, - major=_replacer(self.major, m), - minor=_replacer(self.minor, m), - patch=_replacer(self.patch, m), - patch_minor=_replacer(self.patch_minor, m), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - def __repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("major", self.major if self.major != "$2" else None), - ("minor", self.minor if self.minor != "$3" else None), - ("patch", self.patch if self.patch != "$4" else None), - ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), - ] - args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"OSMatcher({self.pattern!r}{args})" - - -class DeviceMatcher(Matcher[Device]): - regex: Pattern[str] - family: str - brand: str - model: str - - def __init__( - self, - regex: str, - regex_flag: Optional[Literal["i"]] = None, - family: Optional[str] = None, - brand: Optional[str] = None, - model: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) - self.family = family or "$1" - self.brand = brand or "" - self.model = model or "$1" - - def __call__(self, ua: str) -> Optional[Device]: - if m := self.regex.search(ua): - family = _replacer(self.family, m) - if family is None: - raise ValueError(f"Unable to find device family in {ua}") - return Device( - family=family, - brand=_replacer(self.brand, m), - model=_replacer(self.model, m), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - @property - def flags(self) -> int: - return self.regex.flags - - def 
__repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("brand", self.brand or None), - ("model", self.model if self.model != "$1" else None), - ] - iflag = ', "i"' if self.flags & re.IGNORECASE else "" - args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"DeviceMatcher({self.pattern!r}{args})" - - Matchers = Tuple[ List[Matcher[UserAgent]], List[Matcher[OS]], diff --git a/src/ua_parser/hitrates.py b/src/ua_parser/hitrates.py index a5739d5..61e19cd 100644 --- a/src/ua_parser/hitrates.py +++ b/src/ua_parser/hitrates.py @@ -1,29 +1,31 @@ import argparse import itertools +from typing import Callable, List from . import ( - CachingParser, + CachingResolver, Clearing, Domain, LRU, Parser, PartialParseResult, + Resolver, ) +from .caching import Cache -class Noop(Parser): - def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: - return PartialParseResult( - domains=domains, - string=ua, - user_agent=None, - os=None, - device=None, - ) +def Noop(ua: str, domains: Domain, /) -> PartialParseResult: + return PartialParseResult( + domains=domains, + string=ua, + user_agent=None, + os=None, + device=None, + ) -class Counter(Parser): - def __init__(self, parser: Parser) -> None: +class Counter: + def __init__(self, parser: Resolver) -> None: self.count = 0 self.parser = parser @@ -60,12 +62,13 @@ def main() -> None: print(total, "lines", uniques, "uniques") print(f"ideal hit rate: {(total - uniques)/total:.0%}") print() + caches: List[Callable[[int], Cache]] = [Clearing, LRU] for cache, cache_size in itertools.product( - [Clearing, LRU], + caches, args.cachesizes, ): - misses = Counter(Noop()) - parser = CachingParser(misses, cache(cache_size)) + misses = Counter(Noop) + parser = Parser(CachingResolver(misses, cache(cache_size))) for line in lines: parser.parse(line) diff --git a/src/ua_parser/lazy.py b/src/ua_parser/lazy.py index d9e0219..7311252 100644 --- a/src/ua_parser/lazy.py 
+++ b/src/ua_parser/lazy.py @@ -4,7 +4,8 @@ from functools import cached_property from typing import Literal, Optional, Pattern -from .core import Device, Matcher, OS, UserAgent, _get, _replacer +from .core import Device, Matcher, OS, UserAgent +from .utils import get, replacer class UserAgentMatcher(Matcher[UserAgent]): @@ -39,10 +40,10 @@ def __call__(self, ua: str) -> Optional[UserAgent]: if "$1" in self.family else self.family ), - major=self.major or _get(m, 2), - minor=self.minor or _get(m, 3), - patch=self.patch or _get(m, 4), - patch_minor=self.patch_minor or _get(m, 5), + major=self.major or get(m, 2), + minor=self.minor or get(m, 3), + patch=self.patch or get(m, 4), + patch_minor=self.patch_minor or get(m, 5), ) return None @@ -89,15 +90,15 @@ def __init__( def __call__(self, ua: str) -> Optional[OS]: if m := self.regex.search(ua): - family = _replacer(self.family, m) + family = replacer(self.family, m) if family is None: raise ValueError(f"Unable to find OS family in {ua}") return OS( family=family, - major=_replacer(self.major, m), - minor=_replacer(self.minor, m), - patch=_replacer(self.patch, m), - patch_minor=_replacer(self.patch_minor, m), + major=replacer(self.major, m), + minor=replacer(self.minor, m), + patch=replacer(self.patch, m), + patch_minor=replacer(self.patch_minor, m), ) return None @@ -141,13 +142,13 @@ def __init__( def __call__(self, ua: str) -> Optional[Device]: if m := self.regex.search(ua): - family = _replacer(self.family, m) + family = replacer(self.family, m) if family is None: raise ValueError(f"Unable to find device family in {ua}") return Device( family=family, - brand=_replacer(self.brand, m), - model=_replacer(self.model, m), + brand=replacer(self.brand, m), + model=replacer(self.model, m), ) return None diff --git a/src/ua_parser/loaders.py b/src/ua_parser/loaders.py index 66a294c..ab0ae34 100644 --- a/src/ua_parser/loaders.py +++ b/src/ua_parser/loaders.py @@ -1,14 +1,16 @@ from __future__ import annotations __all__ = [ + 
"DeviceDict", + "MatchersData", + "OSDict", + "UserAgentDict", "load_builtins", - "load_lazy_builtins", "load_data", + "load_json", + "load_lazy", + "load_lazy_builtins", "load_yaml", - "MatchersData", - "UserAgentDict", - "OSDict", - "DeviceDict", ] import io @@ -28,8 +30,8 @@ cast, ) -from . import lazy -from .core import DeviceMatcher, Matchers, OSMatcher, UserAgentMatcher +from . import lazy, matchers +from .core import Matchers if TYPE_CHECKING: PathOrFile = Union[str, os.PathLike[str], io.IOBase] @@ -93,7 +95,7 @@ class DeviceDict(_RegexDict, total=False): def load_data(d: MatchersData) -> Matchers: return ( [ - UserAgentMatcher( + matchers.UserAgentMatcher( p["regex"], p.get("family_replacement"), p.get("v1_replacement"), @@ -104,7 +106,7 @@ def load_data(d: MatchersData) -> Matchers: for p in d[0] ], [ - OSMatcher( + matchers.OSMatcher( p["regex"], p.get("os_replacement"), p.get("os_v1_replacement"), @@ -115,7 +117,7 @@ def load_data(d: MatchersData) -> Matchers: for p in d[1] ], [ - DeviceMatcher( + matchers.DeviceMatcher( p["regex"], p.get("regex_flag"), p.get("device_replacement"), diff --git a/src/ua_parser/matchers.py b/src/ua_parser/matchers.py new file mode 100644 index 0000000..6104da0 --- /dev/null +++ b/src/ua_parser/matchers.py @@ -0,0 +1,169 @@ +__all__ = ["UserAgentMatcher", "OSMatcher", "DeviceMatcher"] + +import re +from typing import Literal, Optional, Pattern + +from .core import Device, Matcher, OS, UserAgent +from .utils import get, replacer + + +class UserAgentMatcher(Matcher[UserAgent]): + regex: Pattern[str] + family: str + major: Optional[str] + minor: Optional[str] + patch: Optional[str] + patch_minor: Optional[str] + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major + self.minor = minor + 
self.patch = patch + self.patch_minor = patch_minor + + def __call__(self, ua: str) -> Optional[UserAgent]: + if m := self.regex.search(ua): + return UserAgent( + family=( + self.family.replace("$1", m[1]) + if "$1" in self.family + else self.family + ), + major=self.major or get(m, 2), + minor=self.minor or get(m, 3), + patch=self.patch or get(m, 4), + patch_minor=self.patch_minor or get(m, 5), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major), + ("minor", self.minor), + ("patch", self.patch), + ("patch_minor", self.patch_minor), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"UserAgentMatcher({self.pattern!r}{args})" + + +class OSMatcher(Matcher[OS]): + regex: Pattern[str] + family: str + major: str + minor: str + patch: str + patch_minor: str + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major or "$2" + self.minor = minor or "$3" + self.patch = patch or "$4" + self.patch_minor = patch_minor or "$5" + + def __call__(self, ua: str) -> Optional[OS]: + if m := self.regex.search(ua): + family = replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find OS family in {ua}") + return OS( + family=family, + major=replacer(self.major, m), + minor=replacer(self.minor, m), + patch=replacer(self.patch, m), + patch_minor=replacer(self.patch_minor, m), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major if self.major != "$2" else None), + ("minor", 
self.minor if self.minor != "$3" else None), + ("patch", self.patch if self.patch != "$4" else None), + ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"OSMatcher({self.pattern!r}{args})" + + +class DeviceMatcher(Matcher[Device]): + regex: Pattern[str] + family: str + brand: str + model: str + + def __init__( + self, + regex: str, + regex_flag: Optional[Literal["i"]] = None, + family: Optional[str] = None, + brand: Optional[str] = None, + model: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) + self.family = family or "$1" + self.brand = brand or "" + self.model = model or "$1" + + def __call__(self, ua: str) -> Optional[Device]: + if m := self.regex.search(ua): + family = replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find device family in {ua}") + return Device( + family=family, + brand=replacer(self.brand, m), + model=replacer(self.model, m), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + @property + def flags(self) -> int: + return self.regex.flags + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("brand", self.brand or None), + ("model", self.model if self.model != "$1" else None), + ] + iflag = ', "i"' if self.flags & re.IGNORECASE else "" + args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"DeviceMatcher({self.pattern!r}{args})" diff --git a/src/ua_parser/re2.py b/src/ua_parser/re2.py index 559879b..c8cdd0b 100644 --- a/src/ua_parser/re2.py +++ b/src/ua_parser/re2.py @@ -1,4 +1,4 @@ -from __future__ import annotations +__all__ = ["Resolver"] import re from typing import List @@ -11,13 +11,12 @@ Matcher, Matchers, OS, - Parser as AbstractParser, PartialParseResult, UserAgent, ) -class Parser(AbstractParser): +class 
Resolver: ua: re2.Filter user_agent_matchers: List[Matcher[UserAgent]] os: re2.Filter diff --git a/src/ua_parser/threaded.py b/src/ua_parser/threaded.py index 15b2390..a0a3d13 100644 --- a/src/ua_parser/threaded.py +++ b/src/ua_parser/threaded.py @@ -6,15 +6,15 @@ from typing import Iterable from . import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Locking, LRU, Parser, load_builtins, ) -from .re2 import Parser as Re2Parser +from .re2 import Resolver as Re2Resolver def worker( @@ -54,11 +54,11 @@ def main() -> None: args = ap.parse_args() lines = list(args.file) - basic = BasicParser(load_builtins()) + basic = BasicResolver(load_builtins()) for name, parser in [ - ("clearing", CachingParser(basic, Clearing(CACHESIZE))), - ("LRU", CachingParser(basic, Locking(LRU(CACHESIZE)))), - ("re2", Re2Parser(load_builtins())), + ("clearing", CachingResolver(basic, Clearing(CACHESIZE))), + ("LRU", CachingResolver(basic, Locking(LRU(CACHESIZE)))), + ("re2", Re2Resolver(load_builtins())), ]: # randomize the dataset for each thread, predictably, to # simulate distributed load (not great but better than diff --git a/src/ua_parser/utils.py b/src/ua_parser/utils.py new file mode 100644 index 0000000..f3afa48 --- /dev/null +++ b/src/ua_parser/utils.py @@ -0,0 +1,30 @@ +import re +from typing import Match, Optional + + +def get(m: Match[str], idx: int) -> Optional[str]: + return (m[idx] or None) if 0 < idx <= m.re.groups else None + + +def replacer(repl: str, m: Match[str]) -> Optional[str]: + """The replacement rules are frustratingly subtle and inimical to + standard python fallback semantics: + + - if there is a non-null replacement pattern, then it must be used with + match groups as template parameters (at indices 1+) + - the result is stripped + - if it is an empty string, then it's replaced by a null + - otherwise fallback to a (possibly optional) match group + - or null (device brand has no fallback) + + Replacement rules only apply to OS
and Device matchers, the UA + matcher has bespoke replacement semantics for the family (just + $1), and no replacement for the other fields, either there is a + static replacement or it falls back to the corresponding + (optional) match group. + + """ + if not repl: + return None + + return re.sub(r"\$(\d)", lambda n: get(m, int(n[1])) or "", repl).strip() or None diff --git a/tests/test_caches.py b/tests/test_caches.py index 5969e46..e41d978 100644 --- a/tests/test_caches.py +++ b/tests/test_caches.py @@ -1,19 +1,18 @@ from collections import OrderedDict from ua_parser import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Device, - DeviceMatcher, Domain, LRU, OS, - OSMatcher, + Parser, PartialParseResult, UserAgent, - UserAgentMatcher, ) +from ua_parser.matchers import DeviceMatcher, OSMatcher, UserAgentMatcher def test_clearing(): @@ -21,7 +20,7 @@ def test_clearing(): entries. """ cache = Clearing(2) - p = CachingParser(BasicParser(([], [], [])), cache) + p = Parser(CachingResolver(BasicResolver(([], [], [])), cache)) p.parse("a") p.parse("b") @@ -42,7 +41,7 @@ def test_lru(): popped LRU-first. """ cache = LRU(2) - p = CachingParser(BasicParser(([], [], [])), cache) + p = Parser(CachingResolver(BasicResolver(([], [], [])), cache)) p.parse("a") p.parse("b") @@ -69,15 +68,17 @@ def test_backfill(): existing entry when new parts get parsed. 
""" cache = Clearing(2) - p = CachingParser( - BasicParser( - ( - [UserAgentMatcher("(a)")], - [OSMatcher("(a)")], - [DeviceMatcher("(a)")], - ) - ), - cache, + p = Parser( + CachingResolver( + BasicResolver( + ( + [UserAgentMatcher("(a)")], + [OSMatcher("(a)")], + [DeviceMatcher("(a)")], + ) + ), + cache, + ) ) p.parse_user_agent("a") diff --git a/tests/test_convenience_parser.py b/tests/test_convenience_parser.py new file mode 100644 index 0000000..7670e65 --- /dev/null +++ b/tests/test_convenience_parser.py @@ -0,0 +1,13 @@ +from ua_parser import Parser, ParseResult, PartialParseResult + + +def test_parser_utility() -> None: + """Tests that ``Parser``'s methods to behave as procedural + helpers, for users who may not wish to instantiate a parser or + something. + + Sadly the typing doesn't really play nicely with that. + + """ + r = Parser.parse(lambda s, d: PartialParseResult(d, None, None, None, s), "a") # type: ignore + assert r == ParseResult(None, None, None, "a") diff --git a/tests/test_core.py b/tests/test_core.py index 3a73faf..5d8eca8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -26,34 +26,39 @@ from yaml import SafeLoader, load from ua_parser import ( - BasicParser, + BasicResolver, Device, OS, + Parser, ParseResult, UserAgent, - UserAgentMatcher, caching, load_builtins, load_lazy_builtins, ) +from ua_parser.matchers import UserAgentMatcher CORE_DIR = (pathlib.Path(__name__).parent.parent / "uap-core").resolve() PARSERS = [ - pytest.param(BasicParser(load_builtins()), id="basic"), - pytest.param(BasicParser(load_lazy_builtins()), id="lazy"), + pytest.param(Parser(BasicResolver(load_builtins())), id="basic"), + pytest.param(Parser(BasicResolver(load_lazy_builtins())), id="lazy"), pytest.param( - caching.CachingParser( - BasicParser(load_builtins()), - caching.Clearing(10), + Parser( + caching.CachingResolver( + BasicResolver(load_builtins()), + caching.Clearing(10), + ) ), id="clearing", ), pytest.param( - caching.CachingParser( - 
BasicParser(load_builtins()), - caching.LRU(10), + Parser( + caching.CachingResolver( + BasicResolver(load_builtins()), + caching.LRU(10), + ) ), id="lru", ), @@ -61,7 +66,7 @@ with contextlib.suppress(ImportError): from ua_parser import re2 - PARSERS.append(pytest.param(re2.Parser(load_builtins()), id="re2")) + PARSERS.append(pytest.param(Parser(re2.Resolver(load_builtins())), id="re2")) UA_FIELDS = {f.name for f in dataclasses.fields(UserAgent)} @@ -134,7 +139,7 @@ def test_devices(parser, test_file): def test_results(): - p = BasicParser(([UserAgentMatcher("(x)")], [], [])) + p = Parser(BasicResolver(([UserAgentMatcher("(x)")], [], []))) assert p.parse_user_agent("x") == UserAgent("x") assert p.parse_user_agent("y") is None diff --git a/tests/test_parsers_basics.py b/tests/test_parsers_basics.py index 9252745..895e89a 100644 --- a/tests/test_parsers_basics.py +++ b/tests/test_parsers_basics.py @@ -1,17 +1,17 @@ import io from ua_parser import ( - BasicParser, + BasicResolver, Domain, PartialParseResult, UserAgent, - UserAgentMatcher, - load_yaml, ) +from ua_parser.loaders import load_yaml +from ua_parser.matchers import UserAgentMatcher def test_trivial_matching(): - p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + p = BasicResolver(([UserAgentMatcher("(a)")], [], [])) assert p("x", Domain.ALL) == PartialParseResult( string="x", @@ -31,7 +31,7 @@ def test_trivial_matching(): def test_partial(): - p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + p = BasicResolver(([UserAgentMatcher("(a)")], [], [])) assert p("x", Domain.USER_AGENT) == PartialParseResult( string="x", @@ -60,7 +60,7 @@ def test_init_yaml(): device_parsers: [] """ ) - p = BasicParser(load_yaml(f)) + p = BasicResolver(load_yaml(f)) assert p("x", Domain.USER_AGENT) == PartialParseResult( string="x",