From 92e58de705b1679df6e6e246b470c6b82346359a Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Fri, 7 May 2021 16:30:08 +0200 Subject: [PATCH 1/6] add multi-http plugin --- docs/HTTP-transform.md | 437 +++++++++++ icons/HTTP-transform.png | Bin 0 -> 2321 bytes pom.xml | 2 +- .../source/common/BaseHttpSourceConfig.java | 8 +- .../http/transform/DynamicHttpTransform.java | 201 +++++ .../transform/DynamicHttpTransformConfig.java | 83 ++ .../transform/DynamicHttpTransformTest.java | 132 ++++ src/test/resources/user.json | 221 ++++++ widgets/HTTP-transform.json | 717 ++++++++++++++++++ 9 files changed, 1798 insertions(+), 3 deletions(-) create mode 100644 docs/HTTP-transform.md create mode 100644 icons/HTTP-transform.png create mode 100644 src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java create mode 100644 src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java create mode 100644 src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java create mode 100644 src/test/resources/user.json create mode 100644 widgets/HTTP-transform.json diff --git a/docs/HTTP-transform.md b/docs/HTTP-transform.md new file mode 100644 index 0000000..849723f --- /dev/null +++ b/docs/HTTP-transform.md @@ -0,0 +1,437 @@ +# HTTP Transform + +Description +----------- +This plugin reads data from HTTP/HTTPS page with an URL dynamically changing based on input data. +Paginated APIs are supported. For paginated APIs plugin reads available data and than waits for new pages to appear. +Data in JSON, XML, CSV, TSV, TEXT and BLOB formats is supported. + +Properties +---------- + +### General + +**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. + +**URL:** Url to fetch to the first page. +The url must start with a protocol (e.g. http://). + +**HTTP Method:** HTTP request method. + +**Headers:** Headers to send with each HTTP request. + +**Request body:** Body to send with each HTTP request. + +### Format + +**Format:** Format of the HTTP response. This determines how the response is converted into output records. Possible values are:
+JSON - retrieves all records from the given json path +and transforms them into records according to the mapping.
+XML - retrieves all records from the given XPath +and transforms them into records according to the mapping.
+TSV - tab separated values. Columns are mapped to record fields in the order they are +listed in schema.
+CSV - comma separated values. Columns are mapped to record fields in the order they are +listed in schema.
+Text - transforms a single line of text into a single record with a string field `body` containing the result.
+BLOB - transforms the entire response into a single record with a byte array field `body` containing the result. + +**JSON/XML Result Path:** Path to the results. When the format is XML, this is an XPath. When the format is JSON, this is a JSON path. + +JSON path example: +``` +{ + "errors": [], + "response": { + "books": [ + { + "id": "1159142", + "title": "Agile Web Development with Rails", + "author": "Sam Ruby, Dave Thomas, David Heinemeier Hansson", + "printInfo": { + "page": 488, + "coverType": "hard", + "publisher": "Pragmatic Bookshelf" + } + }, + { + "id": "2375753", + "title": "Flask Web Development", + "author": "Miguel Grinberg", + "printInfo": { + "page": 543, + "coverType": "hard", + "publisher": "O'Reilly Media, Inc" + } + }, + { + "id": "547307", + "title": "Alex Homer, ASP.NET 2.0 Visual Web Developer 2005", + "author": "David Sussman", + "printInfo": { + "page": 543, + "coverType": "hard", + "publisher": "unknown" + } + } + ] + } +} + ``` +Json path to fetch books is `/response/books`. However if we need to fetch only `printInfo` we can specify +`/response/books/printInfo` as well. + +XPath example: +``` + + + + + Everyday Italian + Giada De Laurentiis + 2005 + + 15.0 + Discount up to 50% + + + + XQuery Kick Start + James McGovern + Per Bothner + 2003 + + 49.99 + No discount + + + ... + + + ... + + +``` + +XPath to fetch all books is `/bookstores/bookstore/book`. However a more precise selections can be done. E.g. +`/bookstores/bookstore/book[@category='web']`. + +**JSON/XML Fields Mapping:** Mapping of fields in a record to fields in retrieved element. The left column contains the +name of schema field. The right column contains path to it within a relative to an element. It can be either XPath or +JSON path. + +Example response: +``` +{ + "startAt":1, + "maxResults":5, + "total":15599, + "issues":[ + { + "id":"20276", + "key":"NETTY-14", + "fields":{ + "issuetype":{ + "name":"Bug", + "subtask":false + }, + "fixVersions":[ + "4.1.37" + ], + "description":"Test description for NETTY-14", + "project":{ + "id":"10301", + "key":"NETTY", + "name":"Netty-HTTP", + "projectCategory":{ + "id":"10002", + "name":"Infrastructure" + } + } + } + }, + { + "id":"19124", + "key":"NETTY-13", + "fields":{ + "issuetype":{ + "self":"https://issues.cask.co/rest/api/2/issuetype/4", + "name":"Improvement", + "subtask":false + }, + "fixVersions":[ + + ], + "description":"Test description for NETTY-13", + "project":{ + "id":"10301", + "key":"NETTY", + "name":"Netty-HTTP", + "projectCategory":{ + "id":"10002", + "name":"Infrastructure" + } + } + } + } + ] +} +``` + +Assume the result path is `/issues`. + +The mapping is: + +| Field Name | Field Path | +| --------------- |:-----------------------------------------:| +| type | /fields/issuetype/name | +| description | /fields/description | +| projectCategory | /fields/project/projectCategory/name | +| isSubtask | /fields/issuetype/subtask | +| fixVersions | /fields/fixVersions | + +The result records are: + +| key | type | isSubtask | description | projectCategory | fixVersions | +| -------- | ----------- | --------- | ----------------------------- | ---------------- | ----------- | +| NETTY-14 | Bug | false | Test description for NETTY-14 | Infrastructure | ["4.1.37"] | +| NETTY-13 | Improvement | false | Test description for NETTY-13 | Infrastructure | [] | + +Note, that field `key` was mapped without being included into the mapping. Mapping entries like `key: /key` +can be omitted as long as the field is present in schema. +
+ +**CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row. + +### Basic Authentication + +**Username:** Username for basic authentication. + +**Password:** Password for basic authentication. + +### HTTP Proxy + +**Proxy URL:** Proxy URL. Must contain a protocol, address and port. + +**Username:** Proxy username. + +**Password:** Proxy password. + +### Error Handling + +**HTTP Errors Handling:** Defines the error handling strategy to use for certain HTTP response codes. +The left column contains a regular expression for HTTP status code. The right column contains an action which +is done in case of match. If HTTP status code matches multiple regular expressions, the first specified in mapping +is matched. + +Example: + +| HTTP Code Regexp | Error Handling | +| ----------------- |:-----------------------:| +| 2.. | Success | +| 401 | Retry and fail | +| 4.. | Fail | +| 5.. | Retry and skip | +| .* | Fail | + +Note: pagination types "Link in response header", "Link in response body", "Token in response body" do not support +"Skip", "Retry and skip" options. + +**Non-HTTP Error Handling:** Error handling strategy to use when the HTTP response cannot be transformed to an output record. +Possible values are:
+Stop on error - Fails pipeline due to erroneous record.
+Send to error - Sends erroneous record's text to error port and continues.
+Skip on error - Ignores erroneous records. + +**Retry Policy:** Policy used to calculate delay between retries. + +**Linear Retry Interval:** Interval between retries. Is only used if retry policy is "linear". + +**Max Retry Duration:** Maximum time in seconds retries can take. + +**Connect Timeout:** Maximum time in seconds connection initialization is allowed to take. + +**Read Timeout:** Maximum time in seconds fetching data from the server is allowed to take. + +### Pagination + +**Pagination Type:** Strategy used to determine how to get next page. + +**Wait Time Between Pages:** Time in milliseconds to wait between HTTP requests for the next page. +

