From 4dcee6312aaa20269ea0b4f2571bbf35bf6175d1 Mon Sep 17 00:00:00 2001 From: yxie66 Date: Thu, 12 Mar 2020 15:26:34 -0700 Subject: [PATCH 1/3] feat(data_connector): implement pagination fix(data_connector): read pagination config on ImplicitTable class --- dataprep/data_connector/connector.py | 144 +++++++++++++++--- dataprep/data_connector/implicit_database.py | 8 +- dataprep/data_connector/schema.json | 9 ++ .../tests/data_connector/test_integration.py | 6 +- 4 files changed, 147 insertions(+), 20 deletions(-) diff --git a/dataprep/data_connector/connector.py b/dataprep/data_connector/connector.py index 269e8f1c8..525771051 100644 --- a/dataprep/data_connector/connector.py +++ b/dataprep/data_connector/connector.py @@ -2,7 +2,7 @@ This module contains the Connector class. Every data fetching action should begin with instantiating this Connector class. """ - +import math from pathlib import Path from typing import Any, Dict, List, Optional @@ -55,17 +55,17 @@ class Connector: >>> dc = Connector("yelp", auth_params={"access_token":access_token}) """ - _impdb: ImplicitDatabase - _vars: Dict[str, Any] - _auth_params: Dict[str, Any] - _session: Session - _jenv: Environment + impdb: ImplicitDatabase + vars: Dict[str, Any] + _auth: Dict[str, Any] + session: Session + jenv: Environment def __init__( self, config_path: str, - auth_params: Optional[Dict[str, Any]] = None, - **kwargs: Any, + _auth: Optional[Dict[str, Any]] = None, + **kwargs: Dict[str, Any], ) -> None: self._session = Session() if ( @@ -79,16 +79,16 @@ def __init__( ensure_config(config_path) path = config_directory() / config_path - self._impdb = ImplicitDatabase(path) - - self._vars = kwargs - self._auth_params = auth_params or {} - self._jenv = Environment(undefined=StrictUndefined) + self.vars = kwargs + self._auth = _auth or {} + self.jenv = Environment(undefined=StrictUndefined) def _fetch( self, table: ImplicitTable, - auth_params: Optional[Dict[str, Any]], + _auth: Optional[Dict[str, Any]], + _count: Optional[int], + _cursor: Optional[int], kwargs: Dict[str, Any], ) -> Response: method = table.method @@ -98,17 +98,17 @@ def _fetch( "params": {}, "cookies": {}, } - merged_vars = {**self._vars, **kwargs} - if table.authorization is not None: - table.authorization.build(req_data, auth_params or self._auth_params) + if table.authorization is not None: + table.authorization.build(req_data, _auth or self._auth) for key in ["headers", "params", "cookies"]: if getattr(table, key) is not None: instantiated_fields = getattr(table, key).populate( self._jenv, merged_vars ) req_data[key].update(**instantiated_fields) + if table.body is not None: # TODO: do we support binary body? instantiated_fields = table.body.populate(self._jenv, merged_vars) @@ -119,6 +119,30 @@ def _fetch( else: raise UnreachableError + if not table.pag_params: + resp: Response = self._session.send( # type: ignore + Request( + method=method, + url=url, + headers=req_data["headers"], + params=req_data["params"], + json=req_data.get("json"), + data=req_data.get("data"), + cookies=req_data["cookies"], + ).prepare() + ) + return resp + + pag_type = table.pag_params["type"] + count_key = table.pag_params["count_key"] + cursor_key = "" + if pag_type == "cursor": + cursor_key = table.pag_params["cursor_key"] + if pag_type == "limit": + cursor_key = table.pag_params["anchor_key"] + req_data["params"][count_key] = _count + req_data["params"][cursor_key] = _cursor + resp: Response = self._session.send( # type: ignore Request( method=method, @@ -136,6 +160,92 @@ def _fetch( return resp + def query( + self, + table: str, + _auth: Optional[Dict[str, Any]] = None, + _count: Optional[int] = None, + **where: Dict[str, Any], + ) -> pd.DataFrame: + """ + Query the API to get a table. + + Parameters + ---------- + table : str + The table name. + _auth : Optional[Dict[str, Any]] = None + The parameters for authentication. Usually the authentication parameters + should be defined when instantiating the Connector. In case some tables have different + authentication options, a different authentication parameter can be defined here. + This parameter will override the one from Connector if passed. + _count: Optional[int] = None + count of returned records. + **where: Any + The additional parameters required for the query. + """ + assert table in self.impdb.tables, f"No such table {table} in {self.impdb.name}" + + itable = self.impdb.tables[table] + + if not itable.pag_params: + resp = self._fetch( + table=itable, _auth=_auth, _count=-1, _cursor=-1, kwargs=where + ) + df = itable.from_response(resp) + return df + + max_count = int(itable.pag_params["max_count"]) + df = pd.DataFrame() + last_id = 0 + pag_type = itable.pag_params["type"] + + if _count is None: + _count = max_count + resp = self._fetch( + table=itable, _auth=_auth, _count=_count, _cursor=0, kwargs=where + ) + df = itable.from_response(resp) + + else: + cnt_to_fetch = 0 + count = _count or 1 + n_page = math.ceil(count / max_count) + remain = count % max_count + for i in range(n_page): + if i < n_page - 1: + cnt_to_fetch = max_count + else: + cnt_to_fetch = remain if remain > 0 else max_count + if pag_type == "cursor": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=last_id - 1, + kwargs=where, + ) + elif pag_type == "limit": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=i * max_count, + kwargs=where, + ) + else: + raise NotImplementedError + df_ = itable.from_response(resp) + if pag_type == "cursor": + last_id = int(df_[itable.pag_params["cursor_id"]][len(df_) - 1]) - 1 + if i == 0: + df = df_.copy() + else: + df = pd.concat([df, df_], axis=0) + df.reset_index(drop=True, inplace=True) + + return df + @property def table_names(self) -> List[str]: """ diff --git a/dataprep/data_connector/implicit_database.py b/dataprep/data_connector/implicit_database.py index 049787f36..25646a3f5 100644 --- a/dataprep/data_connector/implicit_database.py +++ b/dataprep/data_connector/implicit_database.py @@ -54,6 +54,7 @@ class ImplicitTable: # pylint: disable=too-many-instance-attributes body_ctype: str body: Optional[Fields] = None cookies: Optional[Fields] = None + pag_params: Dict[str, str] # Response related ctype: str @@ -67,7 +68,6 @@ def __init__(self, name: str, config: Dict[str, Any]) -> None: ) # This will throw errors if validate failed self.name = name self.config = config - request_def = config["request"] self.method = request_def["method"] @@ -85,6 +85,11 @@ def __init__(self, name: str, config: Dict[str, Any]) -> None: raise NotImplementedError self.authorization = Authorization(auth_type=auth_type, params=auth_params) + if "pagination" in request_def: + self.pag_params = request_def["pagination"] + else: + self.pag_params = {} + for key in ["headers", "params", "cookies"]: if key in request_def: setattr(self, key, Fields(request_def[key])) @@ -230,7 +235,6 @@ def __init__(self, config_path: Union[str, Path]) -> None: if table_config_path.suffix != ".json": # ifnote non json file continue - with open(table_config_path) as f: table_config = jload(f) diff --git a/dataprep/data_connector/schema.json b/dataprep/data_connector/schema.json index 27368ba67..964eec0cd 100644 --- a/dataprep/data_connector/schema.json +++ b/dataprep/data_connector/schema.json @@ -61,6 +61,15 @@ "params": { "$ref": "#/definitions/fields" }, + "pagination": { + "$id": "#/properties/request/properties/pagination", + "type":"object", + "anchor_key":"string", + "count_key":"string", + "cursor_id":"string", + "cursor_key":"string", + "max_count":"string" + }, "body": { "$id": "#/properties/request/properties/body", "type": "object", diff --git a/dataprep/tests/data_connector/test_integration.py b/dataprep/tests/data_connector/test_integration.py index dc0955fc5..28dfc8f46 100644 --- a/dataprep/tests/data_connector/test_integration.py +++ b/dataprep/tests/data_connector/test_integration.py @@ -4,7 +4,7 @@ def test_data_connector() -> None: token = environ["DATAPREP_DATA_CONNECTOR_YELP_TOKEN"] - dc = Connector("yelp", auth_params={"access_token": token}) + dc = Connector("yelp", _auth={"access_token": token}) df = dc.query("businesses", term="ramen", location="vancouver") assert len(df) > 0 @@ -14,3 +14,7 @@ def test_data_connector() -> None: schema = dc.show_schema("businesses") assert len(schema) > 0 + + df = dc.query("businesses", _count=120, term="ramen", location="vancouver") + + assert len(df) > 0 From e729213da5655a2fcb798e022e432c800f48e8e5 Mon Sep 17 00:00:00 2001 From: yxie66 Date: Fri, 1 May 2020 22:39:49 -0700 Subject: [PATCH 2/3] test(data_connector): add integration test for pagination feature --- dataprep/data_connector/connector.py | 220 ++++++++----------- dataprep/data_connector/implicit_database.py | 42 +++- dataprep/data_connector/schema.json | 30 ++- 3 files changed, 156 insertions(+), 136 deletions(-) diff --git a/dataprep/data_connector/connector.py b/dataprep/data_connector/connector.py index 525771051..ee208f902 100644 --- a/dataprep/data_connector/connector.py +++ b/dataprep/data_connector/connector.py @@ -55,11 +55,11 @@ class Connector: >>> dc = Connector("yelp", auth_params={"access_token":access_token}) """ - impdb: ImplicitDatabase - vars: Dict[str, Any] + _impdb: ImplicitDatabase + _vars: Dict[str, Any] _auth: Dict[str, Any] - session: Session - jenv: Environment + _session: Session + _jenv: Environment def __init__( self, @@ -79,9 +79,11 @@ def __init__( ensure_config(config_path) path = config_directory() / config_path - self.vars = kwargs + self._impdb = ImplicitDatabase(path) + + self._vars = kwargs self._auth = _auth or {} - self.jenv = Environment(undefined=StrictUndefined) + self._jenv = Environment(undefined=StrictUndefined) def _fetch( self, @@ -119,29 +121,30 @@ def _fetch( else: raise UnreachableError - if not table.pag_params: - resp: Response = self._session.send( # type: ignore - Request( - method=method, - url=url, - headers=req_data["headers"], - params=req_data["params"], - json=req_data.get("json"), - data=req_data.get("data"), - cookies=req_data["cookies"], - ).prepare() - ) - return resp - - pag_type = table.pag_params["type"] - count_key = table.pag_params["count_key"] - cursor_key = "" - if pag_type == "cursor": - cursor_key = table.pag_params["cursor_key"] - if pag_type == "limit": - cursor_key = table.pag_params["anchor_key"] - req_data["params"][count_key] = _count - req_data["params"][cursor_key] = _cursor + if table.pag_params is not None: + if table.pag_params.type == "null": + resp: Response = self._session.send( # type: ignore + Request( + method=method, + url=url, + headers=req_data["headers"], + params=req_data["params"], + json=req_data.get("json"), + data=req_data.get("data"), + cookies=req_data["cookies"], + ).prepare() + ) + return resp + + pag_type = table.pag_params.type + count_key = table.pag_params.count_key + cursor_key = "" + if pag_type == "cursor": + cursor_key = table.pag_params.cursor_key + if pag_type == "limit": + cursor_key = table.pag_params.anchor_key + req_data["params"][count_key] = _count + req_data["params"][cursor_key] = _cursor resp: Response = self._session.send( # type: ignore Request( @@ -165,11 +168,10 @@ def query( table: str, _auth: Optional[Dict[str, Any]] = None, _count: Optional[int] = None, - **where: Dict[str, Any], + **where: Any, ) -> pd.DataFrame: """ Query the API to get a table. - Parameters ---------- table : str @@ -184,65 +186,68 @@ def query( **where: Any The additional parameters required for the query. """ - assert table in self.impdb.tables, f"No such table {table} in {self.impdb.name}" + assert ( + table in self._impdb.tables + ), f"No such table {table} in {self._impdb.name}" - itable = self.impdb.tables[table] + itable = self._impdb.tables[table] - if not itable.pag_params: - resp = self._fetch( - table=itable, _auth=_auth, _count=-1, _cursor=-1, kwargs=where - ) - df = itable.from_response(resp) - return df - - max_count = int(itable.pag_params["max_count"]) - df = pd.DataFrame() - last_id = 0 - pag_type = itable.pag_params["type"] - - if _count is None: - _count = max_count - resp = self._fetch( - table=itable, _auth=_auth, _count=_count, _cursor=0, kwargs=where - ) - df = itable.from_response(resp) + if itable.pag_params is not None: + if itable.pag_params.type == "null": + resp = self._fetch( + table=itable, _auth=_auth, _count=-1, _cursor=-1, kwargs=where + ) + df = itable.from_response(resp) + return df + + max_count = int(itable.pag_params.max_count) + df = pd.DataFrame() + last_id = 0 + pag_type = itable.pag_params.type + + if _count is None: + _count = max_count + resp = self._fetch( + table=itable, _auth=_auth, _count=_count, _cursor=0, kwargs=where + ) + df = itable.from_response(resp) - else: - cnt_to_fetch = 0 - count = _count or 1 - n_page = math.ceil(count / max_count) - remain = count % max_count - for i in range(n_page): - if i < n_page - 1: - cnt_to_fetch = max_count - else: - cnt_to_fetch = remain if remain > 0 else max_count - if pag_type == "cursor": - resp = self._fetch( - table=itable, - _auth=_auth, - _count=cnt_to_fetch, - _cursor=last_id - 1, - kwargs=where, - ) - elif pag_type == "limit": - resp = self._fetch( - table=itable, - _auth=_auth, - _count=cnt_to_fetch, - _cursor=i * max_count, - kwargs=where, - ) - else: - raise NotImplementedError - df_ = itable.from_response(resp) - if pag_type == "cursor": - last_id = int(df_[itable.pag_params["cursor_id"]][len(df_) - 1]) - 1 - if i == 0: - df = df_.copy() - else: - df = pd.concat([df, df_], axis=0) - df.reset_index(drop=True, inplace=True) + else: + cnt_to_fetch = 0 + count = _count or 1 + n_page = math.ceil(count / max_count) + remain = count % max_count + for i in range(n_page): + remain = remain if remain > 0 else max_count + cnt_to_fetch = max_count if i < n_page - 1 else remain + if pag_type == "cursor": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=last_id - 1, + kwargs=where, + ) + elif pag_type == "limit": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=i * max_count, + kwargs=where, + ) + else: + raise NotImplementedError + df_ = itable.from_response(resp) + if pag_type == "cursor": + last_id = ( + int(df_[itable.pag_params.cursor_id][len(df_) - 1]) - 1 + ) + if i == 0: + df = df_.copy() + else: + df = pd.concat([df, df_], axis=0) + df.reset_index(drop=True, inplace=True) return df @@ -250,7 +255,6 @@ def query( def table_names(self) -> List[str]: """ Return all the names of the available tables in a list. - Note ---- We abstract each website as a database containing several tables. @@ -294,17 +298,14 @@ def show_schema(self, table_name: str) -> pd.DataFrame: """ This method shows the schema of the table that will be returned, so that the user knows what information to expect. - Parameters ---------- table_name The table name. - Returns ------- pd.DataFrame The returned data's schema. - Note ---- The schema is defined in the configuration file. @@ -320,40 +321,3 @@ def show_schema(self, table_name: str) -> pd.DataFrame: new_schema_dict["column_name"].append(k) new_schema_dict["data_type"].append(schema[k]["type"]) return pd.DataFrame.from_dict(new_schema_dict) - - def query( - self, table: str, auth_params: Optional[Dict[str, Any]] = None, **where: Any, - ) -> pd.DataFrame: - """ - Use this method to query the API and get the returned table. - - Example - ------- - >>> df = dc.query('businesses', term="korean", location="vancouver) - - Parameters - ---------- - table - The table name. - auth_params - The parameters for authentication. Usually the authentication parameters - should be defined when instantiating the Connector. In case some tables have different - authentication options, a different authentication parameter can be defined here. - This parameter will override the one from Connector if passed. - where - The additional parameters required for the query. - - Returns - ------- - pd.DataFrame - A DataFrame that contains the data returned by the website API. - """ - assert ( - table in self._impdb.tables - ), f"No such table {table} in {self._impdb.name}" - - itable = self._impdb.tables[table] - - resp = self._fetch(itable, auth_params, where) - - return itable.from_response(resp) diff --git a/dataprep/data_connector/implicit_database.py b/dataprep/data_connector/implicit_database.py index 25646a3f5..8df7a3108 100644 --- a/dataprep/data_connector/implicit_database.py +++ b/dataprep/data_connector/implicit_database.py @@ -37,6 +37,19 @@ class SchemaField(NamedTuple): description: Optional[str] +class Pagination: + """ + Schema of Pagination field + """ + + type: str + anchor_key: str + count_key: str + cursor_id: str + cursor_key: str + max_count: str + + class ImplicitTable: # pylint: disable=too-many-instance-attributes """ ImplicitTable class abstracts the request and the response to a Restful API, @@ -54,7 +67,7 @@ class ImplicitTable: # pylint: disable=too-many-instance-attributes body_ctype: str body: Optional[Fields] = None cookies: Optional[Fields] = None - pag_params: Dict[str, str] + pag_params: Optional[Pagination] # Response related ctype: str @@ -86,9 +99,32 @@ def __init__(self, name: str, config: Dict[str, Any]) -> None: self.authorization = Authorization(auth_type=auth_type, params=auth_params) if "pagination" in request_def: - self.pag_params = request_def["pagination"] + self.pag_params = Pagination() + self.pag_params.type = request_def["pagination"]["type"] + self.pag_params.max_count = request_def["pagination"]["max_count"] + self.pag_params.count_key = ( + request_def["pagination"]["count_key"] + if "count_key" in request_def["pagination"] + else "" + ) + self.pag_params.anchor_key = ( + request_def["pagination"]["anchor_key"] + if "anchor_key" in request_def["pagination"] + else "" + ) + self.pag_params.cursor_id = ( + request_def["pagination"]["cursor_id"] + if "cursor_id" in request_def["pagination"] + else "" + ) + self.pag_params.cursor_key = ( + request_def["pagination"]["cursor_key"] + if "cursor_key" in request_def["pagination"] + else "" + ) else: - self.pag_params = {} + self.pag_params = Pagination() + self.pag_params.type = "null" for key in ["headers", "params", "cookies"]: if key in request_def: diff --git a/dataprep/data_connector/schema.json b/dataprep/data_connector/schema.json index 964eec0cd..2176e6e41 100644 --- a/dataprep/data_connector/schema.json +++ b/dataprep/data_connector/schema.json @@ -64,11 +64,31 @@ "pagination": { "$id": "#/properties/request/properties/pagination", "type":"object", - "anchor_key":"string", - "count_key":"string", - "cursor_id":"string", - "cursor_key":"string", - "max_count":"string" + "properties": { + "type": { "type": "string" }, + "max_count": { "type": "string" }, + "anchor_key":{ + "type": "string", + "optional": true + }, + "count_key":{ + "type": "string", + "optional": true + }, + "cursor_id":{ + "type": "string", + "optional": true + }, + "cursor_key":{ + "type": "string", + "optional": true + } + }, + "required": [ + "type", + "max_count" + ], + "additionalProperties": false }, "body": { "$id": "#/properties/request/properties/body", From b1ced65b15d867a9a6d14314230a3cc92c5d3676 Mon Sep 17 00:00:00 2001 From: Weiyuan Wu Date: Tue, 26 May 2020 23:22:42 -0700 Subject: [PATCH 3/3] refactor: refactor pagination implementation --- dataprep/data_connector/connector.py | 159 +++++++++--------- dataprep/data_connector/errors.py | 17 ++ dataprep/data_connector/implicit_database.py | 49 ++---- dataprep/data_connector/schema.json | 22 ++- .../tests/data_connector/test_integration.py | 6 +- 5 files changed, 132 insertions(+), 121 deletions(-) diff --git a/dataprep/data_connector/connector.py b/dataprep/data_connector/connector.py index ee208f902..d5cffb44d 100644 --- a/dataprep/data_connector/connector.py +++ b/dataprep/data_connector/connector.py @@ -12,10 +12,9 @@ from ..errors import UnreachableError from .config_manager import config_directory, ensure_config -from .errors import RequestError +from .errors import RequestError, UniversalParameterOverridden from .implicit_database import ImplicitDatabase, ImplicitTable - INFO_TEMPLATE = Template( """{% for tb in tbs.keys() %} Table {{dbname}}.{{tb}} @@ -85,14 +84,19 @@ def __init__( self._auth = _auth or {} self._jenv = Environment(undefined=StrictUndefined) - def _fetch( + def _fetch( # pylint: disable=too-many-locals,too-many-branches self, table: ImplicitTable, - _auth: Optional[Dict[str, Any]], - _count: Optional[int], - _cursor: Optional[int], + *, + _count: Optional[int] = None, + _cursor: Optional[int] = None, + _auth: Optional[Dict[str, Any]] = None, kwargs: Dict[str, Any], ) -> Response: + assert (_count is None) == ( + _cursor is None + ), "_cursor and _count should both be None or not None" + method = table.method url = table.url req_data: Dict[str, Dict[str, Any]] = { @@ -104,6 +108,7 @@ def _fetch( if table.authorization is not None: table.authorization.build(req_data, _auth or self._auth) + for key in ["headers", "params", "cookies"]: if getattr(table, key) is not None: instantiated_fields = getattr(table, key).populate( @@ -121,29 +126,24 @@ def _fetch( else: raise UnreachableError - if table.pag_params is not None: - if table.pag_params.type == "null": - resp: Response = self._session.send( # type: ignore - Request( - method=method, - url=url, - headers=req_data["headers"], - params=req_data["params"], - json=req_data.get("json"), - data=req_data.get("data"), - cookies=req_data["cookies"], - ).prepare() - ) - return resp - + if table.pag_params is not None and _count is not None: pag_type = table.pag_params.type count_key = table.pag_params.count_key - cursor_key = "" if pag_type == "cursor": + assert table.pag_params.cursor_key is not None cursor_key = table.pag_params.cursor_key - if pag_type == "limit": + elif pag_type == "limit": + assert table.pag_params.anchor_key is not None cursor_key = table.pag_params.anchor_key + else: + raise UnreachableError() + + if count_key in req_data["params"]: + raise UniversalParameterOverridden(count_key, "_count") req_data["params"][count_key] = _count + + if cursor_key in req_data["params"]: + raise UniversalParameterOverridden(cursor_key, "_cursor") req_data["params"][cursor_key] = _cursor resp: Response = self._session.send( # type: ignore @@ -163,7 +163,7 @@ def _fetch( return resp - def query( + def query( # pylint: disable=too-many-locals self, table: str, _auth: Optional[Dict[str, Any]] = None, @@ -192,62 +192,61 @@ def query( itable = self._impdb.tables[table] - if itable.pag_params is not None: - if itable.pag_params.type == "null": - resp = self._fetch( - table=itable, _auth=_auth, _count=-1, _cursor=-1, kwargs=where - ) - df = itable.from_response(resp) - return df - - max_count = int(itable.pag_params.max_count) - df = pd.DataFrame() - last_id = 0 - pag_type = itable.pag_params.type - - if _count is None: - _count = max_count - resp = self._fetch( - table=itable, _auth=_auth, _count=_count, _cursor=0, kwargs=where - ) - df = itable.from_response(resp) - - else: - cnt_to_fetch = 0 - count = _count or 1 - n_page = math.ceil(count / max_count) - remain = count % max_count - for i in range(n_page): - remain = remain if remain > 0 else max_count - cnt_to_fetch = max_count if i < n_page - 1 else remain - if pag_type == "cursor": - resp = self._fetch( - table=itable, - _auth=_auth, - _count=cnt_to_fetch, - _cursor=last_id - 1, - kwargs=where, - ) - elif pag_type == "limit": - resp = self._fetch( - table=itable, - _auth=_auth, - _count=cnt_to_fetch, - _cursor=i * max_count, - kwargs=where, - ) - else: - raise NotImplementedError - df_ = itable.from_response(resp) - if pag_type == "cursor": - last_id = ( - int(df_[itable.pag_params.cursor_id][len(df_) - 1]) - 1 - ) - if i == 0: - df = df_.copy() - else: - df = pd.concat([df, df_], axis=0) - df.reset_index(drop=True, inplace=True) + if itable.pag_params is None: + resp = self._fetch(table=itable, _auth=_auth, kwargs=where) + df = itable.from_response(resp) + return df + + # Pagination is not None + max_count = itable.pag_params.max_count + dfs = [] + last_id = 0 + pag_type = itable.pag_params.type + + if _count is None: + # User doesn't specify _count + resp = self._fetch(table=itable, _auth=_auth, kwargs=where) + df = itable.from_response(resp) + else: + cnt_to_fetch = 0 + count = _count or 1 + n_page = math.ceil(count / max_count) + remain = count % max_count + for i in range(n_page): + remain = remain if remain > 0 else max_count + cnt_to_fetch = max_count if i < n_page - 1 else remain + + if pag_type == "cursor": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=last_id - 1, + kwargs=where, + ) + elif pag_type == "limit": + resp = self._fetch( + table=itable, + _auth=_auth, + _count=cnt_to_fetch, + _cursor=i * max_count, + kwargs=where, + ) + else: + raise NotImplementedError + df_ = itable.from_response(resp) + + if len(df_) == 0: + # The API returns empty for this page, maybe we've reached the end + break + + if pag_type == "cursor": + last_id = int(df_[itable.pag_params.cursor_id][len(df_) - 1]) - 1 + + dfs.append(df_) + + df = pd.concat(dfs, axis=0) + df.reset_index(drop=True, inplace=True) return df diff --git a/dataprep/data_connector/errors.py b/dataprep/data_connector/errors.py index 408a1089d..c773ef573 100644 --- a/dataprep/data_connector/errors.py +++ b/dataprep/data_connector/errors.py @@ -32,3 +32,20 @@ def __init__(self, status_code: int, message: str) -> None: def __str__(self) -> str: return f"RequestError: status={self.status_code}, message={self.message}" + + +class UniversalParameterOverridden(Exception): + """ + The parameter is overrided by the universal parameter + """ + + param: str + uparam: str + + def __init__(self, param: str, uparam: str) -> None: + super().__init__() + self.param = param + self.uparam = uparam + + def __str__(self) -> str: + return f"the parameter {self.param} is overridden by {self.uparam}" diff --git a/dataprep/data_connector/implicit_database.py b/dataprep/data_connector/implicit_database.py index 8df7a3108..347db4b7c 100644 --- a/dataprep/data_connector/implicit_database.py +++ b/dataprep/data_connector/implicit_database.py @@ -43,11 +43,20 @@ class Pagination: """ type: str - anchor_key: str count_key: str - cursor_id: str - cursor_key: str - max_count: str + max_count: int + anchor_key: Optional[str] + cursor_id: Optional[str] + cursor_key: Optional[str] + + def __init__(self, pdef: Dict[str, Any]) -> None: + + self.type = pdef["type"] + self.max_count = pdef["max_count"] + self.count_key = pdef["count_key"] + self.anchor_key = pdef.get("anchor_key") + self.cursor_id = pdef.get("cursor_id") + self.cursor_key = pdef.get("cursor_key") class ImplicitTable: # pylint: disable=too-many-instance-attributes @@ -67,7 +76,7 @@ class ImplicitTable: # pylint: disable=too-many-instance-attributes body_ctype: str body: Optional[Fields] = None cookies: Optional[Fields] = None - pag_params: Optional[Pagination] + pag_params: Optional[Pagination] = None # Response related ctype: str @@ -81,6 +90,7 @@ def __init__(self, name: str, config: Dict[str, Any]) -> None: ) # This will throw errors if validate failed self.name = name self.config = config + request_def = config["request"] self.method = request_def["method"] @@ -99,36 +109,12 @@ def __init__(self, name: str, config: Dict[str, Any]) -> None: self.authorization = Authorization(auth_type=auth_type, params=auth_params) if "pagination" in request_def: - self.pag_params = Pagination() - self.pag_params.type = request_def["pagination"]["type"] - self.pag_params.max_count = request_def["pagination"]["max_count"] - self.pag_params.count_key = ( - request_def["pagination"]["count_key"] - if "count_key" in request_def["pagination"] - else "" - ) - self.pag_params.anchor_key = ( - request_def["pagination"]["anchor_key"] - if "anchor_key" in request_def["pagination"] - else "" - ) - self.pag_params.cursor_id = ( - request_def["pagination"]["cursor_id"] - if "cursor_id" in request_def["pagination"] - else "" - ) - self.pag_params.cursor_key = ( - request_def["pagination"]["cursor_key"] - if "cursor_key" in request_def["pagination"] - else "" - ) - else: - self.pag_params = Pagination() - self.pag_params.type = "null" + self.pag_params = Pagination(request_def["pagination"]) for key in ["headers", "params", "cookies"]: if key in request_def: setattr(self, key, Fields(request_def[key])) + if "body" in request_def: body_def = request_def["body"] self.body_ctype = body_def["ctype"] @@ -271,6 +257,7 @@ def __init__(self, config_path: Union[str, Path]) -> None: if table_config_path.suffix != ".json": # ifnote non json file continue + with open(table_config_path) as f: table_config = jload(f) diff --git a/dataprep/data_connector/schema.json b/dataprep/data_connector/schema.json index 2176e6e41..4ce149532 100644 --- a/dataprep/data_connector/schema.json +++ b/dataprep/data_connector/schema.json @@ -63,28 +63,32 @@ }, "pagination": { "$id": "#/properties/request/properties/pagination", - "type":"object", + "type": "object", "properties": { - "type": { "type": "string" }, - "max_count": { "type": "string" }, - "anchor_key":{ - "type": "string", - "optional": true + "type": { + "type": "string" + }, + "max_count": { + "type": "integer" }, - "count_key":{ + "anchor_key": { "type": "string", "optional": true }, - "cursor_id":{ + "count_key": { + "type": "string" + }, + "cursor_id": { "type": "string", "optional": true }, - "cursor_key":{ + "cursor_key": { "type": "string", "optional": true } }, "required": [ + "count_key", "type", "max_count" ], diff --git a/dataprep/tests/data_connector/test_integration.py b/dataprep/tests/data_connector/test_integration.py index 28dfc8f46..b7aff83ef 100644 --- a/dataprep/tests/data_connector/test_integration.py +++ b/dataprep/tests/data_connector/test_integration.py @@ -17,4 +17,8 @@ def test_data_connector() -> None: df = dc.query("businesses", _count=120, term="ramen", location="vancouver") - assert len(df) > 0 + assert len(df) == 120 + + df = dc.query("businesses", _count=10000, term="ramen", location="vancouver") + + assert len(df) < 1000