def parse(ua: str) -> ParseResult:
    """Parses the :class:`.UserAgent`, :class:`.OS`, and :class:`.Device`
    information using the :func:`global parser <get_parser>`.

    Because each domain is usually parsed separately, prefer the
    domain-specific helpers if you're not going to use all of them.

    :param ua: the user agent string to parse
    :returns: whatever the ``parse`` method of the global parser returns
        for ``ua``
    """
    # get_parser() creates the default parser on first use when none has
    # been set, so this helper works without any prior configuration.
    return get_parser().parse(ua)
@dataclass
class Parser(BaseParser):
    """Basic pure-python parser implementation. Tries every parser,
    sequentially, until it finds a match (or falls off).

    The three lists hold the matchers of, respectively, the user agent,
    OS and device domains; each list is walked in order.
    """

    user_agent_parsers: List[UserAgentParser]
    os_parsers: List[OSParser]
    device_parsers: List[DeviceParser]

    @classmethod
    def from_regexes(cls) -> Parser:
        """Instantiates a parser from the pre-compiled regex set. Currently
        not a singleton, but essentially free anyway after the initial
        call (which loads the pre-compiled code).
        """
        # Deferred import: the pre-compiled module is only loaded when
        # this constructor is actually used.
        from ._regexes import USER_AGENT_PARSERS, DEVICE_PARSERS, OS_PARSERS

        return cls(
            user_agent_parsers=USER_AGENT_PARSERS,
            os_parsers=OS_PARSERS,
            device_parsers=DEVICE_PARSERS,
        )

    # ``load`` is ``None`` when PyYAML could not be imported, in which
    # case the YAML constructor is simply not defined on the class.
    if load:

        @classmethod
        def from_yaml(cls, path: Union[str, os.PathLike, io.IOBase]) -> Parser:
            """Instantiates a parser from a YAML file-like object or path.

            :param path: location of a regexes YAML file, or an already
                open file-like object which is handed to the YAML loader
                as is
            :raises KeyError: if one of the ``user_agent_parsers``,
                ``os_parsers`` or ``device_parsers`` sections, or the
                ``regex`` key of an entry, is missing
            """
            if isinstance(path, (str, os.PathLike)):
                # Binary mode: let the YAML loader detect the stream's
                # encoding instead of decoding with the locale default.
                with open(path, "rb") as fp:
                    regexes = load(fp, Loader=SafeLoader)  # type: ignore
            else:
                regexes = load(path, Loader=SafeLoader)  # type: ignore

            return cls(
                user_agent_parsers=[
                    UserAgentParser(
                        p["regex"],
                        p.get("family_replacement"),
                        p.get("v1_replacement"),
                        p.get("v2_replacement"),
                    )
                    for p in regexes["user_agent_parsers"]
                ],
                os_parsers=[
                    OSParser(
                        p["regex"],
                        p.get("os_replacement"),
                        p.get("os_v1_replacement"),
                        p.get("os_v2_replacement"),
                        p.get("os_v3_replacement"),
                        p.get("os_v4_replacement"),
                    )
                    for p in regexes["os_parsers"]
                ],
                device_parsers=[
                    DeviceParser(
                        p["regex"],
                        p.get("regex_flag"),
                        p.get("device_replacement"),
                        p.get("brand_replacement"),
                        p.get("model_replacement"),
                    )
                    for p in regexes["device_parsers"]
                ],
            )

    def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult:
        """Resolves exactly the requested ``domains`` for ``ua``.

        Domains which were not requested are left as ``None`` and are not
        flagged in the ``domains`` of the result.
        """
        parse = methodcaller("Parse", ua)

        def first_match(parsers, build):
            # Matchers are tried lazily, in order; a result whose first
            # item is falsy is treated as "no match" and skipped.
            return next((build(*m) for m in map(parse, parsers) if m[0]), None)

        return PartialParseResult(
            domains=domains,
            string=ua,
            user_agent=(
                first_match(self.user_agent_parsers, UserAgent)
                if Domain.USER_AGENT in domains
                else None
            ),
            os=(
                first_match(self.os_parsers, OS)
                if Domain.OS in domains
                else None
            ),
            device=(
                first_match(self.device_parsers, Device)
                if Domain.DEVICE in domains
                else None
            ),
        )
class Cache(abc.ABC):
    """Interface of the stores a caching parser can be backed by.

    Entries are keyed by user agent string. Unlike a regular mapping,
    ``__getitem__`` reports a missing key by returning ``None``.
    """

    @abc.abstractmethod
    def __setitem__(self, key: str, value: PartialParseResult):
        """Stores ``value`` under ``key``, replacing any previous entry."""
        ...

    @abc.abstractmethod
    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        """Returns the entry stored under ``key``, or ``None`` if absent."""
        ...