+ +##### Pagination type: None +Only single page is loaded. +
+##### Pagination type: Link in response header +In response there is a "Link" header, which contains an url marked as "next". Example:
+``` +; rel="first", +; rel="next", +; rel="last"` +``` +
+ +##### Pagination type: Link in response body +Every page contains a next page url. This pagination type is only supported for JSON and XML formats. +Pagination happens until no next page field is present or until page contains no elements. + +**Next Page JSON/XML Field Path:** A JSON path or an XPath to a field which contains next page url. +It can be either relative or absolute url. + +Example page response: +``` +{ + "results": [ + ... + ] + "_links": { + "self": "https://confluence.atlassian.com/rest/api/space/ADMINJIRASERVER0710/content/page", + "next": "/rest/api/space/ADMINJIRASERVER0710/content/page?limit=100&start=100", + "base": "https://confluence.atlassian.com", + "context": "" + } +} +``` +Next page field path is `_links/next`. +
+##### Pagination type: Token in response body +Every page contains a token, which is appended as an url parameter to obtain next page. +This type of pagination is only supported for JSON and XML formats. Pagination happens until no next page +token is present on the page or until page contains no elements. + +**Next Page Token Path:** A JSON path or an XPath to a field which contains next page token. + +**Next Page Url Parameter:** A parameter which is appended to url in order to specify next page token. + +Example plugin config: +``` +{ + "url": "https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=20&q=cask+cdap", + "resultPath": "/items" + "paginationType": "Token in response body", + "nextPageTokenPath": "/nextPageToken", + "nextPageUrlParameter": "pageToken" +} +``` + +First page response: +``` +{ + "nextPageToken": "CAEQAA", + "pageInfo": { + "totalResults": 208, + "resultsPerPage": 2 + }, + "items": [ + ... + ] +} +``` +Next page fetched by plugin will be url with `&pageToken=CAEQAA` appended. +
+##### Pagination type: Increment an index +Pagination by incrementing a {pagination.index} placeholder value in url. For this pagination type url is required +to contain above placeholder. + +**Start Index:** Start value of {pagination.index} placeholder + +**Max Index:** Maximum value of {pagination.index} placeholder. If empty, pagination will happen until the page with +no elements. + +**Index Increment:** A value which the {pagination.index} placeholder is incremented by. Increment can be negative. +
+##### Pagination type: Custom +Pagination using user provided code. The code decides how to retrieve a next page url based on previous page contents +and headers and when to finish pagination. + +**Custom Pagination Python Code:** A code which implements retrieving +a next page url based on previous page contents and headers. + +Example code: +``` +import json + +def get_next_page_url(url, page, headers): + """ + Based on previous page data generates next page url, when "Custom pagination" is enabled. + + Args: + url (string): previous page url + page (string): a body of previous page + headers (dict): a dictionary of headers from previous page + + """ + page_json = json.loads(page) + next_page_num = page_json['nextpage'] + + # stop the iteration + if next_page_num == None or next_page_num > 5: + return None + + return "https://searchcode.com/api/codesearch_I/?q=curl&p={}".format(next_page_num) +``` +The above code iterates over first five pages of searchcode.com results. When 'None' is returned the iteration +is stopped. + +### OAuth2 + +**OAuth2 Enabled:** If true, plugin will perform OAuth2 authentication. + +**Auth URL:** Endpoint for the authorization server used to retrieve the authorization code. + +**Token URL:** Endpoint for the resource server, which exchanges the authorization code for an access token. + +**Client ID:** Client identifier obtained during the Application registration process. + +**Client Secret:** Client secret obtained during the Application registration process. + +**Scopes:** Scope of the access request, which might have multiple space-separated values. + +**Refresh Token:** Token used to receive accessToken, which is end product of OAuth2. + +### Hawk Authentication + +**HAWK Authentication Enabled:** If true, plugin will perform HAWK authentication. + +**HAWK Auth ID:** HAWK Authentication ID + +**Hawk Auth Key:** HAWK Authentication Key + +**Algorithm:** Hash Algorithm used + +**ext:** Any application-specific information to be sent with the request. Ex: some-app-extra-data + +**app:** This provides binding between the credentials and the application in a way that prevents an attacker from ticking an application to use credentials issued to someone else. + +**dlg:** The application id of the application the credentials were directly issued to. + +**Include Payload Hash:** HAWK authentication provides optional support for payload validation. If this option is selected, the payload hash will be calculated and included in MAC calculation and in Authorization header + +### SSL/TLS + +**Verify HTTPS Trust Certificates:** If false, untrusted trust certificates (e.g. self signed), will not lead to an +error. Do not disable this in production environment on a network you do not entirely trust. Especially public internet. + +**Keystore File:** A path to a file which contains keystore. + +**Keystore Type:** Format of a keystore. + +**Keystore Password:** Password for a keystore. If a keystore is not password protected leave it empty. + +**Keystore Key Algorithm:** An algorithm used for keystore. + +**TrustStore File:** A path to a file which contains truststore. + +**TrustStore Type:** Format of a truststore. + +**TrustStore Password:** Password for a truststore. If a truststore is not password protected leave it empty. + +**TrustStore Key Algorithm:** An algorithm used for truststore. + +**Transport Protocols:** Transport protocols which are allowed for connection. + +**Cipher Suites:** Cipher suites which are allowed for connection. +Colons, commas or spaces are also acceptable separators. + +**Schema:** Output schema. Is required to be set. diff --git a/icons/HTTP-transform.png b/icons/HTTP-transform.png new file mode 100644 index 0000000000000000000000000000000000000000..933bf62be860484ae760288bfc532777bae46dd1 GIT binary patch literal 2321 zcmZ`)dpOf=AHVmrX@?e3(o|HMbh2KgFeDzSHcRDfTX`R}o~Y2g#C}tFgr-NmF=0~E zDqHH2a+sBp4vHup96GY*ti*=x{dxa??!)JMU*GGx@B8{**Z1@JWH9JnL_7%(03dpA z_wWS(g&a`;tB-_`V^_nGpnqVemj}?zPElQ1E|S3=*&Y}J0Ks^6qk!G?w$NQu|zai zT8sv!p~W~rsp;N>rqsDuI-4?CMx>=2zGj&nY|iE1Or#RW0#@C+ z09$*9`s%*_eH-Cri(TiJhF zd`EX%S?$i21$j-l^*KDH)Eh>cL%5gVz1Zf`Rb|Fl1hkar<%sXFj8{!vx}`muwEJC7 zgp&F4bN|ZHd6Jp`Sg$>?AinR?mtGF*=NJ$XF% zq1h_Ptb%XBiYknP4JUck&S>Ma`B^%mgRl~u~3%p1m2Z#KQK%`gT(w!Q3pScdx8ZQk^GrH_~f z==%ofdj&irThEe@}h zU)sQ1HFFLB=%;R%q%Q_P?B7hh2!>VuzWbx7#y+~jc{i;!F7g@FBD3W1iZkkXp-w+o~ol6{5s!_wG)QKcqUP&pUq3zi9Zst;tXux*(Lhf z+{RtfL6;rXNPp2472hizKZn($G%ivt|4)a9j4(~I5vWtN22SNt8CsEL?Z$`IMe(YxLMu>Z)3WnEij4=S>t-26%OoI-%0(EJL=Xii z>5@6!ICxR0yZJv&?CAq0>qzb5H6T2D8$Lbixe8&f&<^E+g*scD zHqPm`LMa$z6ZMlb-~{z#>!NoIe>1|7GKO>V|PbuU})78AKYZ5G2h* zlxSht8EX*G-arPTjs8?61p^40iKkYD?jWFP`tc`3teRif_uC<9^&3QZ8m+#3@5k?5)fmOP1WlD1UX{Z!b+g+iP==efMUBf|Ro- zv%HzNBxxBW586)dD)#vCICJ;m=S~xU;IEe5($ApYi`#}rQF!xlRiH*kh7Ps7n4V9- zZh)=XvSnAV#l;$2wmdL+;C)bGdW7Cm#8E;%S{1vU z4`1RSEuC6$q)cv2T?I_Ad|v-b^qWn*9p;bDYP}U3%=fC|`&&IfD37qdef@qQXG1QW z5F`>GBtf&DR(243wK$b?NxWPXWpX`VmMJm!NOKX}V8^z2.8.5 2.3.0 4.5.9 - 2.4.0-SNAPSHOT + 2.6.0 2.9.9 4.11 2.7.1 diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index a554dd6..9d85c47 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -670,9 +670,13 @@ public List getTransportProtocolsList() { return getListFromString(transportProtocols); } - public void validate() { + public void validate(){ + validate(true); + } + + public void validate(boolean validateURL) { // Validate URL - if (!containsMacro(PROPERTY_URL)) { + if (validateURL && !containsMacro(PROPERTY_URL)) { try { // replace with placeholder with anything just during pagination new URI(getUrl().replaceAll(PAGINATION_INDEX_PLACEHOLDER_REGEX, "0")); diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java new file mode 100644 index 0000000..7b98f1a --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java @@ -0,0 +1,201 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.transform; + +import com.google.common.util.concurrent.RateLimiter; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.etl.api.*; +import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.RetryPolicy; +import io.cdap.plugin.http.source.common.error.ErrorHandling; +import io.cdap.plugin.http.source.common.error.HttpErrorHandler; +import io.cdap.plugin.http.source.common.error.RetryableErrorHandling; +import io.cdap.plugin.http.source.common.http.HttpClient; +import io.cdap.plugin.http.source.common.http.HttpResponse; +import io.cdap.plugin.http.source.common.pagination.page.BasePage; +import io.cdap.plugin.http.source.common.pagination.page.PageEntry; +import io.cdap.plugin.http.source.common.pagination.page.PageFactory; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.awaitility.pollinterval.FixedPollInterval; +import org.awaitility.pollinterval.IterativePollInterval; +import org.awaitility.pollinterval.PollInterval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Plugin returns records from HTTP source specified by link for each input record. Pagination via APIs is supported. + */ +@Plugin(type = Transform.PLUGIN_TYPE) +@Name(DynamicHttpTransform.NAME) +@Description("Read data from HTTP endpoint that changes dynamically depending on inputs data.") +public class DynamicHttpTransform extends Transform { + private static final Logger LOG = LoggerFactory.getLogger(DynamicHttpTransform.class); + static final String NAME = "HTTP"; + + private final DynamicHttpTransformConfig config; + private RateLimiter rateLimiter; + private final HttpClient httpClient; + private HttpResponse httpResponse; + private final PollInterval pollInterval; + private final HttpErrorHandler httpErrorHandler; + private String url; + private Integer httpStatusCode; + + private String prebuiltParameters; + + /** + * Constructor used by Data Fusion + * + * @param config the plugin configuration + */ + public DynamicHttpTransform(DynamicHttpTransformConfig config) { + this(config, new HttpClient(config)); + } + + /** + * Constructor used in unit tests + * + * @param config the plugin configuration + * @param httpClient the http client + */ + public DynamicHttpTransform(DynamicHttpTransformConfig config, HttpClient httpClient) { + this.config = config; + if(config.throttlingEnabled()) { + this.rateLimiter = RateLimiter.create(this.config.maxCallPerSeconds); + } + this.httpClient = httpClient; + this.httpErrorHandler = new HttpErrorHandler(config); + + if (config.getRetryPolicy().equals(RetryPolicy.LINEAR)) { + pollInterval = FixedPollInterval.fixed(config.getLinearRetryInterval(), TimeUnit.SECONDS); + } else { + pollInterval = IterativePollInterval.iterative(duration -> duration.multiply(2)); + } + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema()); // validate when macros not yet substituted + config.validateSchema(); + + pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); + } + + @Override + public void initialize(TransformContext context) throws Exception { + StringBuilder parametersBuilder = new StringBuilder(); + + Map queryParameters = config.getQueryParametersMap(); + if (queryParameters.size() > 0) { + parametersBuilder.append("?"); + for (Map.Entry e : queryParameters.entrySet()) { + parametersBuilder.append(e.getKey() + "=" + e.getValue() + "&"); + } + } + this.prebuiltParameters = parametersBuilder.toString(); // Yes there is a '&' at the end of tURL but the url is still valid ;) + + super.initialize(context); + } + + @Override + public void transform(StructuredRecord input, Emitter emitter) { + // Replace placeholders in URL + String url = config.getUrl(); + for (Map.Entry e : config.getUrlVariablesMap().entrySet()) { + String valueToUse = input.get(e.getValue()); + if (valueToUse != null) { + String placeholder = "{" + e.getKey() + "}"; + if (!url.contains(placeholder)) { + LOG.warn("Placeholder " + placeholder + " not found in url "+url); + } else { + url = url.replace(placeholder, valueToUse); + } + } else { + emitter.emitError(new InvalidEntry<>( + -1, "Cannot find required field " + e.getValue(), input)); + } + } + + this.url = url + prebuiltParameters; + + if(config.throttlingEnabled()) { + rateLimiter.acquire(); // Throttle + } + + long delay = (httpResponse == null || config.getWaitTimeBetweenPages() == null) ? 0L : config.getWaitTimeBetweenPages(); + + try { + Awaitility + .await().with() + .pollInterval(pollInterval) + .pollDelay(delay, TimeUnit.MILLISECONDS) + .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) + .until(this::sendGet); // httpResponse is setup here + } catch (ConditionTimeoutException ex) { + // Retries failed. We don't need to do anything here. This will be handled using httpStatusCode below. + } + + ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode) + .getAfterRetryStrategy(); + + switch (postRetryStrategy) { + case STOP: + throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", + url, httpStatusCode, httpResponse.getBody())); + default: + break; + } + + try { + BasePage basePage = createPageInstance(config, httpResponse, postRetryStrategy); + + PageEntry pageEntry = basePage.next(); + + if (!pageEntry.isError()) { + emitter.emit(pageEntry.getRecord()); + } else { + emitter.emitError(pageEntry.getError()); + } + }catch (IOException e){ + emitter.emitError(new InvalidEntry<>( + -1, "Exception parsing HTTP Response : "+e.getMessage(), input)); + } + } + + BasePage createPageInstance(BaseHttpSourceConfig config, HttpResponse httpResponse, + ErrorHandling postRetryStrategy) throws IOException { + return PageFactory.createInstance(config, httpResponse, httpErrorHandler, + !postRetryStrategy.equals(ErrorHandling.SUCCESS)); + } + + private boolean sendGet() throws IOException { + CloseableHttpResponse response = httpClient.executeHTTP(this.url); + this.httpResponse = new HttpResponse(response); + httpStatusCode = httpResponse.getStatusCode(); + + RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); + return !errorHandlingStrategy.shouldRetry(); + } +} diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java new file mode 100644 index 0000000..a4e079f --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java @@ -0,0 +1,83 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.transform; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Provides all the configurations required for configuring the plugin. + */ +public class DynamicHttpTransformConfig extends BaseHttpSourceConfig { + public static final String PROPERTY_URL_VARIABLES = "urlVariables"; + public static final String PROPERTY_QUERY_PARAMETERS = "queryParameters"; + public static final String PROPERTY_MAX_CALL_PER_SECONDS = "maxCallPerSeconds"; + + @Name(PROPERTY_URL_VARIABLES) + @Nullable + @Description("Variables used to dynamically construct the URL.") + @Macro + protected String urlVariables; + + @Nullable + @Name(PROPERTY_QUERY_PARAMETERS) + @Description("Query parameters") + @Macro + protected String queryParameters; + + @Nullable + @Name(PROPERTY_MAX_CALL_PER_SECONDS) + @Description("The maximum number of call made per seconds. 0 = throttling disabled") + @Macro + protected int maxCallPerSeconds; + + protected DynamicHttpTransformConfig(String referenceName) { + super(referenceName); + } + + public void validate(Schema inputSchema) { + super.validate(false); + + // Check that the needed fields exists in input schema + Map urlVariableMap = getUrlVariablesMap(); + for(String value: urlVariableMap.values()){ + if (inputSchema.getField(value) == null) { + throw new IllegalArgumentException("Field " + value + " is required in input data schema but wasn't found. Current input schema is : " + inputSchema); + } + } + + } + + public boolean throttlingEnabled(){ + return maxCallPerSeconds>0; + } + + public Map getQueryParametersMap() { + return getMapFromKeyValueString(queryParameters); + } + + + public Map getUrlVariablesMap() { + return getMapFromKeyValueString(urlVariables); + } + +} diff --git a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java new file mode 100644 index 0000000..193c640 --- /dev/null +++ b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java @@ -0,0 +1,132 @@ +package io.cdap.plugin.http.transform; + +import com.google.common.base.Joiner; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.Transform; +import io.cdap.cdap.etl.mock.common.MockEmitter; +import io.cdap.plugin.http.source.common.http.HttpClient; +import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DynamicHttpTransformTest { + private void tmp_test(){ + } + + // The input schema + private static final Schema INPUT_SCHEMA = Schema.recordOf("input", + Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("mail", Schema.of(Schema.Type.STRING)), + Schema.Field.of("_id", Schema.of(Schema.Type.STRING)) + ); + + private static final String OUTPUT_SCHEMA = Schema.recordOf("input", + Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("mail", Schema.of(Schema.Type.STRING)), + Schema.Field.of("_id", Schema.of(Schema.Type.STRING)), + Schema.Field.of("assignedPrograms", Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("averageScore", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))), + Schema.Field.of("paths", Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING)))) + ).toString(); + + private StructuredRecord generateData(String idValue) { + StructuredRecord record = StructuredRecord.builder(INPUT_SCHEMA) + .set("firstName", "toto") + .set("mail", "toto.tata@tutu.com") + .set("_id", idValue) + .build(); + return record; + } + + static class BaseTestConfigHttp extends DynamicHttpTransformConfig { + BaseTestConfigHttp(String referenceName, String url, String queryParameters, String urlVariables, int maxCallPerSeconds) { + super(referenceName); + + this.schema = OUTPUT_SCHEMA; + this.url = url; + this.queryParameters = queryParameters; + this.urlVariables = urlVariables; + this.maxCallPerSeconds = maxCallPerSeconds; + this.httpMethod = "GET"; + this.oauth2Enabled = "false"; + this.httpErrorsHandling = "2..:Success,.*:Fail"; + this.errorHandling = "stopOnError"; + this.retryPolicy = "linear"; + this.maxRetryDuration = 10L; + this.linearRetryInterval = 1L; + this.waitTimeBetweenPages = 0L; + this.connectTimeout = 60; + this.readTimeout = 120; + this.format = "text"; + this.keystoreType = "Java KeyStore (JKS)"; + this.trustStoreType = "Java KeyStore (JKS)"; + this.transportProtocols = "TLSv1.2"; + this.format = "json"; + } + } + + @Test + public void testHttpDynamicTransformNominal() throws Exception { + List outputRecords = testHttpDynamicTransform("user.json"); + + Assert.assertTrue(outputRecords.size() == 1); + StructuredRecord outputRecord = outputRecords.get(0); + Assert.assertEquals("toto", outputRecord.get("firstName")); + Assert.assertEquals("tata", outputRecord.get("lastName")); + Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail")); + Assert.assertEquals("the_id_value", outputRecord.get("_id")); + Assert.assertEquals(Arrays.asList("RATkiller 🐭", "Bienvenue 🙋", "Pro amiante"), outputRecord.get("paths")); + } + + public List testHttpDynamicTransform(String filepath)throws Exception { + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + CloseableHttpResponse mockHttpResponse = Mockito.mock(CloseableHttpResponse.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + StatusLine statusLine = Mockito.mock(StatusLine.class); + + String baseURL = "myfakeurl.com/{id}"; + Map urlParameters = new HashMap<>(); + urlParameters.put("company", "xx"); + urlParameters.put("apiKey", "XX"); + Map urlVariables = new HashMap<>(); + urlVariables.put("id", "_id"); + String idValue = "the_id_value"; + String targetURL = "myfakeurl.com/the_id_value?apiKey=XX&company=xx&"; + Mockito.when(mockHttpClient.executeHTTP(targetURL/*Mockito.any()*/)).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(statusLine); + Mockito.when(statusLine.getStatusCode()).thenReturn(200); + Mockito.when(mockEntity.getContent()).thenReturn(getClass().getClassLoader().getResourceAsStream(filepath)); + + BaseTestConfigHttp config = new BaseTestConfigHttp( + "HttpDynamicTransform-transform", + baseURL, + Joiner.on(",").withKeyValueSeparator(":").join(urlParameters), + Joiner.on(",").withKeyValueSeparator(":").join(urlVariables), + 10); + + Transform transform + = new DynamicHttpTransform(config, mockHttpClient); + + transform.initialize(null); + + MockEmitter emitter = new MockEmitter<>(); + + transform.transform(generateData(idValue), emitter); + transform.destroy(); + + return emitter.getEmitted(); + } + +} diff --git a/src/test/resources/user.json b/src/test/resources/user.json new file mode 100644 index 0000000..277669a --- /dev/null +++ b/src/test/resources/user.json @@ -0,0 +1,221 @@ +{ + "_id": "the_id_value", + "mail": "toto.tata@tutu.com", + "firstName": "toto", + "lastName": "tata", + "assignedPrograms": 11, + "averageScore": 75.2, + "paths": [ + "RATkiller 🐭", + "Bienvenue 🙋", + "Pro amiante" + ], + "certifications": [ + { + "_id": "someId1", + "name": "La circulation au travail", + "date": "2021-02-24T14:44:08.000Z" + }, + { + "_id": "someId2", + "name": "Évaluation d'accueil sécurité métier du déchet", + "date": "2020-10-20T08:18:55.000Z" + }, + { + "_id": "someId3", + "name": "Acciline + Module gestion des évènements", + "date": "2020-10-15T16:41:24.000Z" + }, + { + "_id": "someId4", + "name": "Connaissances approfondies AMIANTE", + "date": "2020-09-29T08:39:03.000Z" + }, + { + "_id": "someId15", + "name": "Pass Compliance - Anti corruption", + "date": "2020-09-28T08:39:46.000Z" + }, + { + "_id": "someId6", + "name": "Mes réflexes santé", + "date": "2020-05-11T20:26:24.000Z" + }, + { + "_id": "someId7", + "name": "Mes réflexes santé", + "date": "2020-05-02T09:25:23.000Z" + } + ], + "championAchievements": { + "coursesPublished": [ + "Champion", + "Guide", + "Modèle" + ], + "authoringToolExplored": [ + "Apprenti Champion" + ], + "repliesOnOwnActivitiesGiven": [ + "Pédagogue", + "Instituteur" + ], + "positiveReactionsOnOwnContent": [ + "Reconnu(e)", + "Renommé(e)", + "Acclamé(e)", + "Illustre" + ] + }, + "comments": 93, + "completedPrograms": 25, + "groups": [ + { + "_id": "someId1", + "name": "Learning@Veolia", + "public": false + }, + { + "_id": "someId2", + "name": "VEOLIA MIB", + "public": false, + "parent": "someId1" + }, + { + "_id": "someId3", + "name": "SARP - SESAME", + "public": false, + "parent": "someId2" + }, + { + "_id": "someId4", + "name": "VEOLIA Energie France", + "public": false, + "parent": "someId2" + }, + { + "_id": "someId5", + "name": "Grands Comptes", + "public": false, + "parent": "someId3" + } + ], + "imageUrl": "https://veolialearning.360learning.com/api/medias/user/the_id_value", + "labels": [ + "[BU] SARP", + "SARP SIEGE", + "SARP SA", + "[ZONE] MIB", + "Manager" + ], + "lastLoginAt": "2021-04-10T19:43:18.977Z", + "championStatus": "champion", + "learnerAchievements": { + "reactionsGiven": [ + "Membre de la communauté", + "Supporter" + ], + "repliesGiven": [ + "Main tendue", + "Contributeur(trice)", + "Pilier" + ], + "coursesPlayed": [ + "Curieux(se)", + "Assidu(e)" + ] + }, + "managers": [], + "messages": 95, + "publications": 0, + "reactions": 104, + "skills": [ + { + "_id": "5e53c15ab7004c4dfbc90fe6", + "name": "Conformité", + "assessmentScore": "", + "averageScore": 96, + "eLearningScore": 96 + }, + { + "_id": "5e53c16c0cc7e84db3c1c93c", + "name": "Anti corruption", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "59e8527ec6daad12ee0a860d", + "name": "Santé sécurité", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5b3f4a973cb0111d4c2e02f8", + "name": "Standard sécurité", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5b3f4a973cb0111d4c2e02d7", + "name": "Prévention des risques", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5e71f3c6dd2b1734b380e040", + "name": "Suivi & reporting", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5b3f4a963cb0111d4c2e026d", + "name": "Analyse de risques", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5e71f35ef55a8837e4803d86", + "name": "Langage Preventeo", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5e71f36b1602f434c12da815", + "name": "Fonctions Preventeo", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5e71f377f55a8837e4803d88", + "name": "Veille réglementaire", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "59e85261c6daad12ee0a860a", + "name": "Management & Leadership", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + }, + { + "_id": "5d238c1fe451727e3a270a83", + "name": "Veolia", + "assessmentScore": "", + "averageScore": 100, + "eLearningScore": 100 + } + ], + "subordinates": [], + "toDeactivateAt": "", + "totalTimeSpentInMinutes": 914 +} \ No newline at end of file diff --git a/widgets/HTTP-transform.json b/widgets/HTTP-transform.json new file mode 100644 index 0000000..09b2a5f --- /dev/null +++ b/widgets/HTTP-transform.json @@ -0,0 +1,717 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "configuration-groups": [ + { + "label": "General", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName" + }, + { + "widget-type": "textbox", + "label": "URL", + "name": "url" + }, + { + "widget-type": "keyvalue", + "label": "Query Parameters", + "name": "queryParameters", + "widget-attributes": { + "showDelimiter": "false" + } + }, + { + "widget-type": "keyvalue", + "label": "URL Variables", + "name": "urlVariables", + "widget-attributes": { + "showDelimiter": "false" + } + }, + { + "widget-type": "select", + "label": "HTTP Method", + "name": "httpMethod", + "widget-attributes": { + "values": [ + "GET", + "POST", + "PUT", + "DELETE", + "HEAD" + ], + "default": "GET" + } + }, + { + "widget-type": "keyvalue", + "label": "Headers", + "name": "headers", + "widget-attributes": { + "showDelimiter": "false" + } + }, + { + "widget-type": "textarea", + "name": "requestBody", + "label": "Request Body", + "widget-attributes": { + "rows": "5" + } + } + ] + }, { + "label": "Throttling", + "properties": [ + { + "widget-type": "number", + "label": "Max calls per seconds", + "name": "maxCallPerSeconds", + "widget-attributes": { + "min": "0", + "default": "0" + } + } + ] + }, + { + "label": "Format", + "properties": [ + { + "widget-type": "select", + "label": "Format", + "name": "format", + "widget-attributes": { + "values": [ + "json", + "xml", + "tsv", + "csv", + "text", + "blob" + ], + "default": "json" + } + }, + { + "widget-type": "textbox", + "label": "JSON/XML Result Path", + "name": "resultPath" + }, + { + "widget-type": "keyvalue", + "label": "JSON/XML Fields Mapping", + "name": "fieldsMapping", + "widget-attributes": { + "showDelimiter": "false" + } + }, + { + "widget-type": "radio-group", + "label": "CSV Skip First Row", + "name": "csvSkipFirstRow", + "widget-attributes": { + "layout": "inline", + "default": "false", + "options": [ + { + "id": "true", + "label": "true" + }, + { + "id": "false", + "label": "false" + } + ] + } + } + ] + }, + { + "label": "OAuth2", + "properties": [ + { + "widget-type": "toggle", + "label": "OAuth2 Enabled", + "name": "oauth2Enabled", + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } + }, + { + "widget-type": "textbox", + "label": "Auth URL", + "name": "authUrl" + }, + { + "widget-type": "textbox", + "label": "Token URL", + "name": "tokenUrl" + }, + { + "widget-type": "textbox", + "label": "Client ID", + "name": "clientId" + }, + { + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" + }, + { + "widget-type": "textbox", + "label": "Scopes", + "name": "scopes" + }, + { + "widget-type": "textbox", + "label": "Refresh Token", + "name": "refreshToken" + } + ] + }, + { + "label": "Basic Authentication", + "properties": [ + { + "widget-type": "textbox", + "label": "Username", + "name": "username" + }, + { + "widget-type": "password", + "label": "Password", + "name": "password" + } + ] + }, + { + "label": "HTTP Proxy", + "properties": [ + { + "widget-type": "textbox", + "label": "Proxy URL", + "name": "proxyUrl" + }, + { + "widget-type": "textbox", + "label": "Username", + "name": "proxyUsername" + }, + { + "widget-type": "password", + "label": "Password", + "name": "proxyPassword" + } + ] + }, + { + "label": "Error Handling", + "properties": [ + { + "widget-type": "keyvalue-dropdown", + "label": "HTTP Errors Handling", + "name": "httpErrorsHandling", + "widget-attributes": { + "default": "2..:Success,.*:Fail", + "showDelimiter": "false", + "dropdownOptions": [ + "Success", + "Fail", + "Skip", + "Send to error", + "Retry and fail", + "Retry and skip", + "Retry and send to error" + ], + "key-placeholder": "HTTP Status Code Regex" + } + }, + { + "widget-type": "radio-group", + "label": "Non-HTTP Error Handling", + "name": "errorHandling", + "widget-attributes": { + "layout": "inline", + "default": "stopOnError", + "options": [ + { + "id": "stopOnError", + "label": "Stop on error" + }, + { + "id": "sendToError", + "label": "Send to error" + }, + { + "id": "skipOnError", + "label": "Skip on error" + } + ] + } + }, + { + "widget-type": "radio-group", + "label": "Retry Policy", + "name": "retryPolicy", + "widget-attributes": { + "layout": "inline", + "default": "exponential", + "options": [ + { + "id": "exponential", + "label": "Exponential" + }, + { + "id": "linear", + "label": "Linear" + } + ] + } + }, + { + "widget-type": "number", + "label": "Linear Retry Interval", + "name": "linearRetryInterval", + "widget-attributes": { + "min": "0", + "default": "30" + } + }, + { + "widget-type": "number", + "label": "Max Retry Duration", + "name": "maxRetryDuration", + "widget-attributes": { + "min": "0", + "default": "600" + } + }, + { + "widget-type": "number", + "label": "Connect Timeout", + "name": "connectTimeout", + "widget-attributes": { + "min": "0", + "default": "120" + } + }, + { + "widget-type": "number", + "label": "Read Timeout", + "name": "readTimeout", + "widget-attributes": { + "min": "0", + "default": "120" + } + } + ] + }, + { + "label": "Pagination", + "properties": [ + { + "widget-type": "select", + "label": "Pagination Type", + "name": "paginationType", + "widget-attributes": { + "values": [ + "None", + "Link in response header", + "Link in response body", + "Token in response body", + "Increment an index", + "Custom" + ], + "default": "None" + } + }, + { + "widget-type": "textbox", + "label": "Start Index", + "name": "startIndex", + "widget-attributes": { + "placeholder": "For pagination type \"Increment an index\"" + } + }, + { + "widget-type": "textbox", + "label": "Max Index", + "name": "maxIndex", + "widget-attributes": { + "placeholder": "For pagination type \"Increment an index\"" + } + }, + { + "widget-type": "textbox", + "label": "Index Increment", + "name": "indexIncrement", + "widget-attributes": { + "placeholder": "For pagination type \"Increment an index\"" + } + }, + { + "widget-type": "textbox", + "label": "Next Page JSON/XML Field Path", + "name": "nextPageFieldPath", + "widget-attributes": { + "placeholder": "For pagination type \"Link in response body\"" + } + }, + { + "widget-type": "textbox", + "label": "Next Page Token Path", + "name": "nextPageTokenPath", + "widget-attributes": { + "placeholder": "For pagination type \"Token in response body\"" + } + }, + { + "widget-type": "textbox", + "label": "Next Page Url Parameter", + "name": "nextPageUrlParameter", + "widget-attributes": { + "placeholder": "For pagination type \"Token in response body\"" + } + }, + { + "widget-type": "python-editor", + "label": "Custom Pagination Python Code", + "name": "customPaginationCode", + "widget-attributes": { + "placeholder": "def get_next_page_url(url, page, headers):\n \"\"\"\n Based on previous page data generates next page url, when \"Custom pagination\" is enabled.\n Returns 'None' if no more pages to load \n\n Args:\n url (string): previous page url\n page (string): a body of previous page\n headers (dict): a dictionary of headers from previous page\n\n \"\"\"\n return \"https://next-page-url.com\"" + } + }, + { + "widget-type": "number", + "label": "Wait Time Between Pages (milliseconds)", + "name": "waitTimeBetweenPages", + "widget-attributes": { + "min": "0", + "default": "0" + } + } + ] + }, + { + "label": "SSL/TLS", + "properties": [ + { + "widget-type": "toggle", + "label": "Verify HTTPS Trust Certificates", + "name": "verifyHttps", + "widget-attributes": { + "default": "true", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } + }, + { + "widget-type": "textbox", + "label": "Keystore File", + "name": "keystoreFile" + }, + { + "widget-type": "select", + "label": "Keystore Type", + "name": "keystoreType", + "widget-attributes": { + "default": "Java KeyStore (JKS)", + "values": [ + "Java KeyStore (JKS)", + "Java Cryptography Extension KeyStore (JCEKS)", + "PKCS #12" + ] + } + }, + { + "widget-type": "password", + "label": "Keystore Password", + "name": "keystorePassword" + }, + { + "widget-type": "textbox", + "label": "Keystore Key Algorithm", + "name": "keystoreKeyAlgorithm", + "widget-attributes": { + "default": "SunX509" + } + }, + { + "widget-type": "textbox", + "label": "TrustStore File", + "name": "trustStoreFile" + }, + { + "widget-type": "select", + "label": "TrustStore Type", + "name": "trustStoreType", + "widget-attributes": { + "default": "Java KeyStore (JKS)", + "values": [ + "Java KeyStore (JKS)", + "Java Cryptography Extension KeyStore (JCEKS)", + "PKCS #12" + ] + } + }, + { + "widget-type": "password", + "label": "TrustStore Password", + "name": "trustStorePassword" + }, + { + "widget-type": "textbox", + "label": "TrustStore Key Algorithm", + "name": "trustStoreKeyAlgorithm", + "widget-attributes": { + "default": "SunX509" + } + }, + { + "widget-type": "csv", + "label": "Transport Protocols", + "name": "transportProtocols", + "widget-attributes": { + "default": "TLSv1.2" + } + }, + { + "widget-type": "textbox", + "label": "Cipher Suites", + "name": "cipherSuites" + } + ] + } + ], + "emit-errors": true, + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string", + "array", + "record", + "map", + "union" + ], + "schema-default-type": "string", + "property-watch": "format" + } + } + ], + "filters": [ + { + "name": "Proxy authentication", + "condition": { + "property": "proxyUrl", + "operator": "exists" + }, + "show": [ + { + "name": "proxyUsername", + "type": "property" + }, + { + "name": "proxyPassword", + "type": "property" + } + ] + }, + { + "name": "Increment an index", + "condition": { + "property": "paginationType", + "operator": "equal to", + "value": "Increment an index" + }, + "show": [ + { + "name": "startIndex", + "type": "property" + }, + { + "name": "maxIndex", + "type": "property" + }, + { + "name": "indexIncrement", + "type": "property" + } + ] + }, + { + "name": "Token in Response Body", + "condition": { + "property": "paginationType", + "operator": "equal to", + "value": "Token in response body" + }, + "show": [ + { + "name": "nextPageTokenPath", + "type": "property" + }, + { + "name": "nextPageUrlParameter", + "type": "property" + } + ] + }, + { + "name": "Link in response body", + "condition": { + "property": "paginationType", + "operator": "equal to", + "value": "Link in response body" + }, + "show": [ + { + "name": "nextPageFieldPath", + "type": "property" + } + ] + }, + { + "name": "Custom pagination", + "condition": { + "property": "paginationType", + "operator": "equal to", + "value": "Custom" + }, + "show": [ + { + "name": "customPaginationCode", + "type": "property" + } + ] + }, + { + "name": "Pagination none", + "condition": { + "property": "paginationType", + "operator": "equal to", + "value": "None" + }, + "show": [ + { + "name": "waitTimeBetweenPages", + "type": "property" + } + ] + }, + { + "name": "OAuth 2 disabled", + "condition": { + "property": "oauth2Enabled", + "operator": "equal to", + "value": "false" + }, + "show": [ + { + "name": "username", + "type": "property" + }, + { + "name": "password", + "type": "property" + } + ] + }, + { + "name": "OAuth 2 enabled", + "condition": { + "property": "oauth2Enabled", + "operator": "equal to", + "value": "true" + }, + "show": [ + { + "name": "oauth2Enabled", + "type": "property" + }, + { + "name": "authUrl", + "type": "property" + }, + { + "name": "tokenUrl", + "type": "property" + }, + { + "name": "clientId", + "type": "property" + }, + { + "name": "clientSecret", + "type": "property" + }, + { + "name": "scopes", + "type": "property" + }, + { + "name": "refreshToken", + "type": "property" + } + ] + }, + { + "name": "JSON/XML Formatting", + "condition": { + "expression": "format == 'json' || format == 'xml'" + }, + "show": [ + { + "name": "resultPath", + "type": "property" + }, + { + "name": "fieldsMapping", + "type": "property" + } + ] + }, + { + "name": "CSV Formatting", + "condition": { + "property": "format", + "operator": "equal to", + "value": "csv" + }, + "show": [ + { + "name": "csvSkipFirstRow", + "type": "property" + } + ] + } + ] +} \ No newline at end of file From c5092c6c858b3e02833fcff36f3ac8c5bdd55ef6 Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Tue, 24 Aug 2021 12:29:09 +0200 Subject: [PATCH 2/6] fix --- .../plugin/http/transform/DynamicHttpTransform.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java index 7b98f1a..67441fd 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java @@ -171,12 +171,14 @@ public void transform(StructuredRecord input, Emitter emitter) try { BasePage basePage = createPageInstance(config, httpResponse, postRetryStrategy); - PageEntry pageEntry = basePage.next(); + while (basePage.hasNext()) { + PageEntry pageEntry = basePage.next(); - if (!pageEntry.isError()) { - emitter.emit(pageEntry.getRecord()); - } else { - emitter.emitError(pageEntry.getError()); + if (!pageEntry.isError()) { + emitter.emit(pageEntry.getRecord()); + } else { + emitter.emitError(pageEntry.getError()); + } } }catch (IOException e){ emitter.emitError(new InvalidEntry<>( From 7534cb4695eb2a72de249fb5f474d239bd0086f5 Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Tue, 24 Aug 2021 13:28:28 +0200 Subject: [PATCH 3/6] add possibility to reuse fields from the input record --- docs/HTTP-transform.md | 24 +--- .../source/common/BaseHttpSourceConfig.java | 23 ++-- .../http/transform/DynamicHttpTransform.java | 107 +++++++++++++--- .../transform/DynamicHttpTransformConfig.java | 116 ++++++++++++++++-- .../io/cdap/plugin/http/transform/Util.java | 40 ++++++ .../transform/DynamicHttpTransformTest.java | 101 ++++++++++++++- widgets/HTTP-streamingsource.json | 2 +- widgets/HTTP-transform.json | 88 +++++++------ 8 files changed, 402 insertions(+), 99 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/http/transform/Util.java diff --git a/docs/HTTP-transform.md b/docs/HTTP-transform.md index 849723f..cd02c40 100644 --- a/docs/HTTP-transform.md +++ b/docs/HTTP-transform.md @@ -2,7 +2,7 @@ Description ----------- -This plugin reads data from HTTP/HTTPS page with an URL dynamically changing based on input data. +This plugin reads data from HTTP/HTTPS page with an URL dynamically changing based on input data. Paginated APIs are supported. For paginated APIs plugin reads available data and than waits for new pages to appear. Data in JSON, XML, CSV, TSV, TEXT and BLOB formats is supported. @@ -22,6 +22,10 @@ The url must start with a protocol (e.g. http://). **Request body:** Body to send with each HTTP request. +**Reused Input Fields:** List of fields to retrieve from the input record. The output record will be a concatenation of fields from the response of the HTTP query and the fields from the input record specified here. + +**Rename Reused Input Fields:** Mapping to rename fields reused from the input record. It is not mandatory to rename a reused field but it can be usefull in case the input record and the response of the HTTP query have fields with the same name. + ### Format **Format:** Format of the HTTP response. This determines how the response is converted into output records. Possible values are:
@@ -390,24 +394,6 @@ is stopped. **Refresh Token:** Token used to receive accessToken, which is end product of OAuth2. -### Hawk Authentication - -**HAWK Authentication Enabled:** If true, plugin will perform HAWK authentication. - -**HAWK Auth ID:** HAWK Authentication ID - -**Hawk Auth Key:** HAWK Authentication Key - -**Algorithm:** Hash Algorithm used - -**ext:** Any application-specific information to be sent with the request. Ex: some-app-extra-data - -**app:** This provides binding between the credentials and the application in a way that prevents an attacker from ticking an application to use credentials issued to someone else. - -**dlg:** The application id of the application the credentials were directly issued to. - -**Include Payload Hash:** HAWK authentication provides optional support for payload validation. If this option is selected, the payload hash will be calculated and included in MAC calculation and in Authorization header - ### SSL/TLS **Verify HTTPS Trust Certificates:** If false, untrusted trust certificates (e.g. self signed), will not lead to an diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java index 9d85c47..289f786 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java +++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java @@ -670,11 +670,15 @@ public List getTransportProtocolsList() { return getListFromString(transportProtocols); } - public void validate(){ - validate(true); + public void validate() { + validate(true, true); } public void validate(boolean validateURL) { + validate(validateURL, true); + } + + public void validate(boolean validateURL, boolean validateErrorHandling) { // Validate URL if (validateURL && !containsMacro(PROPERTY_URL)) { try { @@ -687,7 +691,7 @@ public void validate(boolean validateURL) { } // Validate HTTP Error Handling Map - if (!containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) { + if (validateErrorHandling && !containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) { List httpErrorsHandlingEntries = getHttpErrorHandlingEntries(); boolean supportsSkippingPages = PaginationIteratorFactory .createInstance(this, null).supportsSkippingPages(); @@ -778,9 +782,7 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()), String reasonFormat = String.format("page format is '%s'", getFormat()); if (getFormat().equals(PageFormat.JSON) || getFormat().equals(PageFormat.XML)) { - if (!getFormat().equals(PageFormat.JSON)) { - assertIsSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat); - } + assertIsSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat); getFullFieldsMapping(); // can be null, but call getter to verify correctness of regexps } else { assertIsNotSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat); @@ -865,15 +867,20 @@ public static List getListFromString(String value) { } public static Map getMapFromKeyValueString(String keyValueString) { + return getMapFromKeyValueString(keyValueString, ",", ":"); + } + + public static Map getMapFromKeyValueString(String keyValueString, String delimiter, + String kvDelimiter) { Map result = new LinkedHashMap<>(); if (Strings.isNullOrEmpty(keyValueString)) { return result; } - String[] mappings = keyValueString.split(","); + String[] mappings = keyValueString.split(delimiter); for (String map : mappings) { - String[] columns = map.split(":"); + String[] columns = map.split(kvDelimiter); result.put(columns[0], columns[1]); } return result; diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java index 67441fd..e64929f 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java @@ -20,7 +20,12 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; -import io.cdap.cdap.etl.api.*; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.InvalidEntry; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.Transform; +import io.cdap.cdap.etl.api.TransformContext; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; import io.cdap.plugin.http.source.common.RetryPolicy; import io.cdap.plugin.http.source.common.error.ErrorHandling; @@ -31,6 +36,7 @@ import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.page.PageEntry; import io.cdap.plugin.http.source.common.pagination.page.PageFactory; +import org.apache.http.NoHttpResponseException; import org.apache.http.client.methods.CloseableHttpResponse; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; @@ -41,11 +47,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * Plugin returns records from HTTP source specified by link for each input record. Pagination via APIs is supported. + * Plugin returns records from HTTP source specified by link. Pagination via APIs is supported. */ @Plugin(type = Transform.PLUGIN_TYPE) @Name(DynamicHttpTransform.NAME) @@ -65,6 +73,21 @@ public class DynamicHttpTransform extends Transform reusedInputs; + + private Map reusedInputsNameMap; + + // This is a reverse map compared to config.getReusedInputsNameMap() + // reversedReusedInputsNameMap uses the name of the field in the output schema as Key and + // the name of the field in the input schema as Value. + // In config.getReusedInputsNameMap(), the Key is the name of the field in the input schema, + // and the Value is the name of the field in the output schema (which is more intuitive for the user) + // Since the uniqueness of values is checked in configuration validation, + // this should not pause any problem and will be more efficient. + private Map reversedReusedInputsNameMap; + + private Schema outputSchema; + /** * Constructor used by Data Fusion * @@ -82,7 +105,7 @@ public DynamicHttpTransform(DynamicHttpTransformConfig config) { */ public DynamicHttpTransform(DynamicHttpTransformConfig config, HttpClient httpClient) { this.config = config; - if(config.throttlingEnabled()) { + if (config.throttlingEnabled()) { this.rateLimiter = RateLimiter.create(this.config.maxCallPerSeconds); } this.httpClient = httpClient; @@ -93,14 +116,23 @@ public DynamicHttpTransform(DynamicHttpTransformConfig config, HttpClient httpCl } else { pollInterval = IterativePollInterval.iterative(duration -> duration.multiply(2)); } + + this.reusedInputs = config.getReusedInputs(); + this.reusedInputsNameMap = config.getReusedInputsNameMap(); + this.reversedReusedInputsNameMap = new HashMap<>(); + for (Map.Entry e: reusedInputsNameMap.entrySet()) { + this.reversedReusedInputsNameMap.put(e.getValue(), e.getKey()); + } + this.outputSchema = config.getOutputSchema(); } @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { - config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema()); // validate when macros not yet substituted + config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema()); + // validate when macros not yet substituted config.validateSchema(); - pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); + pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getOutputSchema()); } @Override @@ -114,7 +146,8 @@ public void initialize(TransformContext context) throws Exception { parametersBuilder.append(e.getKey() + "=" + e.getValue() + "&"); } } - this.prebuiltParameters = parametersBuilder.toString(); // Yes there is a '&' at the end of tURL but the url is still valid ;) + this.prebuiltParameters = parametersBuilder.toString(); + // Yes there is a '&' at the end of tURL but the url is still valid ;) super.initialize(context); } @@ -128,7 +161,7 @@ public void transform(StructuredRecord input, Emitter emitter) if (valueToUse != null) { String placeholder = "{" + e.getKey() + "}"; if (!url.contains(placeholder)) { - LOG.warn("Placeholder " + placeholder + " not found in url "+url); + LOG.warn("Placeholder " + placeholder + " not found in url " + url); } else { url = url.replace(placeholder, valueToUse); } @@ -140,11 +173,13 @@ public void transform(StructuredRecord input, Emitter emitter) this.url = url + prebuiltParameters; - if(config.throttlingEnabled()) { + if (config.throttlingEnabled()) { rateLimiter.acquire(); // Throttle } - long delay = (httpResponse == null || config.getWaitTimeBetweenPages() == null) ? 0L : config.getWaitTimeBetweenPages(); + long delay = (httpResponse == null || config.getWaitTimeBetweenPages() == null) ? + 0L : + config.getWaitTimeBetweenPages(); try { Awaitility @@ -162,7 +197,8 @@ public void transform(StructuredRecord input, Emitter emitter) switch (postRetryStrategy) { case STOP: - throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'", + throw new IllegalStateException(String.format( + "Fetching from url '%s' returned status code '%d' and body '%s'", url, httpStatusCode, httpResponse.getBody())); default: break; @@ -175,14 +211,29 @@ public void transform(StructuredRecord input, Emitter emitter) PageEntry pageEntry = basePage.next(); if (!pageEntry.isError()) { - emitter.emit(pageEntry.getRecord()); + StructuredRecord retrievedDataRecord = pageEntry.getRecord(); + StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema); + + for (Schema.Field f : outputSchema.getFields()) { + String fieldName = f.getName(); + Object fieldValue; + + if (Util.isReusedField(f.getName(), reusedInputs, reusedInputsNameMap)) { + fieldValue = getReusedFieldValue(input, fieldName); + } else { + fieldValue = retrievedDataRecord.get(fieldName); + } + builder.set(fieldName, fieldValue); + } + + emitter.emit(builder.build()); } else { emitter.emitError(pageEntry.getError()); } } - }catch (IOException e){ + } catch (IOException e) { emitter.emitError(new InvalidEntry<>( - -1, "Exception parsing HTTP Response : "+e.getMessage(), input)); + -1, "Exception parsing HTTP Response : " + e.getMessage(), input)); } } @@ -192,12 +243,36 @@ BasePage createPageInstance(BaseHttpSourceConfig config, HttpResponse httpRespon !postRetryStrategy.equals(ErrorHandling.SUCCESS)); } + // TODO : Handle in a better way the case where server is not available private boolean sendGet() throws IOException { - CloseableHttpResponse response = httpClient.executeHTTP(this.url); - this.httpResponse = new HttpResponse(response); - httpStatusCode = httpResponse.getStatusCode(); + + try { + CloseableHttpResponse response = httpClient.executeHTTP(this.url); + this.httpResponse = new HttpResponse(response); + httpStatusCode = httpResponse.getStatusCode(); + } catch (NoHttpResponseException e) { + httpStatusCode = 443; + } RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); return !errorHandlingStrategy.shouldRetry(); } + + /** + * Retrieve the given field in the given record handling the case the field is reused. + * If the field name have been mapped, retrieve the original field name from the mapping and + * retrieve the field value associated to this name from record, + * Else, just retrieve the field value associated to the given fieldName + * @param inputRecord the record + * @param fieldName the field + * @return the field value + */ + public Object getReusedFieldValue(StructuredRecord inputRecord, String fieldName) { + if (reversedReusedInputsNameMap.containsKey(fieldName)) { + String fieldNameInInput = reversedReusedInputsNameMap.get(fieldName); + return inputRecord.get(fieldNameInInput); + } else { + return inputRecord.get(fieldName); + } + } } diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java index a4e079f..3e24ef6 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java @@ -15,14 +15,19 @@ */ package io.cdap.plugin.http.transform; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; -import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import javax.annotation.Nullable; /** * Provides all the configurations required for configuring the plugin. @@ -30,20 +35,42 @@ public class DynamicHttpTransformConfig extends BaseHttpSourceConfig { public static final String PROPERTY_URL_VARIABLES = "urlVariables"; public static final String PROPERTY_QUERY_PARAMETERS = "queryParameters"; + public static final String PROPERTY_REUSED_INPUTS = "reusedInputs"; + public static final String PROPERTY_RENAME_REUSED_INPUTS = "renameReusedInputs"; public static final String PROPERTY_MAX_CALL_PER_SECONDS = "maxCallPerSeconds"; @Name(PROPERTY_URL_VARIABLES) @Nullable - @Description("Variables used to dynamically construct the URL.") + @Description("Variables used to dynamically construct the URL through placeholder. " + + "Use the placeholder name as Key, and the field from input schema you want to use as Value.") @Macro protected String urlVariables; @Nullable @Name(PROPERTY_QUERY_PARAMETERS) - @Description("Query parameters") + @Description("Query parameters that will be append to the URL. For the URL http://my/url.com/, it will give : " + + "http://my/url.com/?[QueryParametersKey1]=[QueryParametersValue1]&" + + "[QueryParametersKey2]=[QueryParametersValue2] etc...") @Macro protected String queryParameters; + @Nullable + @Name(PROPERTY_REUSED_INPUTS) + @Description("List of fields from inputSchema that will be added to the output schema. " + + "If left empty, the output record will contains only the result of the HTTP " + + "query and the input record will be lost." + + "If a field reused from inputSchema has the same name as a field in the output schema, " + + "use the \"Rename Reused Input Fields\" to rename the field.") + @Macro + protected String reusedInputs; + + @Nullable + @Name(PROPERTY_RENAME_REUSED_INPUTS) + @Description("Rename a reused field from input schema. This should be used when a reused field " + + "from imput schema have the same name as a field from output schema.") + @Macro + protected String renameReusedInputs; + @Nullable @Name(PROPERTY_MAX_CALL_PER_SECONDS) @Description("The maximum number of call made per seconds. 0 = throttling disabled") @@ -55,29 +82,96 @@ protected DynamicHttpTransformConfig(String referenceName) { } public void validate(Schema inputSchema) { - super.validate(false); + super.validate(false, false); // Check that the needed fields exists in input schema - Map urlVariableMap = getUrlVariablesMap(); - for(String value: urlVariableMap.values()){ + Map urlVariableMap = getUrlVariablesMap(); + for (String value: urlVariableMap.values()) { if (inputSchema.getField(value) == null) { - throw new IllegalArgumentException("Field " + value + " is required in input data schema but wasn't found. Current input schema is : " + inputSchema); + throw new IllegalArgumentException("Field " + value + " is required in input data schema " + + "but wasn't found. Current input schema is : " + inputSchema); } } + Map reusedInputsNameMap = getReusedInputsNameMap(); + HashSet inputFields = new HashSet<>(); + HashSet outputFields = new HashSet<>(); + for (Map.Entry e: reusedInputsNameMap.entrySet()) { + if (inputSchema.getField(e.getKey()) == null) { + throw new IllegalArgumentException("Input field " + e.getKey() + " is configured to be renamed " + + "but is not present in the inputSchema"); + } + if (getSchema().getField(e.getValue()) != null) { + throw new IllegalArgumentException("Input field " + e.getKey() + " is configured to be " + + "renamed as " + e.getValue() + " but this field is already present in the outputSchema"); + } + + if (inputFields.add(e.getKey()) == false) { + throw new IllegalArgumentException("Input field " + e.getKey() + + " is configured multiple times to be renamed"); + } + if (outputFields.add(e.getValue()) == false) { + throw new IllegalArgumentException("Multiple fields configured to be renamed " + e.getValue()); + } + } + + + } - public boolean throttlingEnabled(){ - return maxCallPerSeconds>0; + public boolean throttlingEnabled() { + return maxCallPerSeconds > 0; } public Map getQueryParametersMap() { - return getMapFromKeyValueString(queryParameters); + return getMapFromKeyValueString(queryParameters, ",", ":"); } public Map getUrlVariablesMap() { - return getMapFromKeyValueString(urlVariables); + return getMapFromKeyValueString(urlVariables, ",", ":"); + } + + + public Map getReusedInputsNameMap() { + return getMapFromKeyValueString(renameReusedInputs, ",", ":"); + } + + public List getReusedInputs() { + List uniqueFieldList = new ArrayList<>(); + if (!Strings.isNullOrEmpty(reusedInputs)) { + for (String field : Splitter.on(',').trimResults().split(reusedInputs)) { + uniqueFieldList.add(field); + } + } + return uniqueFieldList; } + @Override + /** + * Return the data schema. + * In case no fields are reused from the input schema, this function is equals to getOutput() + * Otherwise, this function return the data schema without the reused fields. + */ + public Schema getSchema() { + Schema schema = getOutputSchema(); + List reusedInput = getReusedInputs(); + if (reusedInput.size() > 0) { + Map reusedInputsNameMap = getReusedInputsNameMap(); + + List fields = new ArrayList<>(); + for (Schema.Field f : schema.getFields()) { + if (!Util.isReusedField(f.getName(), reusedInput, reusedInputsNameMap)) { + fields.add(f); + } + } + schema = Schema.recordOf("record", fields); + } + return schema; + } + + + public Schema getOutputSchema() { + return super.getSchema(); + } } diff --git a/src/main/java/io/cdap/plugin/http/transform/Util.java b/src/main/java/io/cdap/plugin/http/transform/Util.java new file mode 100644 index 0000000..298d43e --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/transform/Util.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.http.transform; + +import java.util.List; +import java.util.Map; + +/** + * Utilitary class + */ +public class Util { + /** + * Return true if the field (in output schema) is a reused field (coming from the input schema), false else + * @param fieldName the name of the field + * @param reusedInputs the list of reused input fields + * @param reusedInputsMapping the mapping of reused fields + * @return is reused + */ + public static boolean isReusedField( + String fieldName, + List reusedInputs, + Map reusedInputsMapping) { + return reusedInputsMapping.containsValue(fieldName) || + (reusedInputs.contains(fieldName) && !reusedInputsMapping.containsKey(fieldName)); + } +} diff --git a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java index 193c640..59a3cbb 100644 --- a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java +++ b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package io.cdap.plugin.http.transform; import com.google.common.base.Joiner; @@ -19,9 +35,6 @@ import java.util.Map; public class DynamicHttpTransformTest { - private void tmp_test(){ - } - // The input schema private static final Schema INPUT_SCHEMA = Schema.recordOf("input", Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), @@ -40,6 +53,18 @@ private void tmp_test(){ Schema.Field.of("paths", Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING)))) ).toString(); + private static final String OUTPUT_SCHEMA_WITH_REUSED = Schema.recordOf("input", + Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("mail", Schema.of(Schema.Type.STRING)), + Schema.Field.of("renamed_mail", Schema.of(Schema.Type.STRING)), + Schema.Field.of("renamed_id", Schema.of(Schema.Type.STRING)), + Schema.Field.of("_id", Schema.of(Schema.Type.STRING)), + Schema.Field.of("assignedPrograms", Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("averageScore", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))), + Schema.Field.of("paths", Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING)))) + ).toString(); + private StructuredRecord generateData(String idValue) { StructuredRecord record = StructuredRecord.builder(INPUT_SCHEMA) .set("firstName", "toto") @@ -50,13 +75,16 @@ private StructuredRecord generateData(String idValue) { } static class BaseTestConfigHttp extends DynamicHttpTransformConfig { - BaseTestConfigHttp(String referenceName, String url, String queryParameters, String urlVariables, int maxCallPerSeconds) { + BaseTestConfigHttp(String outputSchema, String referenceName, String url, String queryParameters, + String urlVariables, int maxCallPerSeconds, String reusedInputs, String renameReusedInputs) { super(referenceName); - this.schema = OUTPUT_SCHEMA; + this.schema = outputSchema; this.url = url; this.queryParameters = queryParameters; this.urlVariables = urlVariables; + this.reusedInputs = reusedInputs; + this.renameReusedInputs = renameReusedInputs; this.maxCallPerSeconds = maxCallPerSeconds; this.httpMethod = "GET"; this.oauth2Enabled = "false"; @@ -89,6 +117,17 @@ public void testHttpDynamicTransformNominal() throws Exception { Assert.assertEquals(Arrays.asList("RATkiller 🐭", "Bienvenue 🙋", "Pro amiante"), outputRecord.get("paths")); } + + @Test + public void testHttpDynamicTransformPartial() throws Exception { + List outputRecords = testHttpDynamicTransform("user_partial.json"); + + Assert.assertTrue(outputRecords.size() == 1); + StructuredRecord outputRecord = outputRecords.get(0); + Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail")); + Assert.assertEquals("the_id_value", outputRecord.get("_id")); + } + public List testHttpDynamicTransform(String filepath)throws Exception { HttpClient mockHttpClient = Mockito.mock(HttpClient.class); CloseableHttpResponse mockHttpResponse = Mockito.mock(CloseableHttpResponse.class); @@ -110,11 +149,14 @@ public List testHttpDynamicTransform(String filepath)throws Ex Mockito.when(mockEntity.getContent()).thenReturn(getClass().getClassLoader().getResourceAsStream(filepath)); BaseTestConfigHttp config = new BaseTestConfigHttp( + OUTPUT_SCHEMA, "HttpDynamicTransform-transform", baseURL, Joiner.on(",").withKeyValueSeparator(":").join(urlParameters), Joiner.on(",").withKeyValueSeparator(":").join(urlVariables), - 10); + 10, + "", + ""); Transform transform = new DynamicHttpTransform(config, mockHttpClient); @@ -129,4 +171,51 @@ public List testHttpDynamicTransform(String filepath)throws Ex return emitter.getEmitted(); } + @Test + public void testReuseInputs()throws Exception { + String filepath = "user.json"; + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + CloseableHttpResponse mockHttpResponse = Mockito.mock(CloseableHttpResponse.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + StatusLine statusLine = Mockito.mock(StatusLine.class); + + Mockito.when(mockHttpClient.executeHTTP(Mockito.any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(statusLine); + Mockito.when(statusLine.getStatusCode()).thenReturn(200); + Mockito.when(mockEntity.getContent()).thenReturn(getClass().getClassLoader().getResourceAsStream(filepath)); + + Map renamingMapping = new HashMap<>(); + renamingMapping.put("_id", "renamed_id"); + renamingMapping.put("mail", "renamed_mail"); + + BaseTestConfigHttp config = new BaseTestConfigHttp( + OUTPUT_SCHEMA_WITH_REUSED, + "HttpDynamicTransform-transform", + "", + "", + "", + 10, + "_id,mail", + Joiner.on(",").withKeyValueSeparator(":").join(renamingMapping)); + + Transform transform + = new DynamicHttpTransform(config, mockHttpClient); + + transform.initialize(null); + + MockEmitter emitter = new MockEmitter<>(); + + transform.transform(generateData("id_value"), emitter); + transform.destroy(); + + List outputRecords = emitter.getEmitted(); + + Assert.assertTrue(outputRecords.size() == 1); + StructuredRecord outputRecord = outputRecords.get(0); + Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail")); + Assert.assertEquals("id_value", outputRecord.get("renamed_id")); + Assert.assertEquals(Arrays.asList("RATkiller 🐭", "Bienvenue 🙋", "Pro amiante"), outputRecord.get("paths")); + } + } diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json index e7abdb6..417bad6 100644 --- a/widgets/HTTP-streamingsource.json +++ b/widgets/HTTP-streamingsource.json @@ -503,7 +503,7 @@ "name": "Proxy authentication", "condition": { "property": "proxyUrl", - "operator": "exists", + "operator": "exists" }, "show": [ { diff --git a/widgets/HTTP-transform.json b/widgets/HTTP-transform.json index 09b2a5f..f88a1c7 100644 --- a/widgets/HTTP-transform.json +++ b/widgets/HTTP-transform.json @@ -2,6 +2,7 @@ "metadata": { "spec-version": "1.5" }, + "display-name": "Multi HTTP", "configuration-groups": [ { "label": "General", @@ -32,6 +33,22 @@ "showDelimiter": "false" } }, + { + "widget-type": "input-field-selector", + "label": "Reused Input Fields", + "name": "reusedInputs", + "widget-attributes": { + "multiselect": "true" + } + }, + { + "widget-type": "keyvalue", + "label": "Rename Reused Input Fields", + "name": "renameReusedInputs", + "widget-attributes": { + "showDelimiter": "false" + } + }, { "widget-type": "select", "label": "HTTP Method", @@ -628,58 +645,53 @@ ] }, { - "name": "OAuth 2 disabled", - "condition": { - "property": "oauth2Enabled", - "operator": "equal to", - "value": "false" - }, - "show": [ - { - "name": "username", - "type": "property" - }, - { - "name": "password", - "type": "property" - } - ] - }, - { - "name": "OAuth 2 enabled", - "condition": { - "property": "oauth2Enabled", - "operator": "equal to", - "value": "true" - }, - "show": [ + "label": "OAuth2", + "properties": [ { + "widget-type": "toggle", + "label": "OAuth2 Enabled", "name": "oauth2Enabled", - "type": "property" + "widget-attributes": { + "default": "false", + "on": { + "label": "True", + "value": "true" + }, + "off": { + "label": "False", + "value": "false" + } + } }, { - "name": "authUrl", - "type": "property" + "widget-type": "textbox", + "label": "Auth URL", + "name": "authUrl" }, { - "name": "tokenUrl", - "type": "property" + "widget-type": "textbox", + "label": "Token URL", + "name": "tokenUrl" }, { - "name": "clientId", - "type": "property" + "widget-type": "textbox", + "label": "Client ID", + "name": "clientId" }, { - "name": "clientSecret", - "type": "property" + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" }, { - "name": "scopes", - "type": "property" + "widget-type": "textbox", + "label": "Scopes", + "name": "scopes" }, { - "name": "refreshToken", - "type": "property" + "widget-type": "textbox", + "label": "Refresh Token", + "name": "refreshToken" } ] }, From 39108ff60bdb273fa247ed131f51c485e847625a Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Thu, 2 Sep 2021 16:55:10 +0200 Subject: [PATCH 4/6] fix : handle pagination properly and rework pagination classes to be properly tested through unit tests --- .../http/source/batch/HttpRecordReader.java | 6 +- .../BaseHttpPaginationIterator.java | 4 +- .../pagination/CustomPaginationIterator.java | 5 +- .../IncrementAnIndexPaginationIterator.java | 6 +- .../LinkInResponseBodyPaginationIterator.java | 6 +- ...inkInResponseHeaderPaginationIterator.java | 6 +- .../pagination/NonePaginationIterator.java | 15 +++- .../pagination/PaginationIteratorFactory.java | 28 +++++-- .../pagination/TokenPaginationIterator.java | 5 +- .../transform/DynamicHttpRecordReader.java | 55 +++++++++++++ .../http/transform/DynamicHttpTransform.java | 82 +++---------------- .../transform/DynamicHttpTransformConfig.java | 16 ++++ 12 files changed, 136 insertions(+), 98 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java index 993c20f..edba1db 100644 --- a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java +++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java @@ -36,9 +36,9 @@ */ public class HttpRecordReader extends RecordReader { private static final Logger LOG = LoggerFactory.getLogger(HttpRecordReader.class); - private static final Gson gson = new GsonBuilder().create(); + protected static final Gson GSON = new GsonBuilder().create(); - private BaseHttpPaginationIterator httpPaginationIterator; + protected BaseHttpPaginationIterator httpPaginationIterator; private BasePage value; /** @@ -51,7 +51,7 @@ public class HttpRecordReader extends RecordReader { public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { Configuration conf = taskAttemptContext.getConfiguration(); String configJson = conf.get(HttpInputFormatProvider.PROPERTY_CONFIG_JSON); - HttpBatchSourceConfig httpBatchSourceConfig = gson.fromJson(configJson, HttpBatchSourceConfig.class); + HttpBatchSourceConfig httpBatchSourceConfig = GSON.fromJson(configJson, HttpBatchSourceConfig.class); httpPaginationIterator = PaginationIteratorFactory.createInstance(httpBatchSourceConfig, null); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java index ece3486..8c75dd9 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java @@ -60,9 +60,9 @@ public abstract class BaseHttpPaginationIterator implements Iterator, private Integer httpStatusCode; private HttpResponse response; - public BaseHttpPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { + public BaseHttpPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { this.config = config; - this.httpClient = new HttpClient(config); + this.httpClient = httpClient; this.nextPageUrl = config.getUrl(); this.httpErrorHandler = new HttpErrorHandler(config); diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java index ad1e832..18dfef5 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; @@ -32,8 +33,8 @@ public class CustomPaginationIterator extends BaseHttpPaginationIterator { private final JythonPythonExecutor pythonExecutor; - public CustomPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + public CustomPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { + super(config, state, httpClient); pythonExecutor = new JythonPythonExecutor(config.getCustomPaginationCode()); pythonExecutor.initialize(); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java index 69084fa..51c4b7e 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.IndexPaginationIteratorState; @@ -36,8 +37,9 @@ public class IncrementAnIndexPaginationIterator extends BaseHttpPaginationIterat private Long index; - public IncrementAnIndexPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + public IncrementAnIndexPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, + HttpClient httpClient) { + super(config, state, httpClient); this.indexIncrement = config.getIndexIncrement(); this.maxIndex = config.getMaxIndex(); diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java index cdc815d..a72b570 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; @@ -34,8 +35,9 @@ public class LinkInResponseBodyPaginationIterator extends BaseHttpPaginationIter private final String address; - public LinkInResponseBodyPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + public LinkInResponseBodyPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, + HttpClient httpClient) { + super(config, state, httpClient); URI uri = URI.create(config.getUrl()); this.address = uri.getScheme() + "://" + uri.getAuthority(); } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java index 5e59b35..6253d8a 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; @@ -34,8 +35,9 @@ public class LinkInResponseHeaderPaginationIterator extends BaseHttpPaginationIt private static final Logger LOG = LoggerFactory.getLogger(LinkInResponseHeaderPaginationIterator.class); private static final Pattern nextLinkPattern = Pattern.compile("<(.+)>; rel=next"); - public LinkInResponseHeaderPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + public LinkInResponseHeaderPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, + HttpClient httpClient) { + super(config, state, httpClient); } @Override diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java index 80f48e3..69ad979 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; @@ -28,8 +29,16 @@ public class NonePaginationIterator extends BaseHttpPaginationIterator { private static final Logger LOG = LoggerFactory.getLogger(NonePaginationIterator.class); - public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + boolean isMultiQuery; + + public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { + this(config, state, httpClient, false); + } + + public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient, + boolean isMultiQuery) { + super(config, state, httpClient); + this.isMultiQuery = isMultiQuery; } @Override @@ -39,6 +48,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) { @Override public boolean supportsSkippingPages() { - return false; + return isMultiQuery; } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java index c2cad0d..36784f1 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; /** @@ -23,23 +24,34 @@ * the input config. */ public class PaginationIteratorFactory { - public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state) { + public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state, + boolean isMultiQuery, + HttpClient httpClient) { switch (config.getPaginationType()) { case NONE: - return new NonePaginationIterator(config, state); + return new NonePaginationIterator(config, state, httpClient, isMultiQuery); case LINK_IN_RESPONSE_HEADER: - return new LinkInResponseHeaderPaginationIterator(config, state); + return new LinkInResponseHeaderPaginationIterator(config, state, httpClient); case LINK_IN_RESPONSE_BODY: - return new LinkInResponseBodyPaginationIterator(config, state); + return new LinkInResponseBodyPaginationIterator(config, state, httpClient); case TOKEN_IN_RESPONSE_BODY: - return new TokenPaginationIterator(config, state); + return new TokenPaginationIterator(config, state, httpClient); case INCREMENT_AN_INDEX: - return new IncrementAnIndexPaginationIterator(config, state); + return new IncrementAnIndexPaginationIterator(config, state, httpClient); case CUSTOM: - return new CustomPaginationIterator(config, state); + return new CustomPaginationIterator(config, state, httpClient); default: throw new IllegalArgumentException( - String.format("Unsupported pagination type: '%s'", config.getPaginationType())); + String.format("Unsupported pagination type: '%s'", config.getPaginationType())); } } + + public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state, + HttpClient httpClient) { + return createInstance(config, state, false, httpClient); + } + + public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state) { + return createInstance(config, state, false, new HttpClient(config)); + } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java index d3f8804..6208ff9 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java @@ -16,6 +16,7 @@ package io.cdap.plugin.http.source.common.pagination; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; +import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState; @@ -33,8 +34,8 @@ public class TokenPaginationIterator extends BaseHttpPaginationIterator { private static final Logger LOG = LoggerFactory.getLogger(TokenPaginationIterator.class); - public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) { - super(config, state); + public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { + super(config, state, httpClient); } @Override diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java new file mode 100644 index 0000000..dd28ee9 --- /dev/null +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.http.transform; + +import io.cdap.plugin.http.source.common.http.HttpClient; +import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator; +import io.cdap.plugin.http.source.common.pagination.PaginationIteratorFactory; +import io.cdap.plugin.http.source.common.pagination.page.BasePage; + +import java.io.IOException; + +/** + * RecordReader implementation, which reads text records representations and http codes + * using {@link BaseHttpPaginationIterator} subclasses. + */ +public class DynamicHttpRecordReader { + protected BaseHttpPaginationIterator httpPaginationIterator; + private BasePage value; + + public DynamicHttpRecordReader(DynamicHttpTransformConfig dynamicHttpTransformConfig, HttpClient httpClient) { + httpPaginationIterator = PaginationIteratorFactory.createInstance(dynamicHttpTransformConfig, null, + true, httpClient); + } + + public boolean nextKeyValue() { + if (!httpPaginationIterator.hasNext()) { + return false; + } + value = httpPaginationIterator.next(); + return true; + } + + public BasePage getCurrentValue() { + return value; + } + + public void close() throws IOException { + if (httpPaginationIterator != null) { + httpPaginationIterator.close(); + } + } +} diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java index e64929f..7d494e2 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java @@ -27,22 +27,13 @@ import io.cdap.cdap.etl.api.Transform; import io.cdap.cdap.etl.api.TransformContext; import io.cdap.plugin.http.source.common.BaseHttpSourceConfig; -import io.cdap.plugin.http.source.common.RetryPolicy; import io.cdap.plugin.http.source.common.error.ErrorHandling; import io.cdap.plugin.http.source.common.error.HttpErrorHandler; -import io.cdap.plugin.http.source.common.error.RetryableErrorHandling; import io.cdap.plugin.http.source.common.http.HttpClient; import io.cdap.plugin.http.source.common.http.HttpResponse; import io.cdap.plugin.http.source.common.pagination.page.BasePage; import io.cdap.plugin.http.source.common.pagination.page.PageEntry; import io.cdap.plugin.http.source.common.pagination.page.PageFactory; -import org.apache.http.NoHttpResponseException; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.awaitility.Awaitility; -import org.awaitility.core.ConditionTimeoutException; -import org.awaitility.pollinterval.FixedPollInterval; -import org.awaitility.pollinterval.IterativePollInterval; -import org.awaitility.pollinterval.PollInterval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * Plugin returns records from HTTP source specified by link. Pagination via APIs is supported. @@ -65,11 +55,7 @@ public class DynamicHttpTransform extends Transform duration.multiply(2)); - } - this.reusedInputs = config.getReusedInputs(); this.reusedInputsNameMap = config.getReusedInputsNameMap(); this.reversedReusedInputsNameMap = new HashMap<>(); @@ -155,7 +135,7 @@ public void initialize(TransformContext context) throws Exception { @Override public void transform(StructuredRecord input, Emitter emitter) { // Replace placeholders in URL - String url = config.getUrl(); + String url = config.getBaseUrl(); for (Map.Entry e : config.getUrlVariablesMap().entrySet()) { String valueToUse = input.get(e.getValue()); if (valueToUse != null) { @@ -171,44 +151,20 @@ public void transform(StructuredRecord input, Emitter emitter) } } - this.url = url + prebuiltParameters; + String processedURL = url + prebuiltParameters; + + config.setProcessedURL(processedURL); if (config.throttlingEnabled()) { rateLimiter.acquire(); // Throttle } - long delay = (httpResponse == null || config.getWaitTimeBetweenPages() == null) ? - 0L : - config.getWaitTimeBetweenPages(); - - try { - Awaitility - .await().with() - .pollInterval(pollInterval) - .pollDelay(delay, TimeUnit.MILLISECONDS) - .timeout(config.getMaxRetryDuration(), TimeUnit.SECONDS) - .until(this::sendGet); // httpResponse is setup here - } catch (ConditionTimeoutException ex) { - // Retries failed. We don't need to do anything here. This will be handled using httpStatusCode below. - } + DynamicHttpRecordReader reader = new DynamicHttpRecordReader(config, httpClient); + while (reader.nextKeyValue()) { + BasePage page = reader.getCurrentValue(); - ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode) - .getAfterRetryStrategy(); - - switch (postRetryStrategy) { - case STOP: - throw new IllegalStateException(String.format( - "Fetching from url '%s' returned status code '%d' and body '%s'", - url, httpStatusCode, httpResponse.getBody())); - default: - break; - } - - try { - BasePage basePage = createPageInstance(config, httpResponse, postRetryStrategy); - - while (basePage.hasNext()) { - PageEntry pageEntry = basePage.next(); + while (page.hasNext()) { + PageEntry pageEntry = page.next(); if (!pageEntry.isError()) { StructuredRecord retrievedDataRecord = pageEntry.getRecord(); @@ -231,9 +187,6 @@ public void transform(StructuredRecord input, Emitter emitter) emitter.emitError(pageEntry.getError()); } } - } catch (IOException e) { - emitter.emitError(new InvalidEntry<>( - -1, "Exception parsing HTTP Response : " + e.getMessage(), input)); } } @@ -243,21 +196,6 @@ BasePage createPageInstance(BaseHttpSourceConfig config, HttpResponse httpRespon !postRetryStrategy.equals(ErrorHandling.SUCCESS)); } - // TODO : Handle in a better way the case where server is not available - private boolean sendGet() throws IOException { - - try { - CloseableHttpResponse response = httpClient.executeHTTP(this.url); - this.httpResponse = new HttpResponse(response); - httpStatusCode = httpResponse.getStatusCode(); - } catch (NoHttpResponseException e) { - httpStatusCode = 443; - } - - RetryableErrorHandling errorHandlingStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode); - return !errorHandlingStrategy.shouldRetry(); - } - /** * Retrieve the given field in the given record handling the case the field is reused. * If the field name have been mapped, retrieve the original field name from the mapping and diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java index 3e24ef6..0b766e4 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java @@ -77,6 +77,22 @@ public class DynamicHttpTransformConfig extends BaseHttpSourceConfig { @Macro protected int maxCallPerSeconds; + + // processedURL is static so that it does not appears in plugin configuration panel + private static String processedURL; + + public String getBaseUrl() { + return url; + } + @Override + public String getUrl() { + return processedURL; + } + + public void setProcessedURL(String processedURL) { + this.processedURL = processedURL; + } + protected DynamicHttpTransformConfig(String referenceName) { super(referenceName); } From ad33400a12e4cf03f8554ca4e43a5530b274828a Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Mon, 13 Sep 2021 11:00:13 +0200 Subject: [PATCH 5/6] handle skiping of pagination in case of multiQuery (quickfix) --- .../pagination/LinkInResponseBodyPaginationIterator.java | 6 ++++-- .../source/common/pagination/NonePaginationIterator.java | 4 ---- .../common/pagination/PaginationIteratorFactory.java | 4 ++-- .../source/common/pagination/TokenPaginationIterator.java | 8 ++++++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java index a72b570..654d3b8 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java @@ -35,11 +35,13 @@ public class LinkInResponseBodyPaginationIterator extends BaseHttpPaginationIter private final String address; + boolean isMultiQuery; public LinkInResponseBodyPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, - HttpClient httpClient) { + HttpClient httpClient, boolean isMultiQuery) { super(config, state, httpClient); URI uri = URI.create(config.getUrl()); this.address = uri.getScheme() + "://" + uri.getAuthority(); + this.isMultiQuery = isMultiQuery; } @Override @@ -64,6 +66,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) { @Override public boolean supportsSkippingPages() { - return false; + return isMultiQuery; } } diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java index 69ad979..1920bd9 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java @@ -31,10 +31,6 @@ public class NonePaginationIterator extends BaseHttpPaginationIterator { boolean isMultiQuery; - public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { - this(config, state, httpClient, false); - } - public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient, boolean isMultiQuery) { super(config, state, httpClient); diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java index 36784f1..e3158d6 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java @@ -33,9 +33,9 @@ public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig con case LINK_IN_RESPONSE_HEADER: return new LinkInResponseHeaderPaginationIterator(config, state, httpClient); case LINK_IN_RESPONSE_BODY: - return new LinkInResponseBodyPaginationIterator(config, state, httpClient); + return new LinkInResponseBodyPaginationIterator(config, state, httpClient, isMultiQuery); case TOKEN_IN_RESPONSE_BODY: - return new TokenPaginationIterator(config, state, httpClient); + return new TokenPaginationIterator(config, state, httpClient, isMultiQuery); case INCREMENT_AN_INDEX: return new IncrementAnIndexPaginationIterator(config, state, httpClient); case CUSTOM: diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java index 6208ff9..cf242a3 100644 --- a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java +++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java @@ -34,8 +34,12 @@ public class TokenPaginationIterator extends BaseHttpPaginationIterator { private static final Logger LOG = LoggerFactory.getLogger(TokenPaginationIterator.class); - public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) { + boolean isMultiQuery; + + public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient + , boolean isMultiQuery) { super(config, state, httpClient); + this.isMultiQuery = isMultiQuery; } @Override @@ -57,6 +61,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) { @Override public boolean supportsSkippingPages() { - return false; + return isMultiQuery; } } From 93c1d1e86d8c1c3316e876d28829cc5a2f15ffad Mon Sep 17 00:00:00 2001 From: Victor Duvert Date: Fri, 24 Sep 2021 11:41:08 +0200 Subject: [PATCH 6/6] cleanup : remove query-parameter part to be consistent with other HTTP plugins + rearrange configuration widget --- .../http/transform/DynamicHttpTransform.java | 18 +----- .../transform/DynamicHttpTransformConfig.java | 13 ----- .../transform/DynamicHttpTransformTest.java | 12 +--- widgets/HTTP-transform.json | 56 ++++++------------- 4 files changed, 21 insertions(+), 78 deletions(-) diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java index 7d494e2..bc83a59 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java @@ -57,8 +57,6 @@ public class DynamicHttpTransform extends Transform reusedInputs; private Map reusedInputsNameMap; @@ -117,18 +115,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Override public void initialize(TransformContext context) throws Exception { - StringBuilder parametersBuilder = new StringBuilder(); - - Map queryParameters = config.getQueryParametersMap(); - if (queryParameters.size() > 0) { - parametersBuilder.append("?"); - for (Map.Entry e : queryParameters.entrySet()) { - parametersBuilder.append(e.getKey() + "=" + e.getValue() + "&"); - } - } - this.prebuiltParameters = parametersBuilder.toString(); - // Yes there is a '&' at the end of tURL but the url is still valid ;) - super.initialize(context); } @@ -151,9 +137,7 @@ public void transform(StructuredRecord input, Emitter emitter) } } - String processedURL = url + prebuiltParameters; - - config.setProcessedURL(processedURL); + config.setProcessedURL(url); if (config.throttlingEnabled()) { rateLimiter.acquire(); // Throttle diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java index 0b766e4..d3716d9 100644 --- a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java +++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java @@ -34,7 +34,6 @@ */ public class DynamicHttpTransformConfig extends BaseHttpSourceConfig { public static final String PROPERTY_URL_VARIABLES = "urlVariables"; - public static final String PROPERTY_QUERY_PARAMETERS = "queryParameters"; public static final String PROPERTY_REUSED_INPUTS = "reusedInputs"; public static final String PROPERTY_RENAME_REUSED_INPUTS = "renameReusedInputs"; public static final String PROPERTY_MAX_CALL_PER_SECONDS = "maxCallPerSeconds"; @@ -46,14 +45,6 @@ public class DynamicHttpTransformConfig extends BaseHttpSourceConfig { @Macro protected String urlVariables; - @Nullable - @Name(PROPERTY_QUERY_PARAMETERS) - @Description("Query parameters that will be append to the URL. For the URL http://my/url.com/, it will give : " + - "http://my/url.com/?[QueryParametersKey1]=[QueryParametersValue1]&" + - "[QueryParametersKey2]=[QueryParametersValue2] etc...") - @Macro - protected String queryParameters; - @Nullable @Name(PROPERTY_REUSED_INPUTS) @Description("List of fields from inputSchema that will be added to the output schema. " + @@ -139,10 +130,6 @@ public boolean throttlingEnabled() { return maxCallPerSeconds > 0; } - public Map getQueryParametersMap() { - return getMapFromKeyValueString(queryParameters, ",", ":"); - } - public Map getUrlVariablesMap() { return getMapFromKeyValueString(urlVariables, ",", ":"); diff --git a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java index 59a3cbb..5c57769 100644 --- a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java +++ b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java @@ -75,13 +75,12 @@ private StructuredRecord generateData(String idValue) { } static class BaseTestConfigHttp extends DynamicHttpTransformConfig { - BaseTestConfigHttp(String outputSchema, String referenceName, String url, String queryParameters, + BaseTestConfigHttp(String outputSchema, String referenceName, String url, String urlVariables, int maxCallPerSeconds, String reusedInputs, String renameReusedInputs) { super(referenceName); this.schema = outputSchema; this.url = url; - this.queryParameters = queryParameters; this.urlVariables = urlVariables; this.reusedInputs = reusedInputs; this.renameReusedInputs = renameReusedInputs; @@ -134,14 +133,11 @@ public List testHttpDynamicTransform(String filepath)throws Ex HttpEntity mockEntity = Mockito.mock(HttpEntity.class); StatusLine statusLine = Mockito.mock(StatusLine.class); - String baseURL = "myfakeurl.com/{id}"; - Map urlParameters = new HashMap<>(); - urlParameters.put("company", "xx"); - urlParameters.put("apiKey", "XX"); + String baseURL = "myfakeurl.com/{id}?apiKey=XX&company=xx"; Map urlVariables = new HashMap<>(); urlVariables.put("id", "_id"); String idValue = "the_id_value"; - String targetURL = "myfakeurl.com/the_id_value?apiKey=XX&company=xx&"; + String targetURL = "myfakeurl.com/the_id_value?apiKey=XX&company=xx"; Mockito.when(mockHttpClient.executeHTTP(targetURL/*Mockito.any()*/)).thenReturn(mockHttpResponse); Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockEntity); Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(statusLine); @@ -152,7 +148,6 @@ public List testHttpDynamicTransform(String filepath)throws Ex OUTPUT_SCHEMA, "HttpDynamicTransform-transform", baseURL, - Joiner.on(",").withKeyValueSeparator(":").join(urlParameters), Joiner.on(",").withKeyValueSeparator(":").join(urlVariables), 10, "", @@ -194,7 +189,6 @@ public void testReuseInputs()throws Exception { "HttpDynamicTransform-transform", "", "", - "", 10, "_id,mail", Joiner.on(",").withKeyValueSeparator(":").join(renamingMapping)); diff --git a/widgets/HTTP-transform.json b/widgets/HTTP-transform.json index f88a1c7..7f94936 100644 --- a/widgets/HTTP-transform.json +++ b/widgets/HTTP-transform.json @@ -17,14 +17,6 @@ "label": "URL", "name": "url" }, - { - "widget-type": "keyvalue", - "label": "Query Parameters", - "name": "queryParameters", - "widget-attributes": { - "showDelimiter": "false" - } - }, { "widget-type": "keyvalue", "label": "URL Variables", @@ -33,22 +25,6 @@ "showDelimiter": "false" } }, - { - "widget-type": "input-field-selector", - "label": "Reused Input Fields", - "name": "reusedInputs", - "widget-attributes": { - "multiselect": "true" - } - }, - { - "widget-type": "keyvalue", - "label": "Rename Reused Input Fields", - "name": "renameReusedInputs", - "widget-attributes": { - "showDelimiter": "false" - } - }, { "widget-type": "select", "label": "HTTP Method", @@ -82,8 +58,24 @@ } ] }, { - "label": "Throttling", + "label": "Multi-HTTP", "properties": [ + { + "widget-type": "input-field-selector", + "label": "Reused Input Fields", + "name": "reusedInputs", + "widget-attributes": { + "multiselect": "true" + } + }, + { + "widget-type": "keyvalue", + "label": "Rename Reused Input Fields", + "name": "renameReusedInputs", + "widget-attributes": { + "showDelimiter": "false" + } + }, { "widget-type": "number", "label": "Max calls per seconds", @@ -630,20 +622,6 @@ } ] }, - { - "name": "Pagination none", - "condition": { - "property": "paginationType", - "operator": "equal to", - "value": "None" - }, - "show": [ - { - "name": "waitTimeBetweenPages", - "type": "property" - } - ] - }, { "label": "OAuth2", "properties": [