forked from ua-parser/uap-python
New API with full typing
========================

Seems pretty self-explanatory: rather than returning somewhat ad-hoc dicts, this API works off of dataclasses. It should be compatible with the legacy version through the magic of ~~buying two of them~~ `dataclasses.asdict`.

Parser API
==========

The legacy version had "parsers" which really represent individual parsing rules. In the new API, the job of a parser is what the top-level functions did: it wraps around the entire job of parsing a user-agent string.

The core API is just `__call__`, with a selection flag for the domains ("domain" seems like the least bad term for what "user agent", "os", and "device" are; other alternatives I considered are "component" and "category", but I'm still ambivalent). Overridable helpers are provided which match the old API's methods (with PEP8 conventions), as well as the same style of helpers at the package toplevel.

This resolves a number of limitations:

Concurrency
-----------

While the library should be thread-safe (and I need to find a way to test that), the ability to instantiate parsers should provide the opportunity for things like thread-local parsers, or actual parallelism if we start using native extensions (regex, re2).

It also allows running multiple *parser configurations* concurrently, including e.g. multiple independent custom yaml sets. Not sure there's a use for it, but why not? At the very least it should make using custom YAML datasets much easier than having to set envvars.

Customization
-------------

Public APIs are provided both to instantiate and tune parsers, and to set the global parser. Hopefully this makes evaluating proposed parsers as well as evaluating & tuning caches (algorithm & size) easier. Even more so as we should provide some sort of evaluation CLI in ua-parser#163.

Caches
------

In the old API, the package-provided caching could only be global and limited to a single implementation, as it had to integrate with the toplevel parsing functions. By reifying the parsing job, a cache is just a parser which delegates the parse if it doesn't have a hit. This allows more easily providing, testing, and evolving alternative cache strategies.

Bulk APIs
---------

The current parser checks rules (regexes) one at a time on the input, but there are advanced regex APIs which can check a regex *set* and return which one(s) matched, allowing much more efficient bulk matching, e.g. google's re2 or rust's regex.

With the old scheme, this would be a pretty significant change in use / behaviour, obviating the use of the "parsers" with no recourse. Under the new parsing scheme, these can just be different "base" parsers: they can be the default, they can be cached, and users can instantiate their own parser instead.

Fixes ua-parser#93, fixes ua-parser#142, closes ua-parser#116
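For context, a minimal usage sketch of the API described above, pieced together from the code added in this commit; the `ua_parser` package name, the `Domain` import path, and the example user-agent string are assumptions:

```python
# Minimal sketch (assumes the package imports as `ua_parser` and that
# Domain is a flag enum, as the parser code below suggests).
from ua_parser import parse, parse_os, get_parser
from ua_parser.types import Domain

ua = "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0"

full = parse(ua)        # ParseResult dataclass: user agent, OS and device
os_only = parse_os(ua)  # Optional[OS], only runs the OS rules

# the core protocol: call a parser with a domain-selection flag
partial = get_parser()(ua, Domain.USER_AGENT | Domain.OS)
print(partial.user_agent, partial.os)
```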
Showing 11 changed files with 765 additions and 7 deletions.
The package toplevel bumps `VERSION` from `(0, 16, 1)` to `(1, 0, 0)` and adds the global-parser plumbing plus the domain-specific helpers (@@ -1 +1,66 @@):

```python
VERSION = (1, 0, 0)

from typing import Optional
from .types import *
from ._re import Parser as BasicParser
from .caching import CachingParser, Clearing, LRU


_parser: Optional[Parser] = None


def get_parser() -> Parser:
    """Returns the global parser.
    Can be used to forcefully initialise the default parser if it's
    not initialised yet and no parser has been set.
    """
    global _parser
    if _parser is None:
        _parser = CachingParser(
            BasicParser.from_regexes(),
            LRU(200),
        )

    return _parser


def set_parser(p: Optional[Parser]):
    """Sets the global parser.
    Can be used to set an application-specific parser or
    parser-configuration.
    """
    global _parser
    _parser = p


def parse(ua: str) -> ParseResult:
    """Parses the :class:`.UserAgent`, :class:`.OS`, and :class:`.Device`
    information using the :func:`global parser <get_parser>`.
    Because each domain is usually parsed separately, prefer the
    domain-specific helpers if you're not going to use all of them.
    """
    return get_parser().parse(ua)


def parse_user_agent(ua: str) -> Optional[UserAgent]:
    """Parses the :class:`browser <.UserAgent>` information using the
    :func:`global parser <get_parser>`.
    """
    return get_parser().parse_user_agent(ua)


def parse_os(ua: str) -> Optional[OS]:
    """Parses the :class:`.OS` information using the :func:`global parser
    <get_parser>`.
    """
    return get_parser().parse_os(ua)


def parse_device(ua: str) -> Optional[Device]:
    """Parses the :class:`.Device` information using the :func:`global
    parser <get_parser>`.
    """
    return get_parser().parse_device(ua)
```
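To illustrate the customization hooks above, a hedged sketch of swapping the global parser for an application-specific configuration; all names are re-exported by the module above, while the top-level `ua_parser` package name is an assumption:

```python
# Sketch: install a custom global parser configuration, then restore the
# lazily-initialised default.
from ua_parser import (
    BasicParser,
    CachingParser,
    Clearing,
    parse,
    set_parser,
)

# a smaller, clearing cache in front of the bundled regex set
set_parser(CachingParser(BasicParser.from_regexes(), Clearing(100)))
print(parse("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"))

# back to the default, which get_parser() re-creates on next use
set_parser(None)
```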
A new module adds the basic pure-python parser (imported as `BasicParser` by the toplevel above), wrapping the per-domain rule lists and an optional YAML loader (@@ -0,0 +1,122 @@):

```python
from __future__ import annotations

import io
import os
from itertools import starmap
from typing import *
from operator import methodcaller

from dataclasses import dataclass
from .types import (
    Parser as BaseParser,
    PartialParseResult,
    Domain,
    UserAgent,
    OS,
    Device,
)
from .user_agent_parser import UserAgentParser, OSParser, DeviceParser

load: Optional[Callable]
SafeLoader: Optional[Type]
try:
    from yaml import load, CSafeLoader as SafeLoader
except ImportError:
    try:
        from yaml import load, SafeLoader
    except ImportError:
        load = SafeLoader = None


@dataclass
class Parser(BaseParser):
    """Basic pure-python parser implementation. Tries every parser,
    sequentially, until it finds a match (or falls off).
    """

    user_agent_parsers: List[UserAgentParser]
    os_parsers: List[OSParser]
    device_parsers: List[DeviceParser]

    @classmethod
    def from_regexes(cls) -> Parser:
        """Instantiates a parser from the pre-compiled regex set. Currently
        not a singleton, but essentially free anyway after the initial
        call (which loads the pre-compiled code).
        """
        from ._regexes import USER_AGENT_PARSERS, DEVICE_PARSERS, OS_PARSERS

        return cls(
            user_agent_parsers=USER_AGENT_PARSERS,
            os_parsers=OS_PARSERS,
            device_parsers=DEVICE_PARSERS,
        )

    if load:

        @classmethod
        def from_yaml(cls, path: Union[str, os.PathLike, io.IOBase]) -> Parser:
            """Instantiates a parser from a YAML file-like object or path."""
            if isinstance(path, (str, os.PathLike)):
                with open(path) as fp:
                    regexes = load(fp, Loader=SafeLoader)  # type: ignore
            else:
                regexes = load(path, Loader=SafeLoader)  # type: ignore

            return cls(
                user_agent_parsers=[
                    UserAgentParser(
                        p["regex"],
                        p.get("family_replacement"),
                        p.get("v1_replacement"),
                        p.get("v2_replacement"),
                    )
                    for p in regexes["user_agent_parsers"]
                ],
                os_parsers=[
                    OSParser(
                        p["regex"],
                        p.get("os_replacement"),
                        p.get("os_v1_replacement"),
                        p.get("os_v2_replacement"),
                        p.get("os_v3_replacement"),
                        p.get("os_v4_replacement"),
                    )
                    for p in regexes["os_parsers"]
                ],
                device_parsers=[
                    DeviceParser(
                        p["regex"],
                        p.get("regex_flag"),
                        p.get("device_replacement"),
                        p.get("brand_replacement"),
                        p.get("model_replacement"),
                    )
                    for p in regexes["device_parsers"]
                ],
            )

    def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult:
        parse = methodcaller("Parse", ua)
        return PartialParseResult(
            domains=domains,
            string=ua,
            user_agent=next(
                (UserAgent(*m) for m in map(parse, self.user_agent_parsers) if m[0]),
                None,
            )
            if Domain.USER_AGENT in domains
            else None,
            os=next(
                (OS(*m) for m in map(parse, self.os_parsers) if m[0]),
                None,
            )
            if Domain.OS in domains
            else None,
            device=next(
                (Device(*m) for m in map(parse, self.device_parsers) if m[0]),
                None,
            )
            if Domain.DEVICE in domains
            else None,
        )
```
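As a follow-up, a sketch of the custom-dataset workflow this enables (the YAML branch requires PyYAML; `my_regexes.yaml` is a hypothetical file using the standard regexes.yaml layout, and the `ua_parser._re` / `ua_parser.types` import paths assume the usual package name):

```python
# Hypothetical example: parse with an independent custom ruleset instead of
# the bundled pre-compiled one. "my_regexes.yaml" is a made-up path.
from ua_parser._re import Parser
from ua_parser.types import Domain

custom = Parser.from_yaml("my_regexes.yaml")
result = custom(
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15",
    Domain.USER_AGENT | Domain.DEVICE,
)
print(result.user_agent, result.device)
```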
A short typing stub declares the rule lists provided by the pre-compiled regex module that `from_regexes` imports (@@ -0,0 +1,6 @@):

```python
from typing import List
from .user_agent_parser import UserAgentParser, OSParser, DeviceParser

USER_AGENT_PARSERS: List[UserAgentParser]
OS_PARSERS: List[OSParser]
DEVICE_PARSERS: List[DeviceParser]
```
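The concrete generated module is not part of this diff; purely as an illustration, its entries presumably mirror the constructor calls seen in `from_yaml` above. Every regex and replacement below is invented:

```python
# Hypothetical shape of the generated module described by the stub above;
# the real file is produced from regexes.yaml by the build step, so all
# entries here are made up for illustration.
from .user_agent_parser import UserAgentParser, OSParser, DeviceParser

USER_AGENT_PARSERS = [
    UserAgentParser(r"(Firefox)/(\d+)\.(\d+)", None, None, None),
    # ... one entry per rule in regexes.yaml
]
OS_PARSERS = [
    OSParser(r"Windows NT (\d+)\.(\d+)", "Windows", None, None, None, None),
]
DEVICE_PARSERS = [
    DeviceParser(r"(iPhone)", None, "iPhone", "Apple", "iPhone"),
]
```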
The caching module provides the `Cache` interface, two eviction policies, and the decorating `CachingParser` (@@ -0,0 +1,92 @@):

```python
import abc
from collections import OrderedDict
from typing import Dict, Optional, MutableMapping
from .types import Parser, Domain, PartialParseResult


class Cache(abc.ABC):
    @abc.abstractmethod
    def __setitem__(self, key: str, value: PartialParseResult):
        ...

    @abc.abstractmethod
    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        ...


class Clearing(Cache):
    """A clearing cache: if the cache is full, just remove all the entries
    and re-fill from scratch.
    This can also be used as a permanent cache by setting the
    ``maxsize`` to infinity (or at least some very large value),
    however this is probably a bad idea as it *will* lead to an
    ever-growing memory allocation, until every possible user agent
    string has been seen.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.cache: Dict[str, PartialParseResult] = {}

    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        return self.cache.get(key)

    def __setitem__(self, key: str, value: PartialParseResult):
        if key not in self.cache and len(self.cache) >= self.maxsize:
            self.cache.clear()

        self.cache[key] = value


class LRU(Cache):
    """Cache following a least-recently used replacement policy: when
    there is no more room in the cache, whichever entry was last seen
    the least recently is removed.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.cache: OrderedDict[str, PartialParseResult] = OrderedDict()

    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        e = self.cache.get(key)
        if e:
            self.cache.move_to_end(key)
        return e

    def __setitem__(self, key: str, value: PartialParseResult):
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.maxsize:
            self.cache.popitem(last=False)


class CachingParser(Parser):
    """Decorating parser which can take a :class:`Cache` parameter in
    order to cache parse results (based on user-agent strings).
    """

    def __init__(self, parser: Parser, cache: Cache):
        self.parser: Parser = parser
        self.cache: Cache = cache

    def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult:
        entry = self.cache[ua]
        if entry:
            if domains in entry.domains:
                return entry

            domains &= ~entry.domains

        r = self.parser(ua, domains)
        if entry:
            r = PartialParseResult(
                string=ua,
                domains=entry.domains | r.domains,
                user_agent=entry.user_agent or r.user_agent,
                os=entry.os or r.os,
                device=entry.device or r.device,
            )
        self.cache[ua] = r
        return r
```
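To show how the `Cache` ABC is meant to be extended (one of the goals called out in the commit message), a toy sketch of an alternative eviction policy plugged into `CachingParser`; the `ua_parser` import paths are assumptions:

```python
# Toy sketch: a random-eviction cache behind CachingParser. Only the two
# abstract methods of Cache need implementing; import paths assumed.
import random
from typing import Dict, Optional

from ua_parser._re import Parser as BasicParser
from ua_parser.caching import Cache, CachingParser
from ua_parser.types import PartialParseResult


class RandomEviction(Cache):
    """Evicts an arbitrary entry once the cache is full."""

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.cache: Dict[str, PartialParseResult] = {}

    def __getitem__(self, key: str) -> Optional[PartialParseResult]:
        return self.cache.get(key)

    def __setitem__(self, key: str, value: PartialParseResult):
        if key not in self.cache and len(self.cache) >= self.maxsize:
            del self.cache[random.choice(list(self.cache))]
        self.cache[key] = value


parser = CachingParser(BasicParser.from_regexes(), RandomEviction(500))
```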