class Clearing(Cache):
    """A clearing cache, if the cache is full, just remove all the entries
    and re-fill from scratch.

    This can also be used as a permanent cache by setting the
    ``maxsize`` to infinity (or at least some very large value),
    however this is probably a bad idea as it *will* lead to an
    ever-growing memory allocation, until every possible user agent
    string has been seen.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.cache: Dict[str, PartialParseResult] = {}

    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        return self.cache.get(key)

    def __setitem__(self, key: str, value: PartialParseResult):
        # Updating an existing key never grows the cache, so only a new
        # key arriving at capacity triggers the wipe.
        if key not in self.cache and len(self.cache) >= self.maxsize:
            self.cache.clear()

        self.cache[key] = value


class LRU(Cache):
    """Cache following a least-recently used replacement policy: when
    there is no more room in the cache, whichever entry was last seen
    the least recently is removed.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        # Ordered from least recently used (first) to most recently used
        # (last).
        self.cache: OrderedDict[str, PartialParseResult] = OrderedDict()

    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        entry = self.cache.get(key)
        # Identity check rather than truthiness: a hit must refresh the
        # entry's position whatever the truth value of the stored object.
        if entry is not None:
            self.cache.move_to_end(key)
        return entry

    def __setitem__(self, key: str, value: PartialParseResult):
        self.cache[key] = value
        # Assigning to an existing key keeps its old position, so the
        # entry is explicitly marked as most recently used.
        self.cache.move_to_end(key)
        while len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)
class Domain(Flag):
    """Hint for selecting which domains are requested when asking for a
    :class:`ParseResult`.
    """

    #: browser (user agent) domain
    USER_AGENT = auto()
    #: os domain
    OS = auto()
    #: device domain
    DEVICE = auto()
    #: shortcut for all three domains
    ALL = USER_AGENT | OS | DEVICE


@dataclass(frozen=True)
class DefaultedParseResult:
    """Variant of :class:`.ParseResult` where attributes are set
    to a default value if the parse fails.

    For all domains, the default value has ``family`` set to
    ``"Other"`` and every other attribute set to ``None``.
    """

    user_agent: UserAgent
    os: OS
    device: Device
    string: str


@dataclass(frozen=True)
class ParseResult:
    """Complete parser result, without fallback.

    For each attribute (and domain), either the parse was a success (a
    match was found) and the corresponding data is set, or it was a
    failure and the value is `None`.

    Use :meth:`with_defaults` to get default values instead of ``None``.
    """

    user_agent: Optional[UserAgent]
    os: Optional[OS]
    device: Optional[Device]
    string: str

    def with_defaults(self) -> DefaultedParseResult:
        """Returns a :class:`DefaultedParseResult` in which every domain
        whose parse failed is replaced by a default-constructed value.
        """
        return DefaultedParseResult(
            user_agent=self.user_agent or UserAgent(),
            os=self.os or OS(),
            device=self.device or Device(),
            string=self.string,
        )


@dataclass(frozen=True)
class PartialParseResult:
    """Potentially partial (incomplete) parser result.

    Domain fields (``user_agent``, ``os``, and ``device``) can be:

    - unset if not parsed yet
    - set to a parsing failure
    - set to a parsing success

    The `domains` flags specify which is which: if a `Domain`
    flag is set, the corresponding attribute was looked up and is
    either ``None`` for a parsing failure (no match was found) or a
    value for a parsing success.

    If the flag is unset, the field has not been looked up yet.
    """

    domains: Domain
    user_agent: Optional[UserAgent]
    os: Optional[OS]
    device: Optional[Device]
    string: str

    def complete(self) -> ParseResult:
        """Requires that the result be fully resolved (every attribute is set,
        even if to a lookup failure) and converts it to a
        :class:`ParseResult`.

        Lookup failures are carried over as ``None``; call
        :meth:`ParseResult.with_defaults` on the returned value to replace
        them by default values.

        :raises ValueError: if ``domains`` is not :attr:`Domain.ALL`
        """
        if self.domains != Domain.ALL:
            raise ValueError("Only a result with all attributes set can be completed")

        return ParseResult(
            user_agent=self.user_agent,
            os=self.os,
            device=self.device,
            string=self.string,
        )


class Parser(abc.ABC):
    """Base class of parsers: implementations provide :meth:`__call__`,
    the convenience methods below are all derived from it.
    """

    @abc.abstractmethod
    def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult:
        """Parses the ``ua`` string, returning a parse result with *at least*
        the requested :class:`domains <Domain>` resolved (whether to success or
        failure).

        A parser may resolve more :class:`domains <Domain>` than
        requested, but it *must not* resolve less.
        """
        ...

    def parse(self, ua: str) -> ParseResult:
        """Convenience method for parsing all domains.

        Domains for which no match was found are ``None`` in the result.

        :raises ValueError: if the parser did not resolve every domain
        """
        return self(ua, Domain.ALL).complete()

    def parse_user_agent(self, ua: str) -> Optional[UserAgent]:
        """Convenience method for parsing the :class:`UserAgent` domain,
        returning ``None`` in case of failure.
        """
        return self(ua, Domain.USER_AGENT).user_agent

    def parse_os(self, ua: str) -> Optional[OS]:
        """Convenience method for parsing the :class:`OS` domain, returning
        ``None`` in case of failure.
        """
        return self(ua, Domain.OS).os

    def parse_device(self, ua: str) -> Optional[Device]:
        """Convenience method for parsing the :class:`Device` domain,
        returning ``None`` in case of failure.
        """
        return self(ua, Domain.DEVICE).device
+ """ + cache = Clearing(2) + p = CachingParser( + BasicParser( + [UserAgentParser("(a)")], + [OSParser("(a)")], + [DeviceParser("(a)")], + ), + cache, + ) + + p.parse_user_agent("a") + assert cache.cache == { + "a": PartialParseResult(Domain.USER_AGENT, UserAgent("a"), None, None, "a"), + } + p("a", Domain.OS) + assert cache.cache == { + "a": PartialParseResult( + Domain.USER_AGENT | Domain.OS, UserAgent("a"), OS("a"), None, "a" + ), + } + p.parse("a") + assert cache.cache == { + "a": PartialParseResult( + Domain.ALL, UserAgent("a"), OS("a"), Device("a", None, "a"), "a" + ), + } diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 00000000..70ba578f --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,117 @@ +"""Tests UAP-Python using the UAP-core test suite +""" +import dataclasses +import logging +import pathlib +import platform +from operator import attrgetter + +import pytest # type: ignore + +if platform.python_implementation() == "PyPy": + from yaml import load, SafeLoader +else: + try: + from yaml import load, CSafeLoader as SafeLoader # type: ignore + except ImportError: + logging.getLogger(__name__).warning( + "PyYaml C extension not available to run tests, this will result " + "in dramatic tests slowdown." 
# Anchored on this file's location (``__name__`` is a module name, not a
# path, and made the lookup depend on the current working directory):
# ``uap-core`` is expected in the parent of the directory holding this file.
CORE_DIR = pathlib.Path(__file__).resolve().parent.parent / "uap-core"
def test_results():
    """Checks what the convenience methods return for a string matched by
    the single user agent matcher ("x") and for one that is not ("y").
    """
    parser = BasicParser([UserAgentParser("(x)")], [], [])
    matched = UserAgent("x")

    # Domain-specific helper: a value on success, ``None`` on failure.
    assert parser.parse_user_agent("x") == matched
    assert parser.parse_user_agent("y") is None

    # Full parse: there are no OS or device matchers, so those two
    # attributes are ``None`` for both strings.
    for ua, user_agent in (("x", matched), ("y", None)):
        assert parser.parse(ua) == ParseResult(
            user_agent=user_agent,
            os=None,
            device=None,
            string=ua,
        )
+ domains=Domain.USER_AGENT, + user_agent=UserAgent("a"), + os=None, + device=None, + ) diff --git a/tests/test_legacy.py b/tests/test_legacy.py index 03feeda3..ea3f3257 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -4,14 +4,14 @@ import sys import warnings -import pytest +import pytest # type: ignore import yaml if platform.python_implementation() == "PyPy": from yaml import SafeLoader else: try: - from yaml import CSafeLoader as SafeLoader + from yaml import CSafeLoader as SafeLoader # type: ignore except ImportError: logging.getLogger(__name__).warning( "PyYaml C extension not available to run tests, this will result " diff --git a/tox.ini b/tox.ini index 57b17f90..36ac52da 100644 --- a/tox.ini +++ b/tox.ini @@ -2,12 +2,12 @@ min_version = 4.0 env_list = py3{8,9,10,11,12} pypy3.{8,9,10} - flake8, black + flake8, black, typecheck labels = test = py3{8,9,10,11,12},pypy3.{8,9,10} cpy = py3{8,9,10,11,12} pypy = pypy3.{8,9,10} - check = flake8, black + check = flake8, black, typecheck [testenv] # wheel install @@ -30,7 +30,14 @@ commands = flake8 {posargs} [testenv:black] package = skip deps = black -commands = black --check --diff . +commands = black --check --diff {posargs:.} + +[testenv:typecheck] +package = skip +deps = + mypy + types-PyYaml +commands = mypy --check-untyped-defs --no-implicit-optional {posargs:src tests} [flake8] max_line_length = 88