Skip to content

Commit

Permalink
Add a read timeout and cache BigQueryIOMetadata (#29662)
Browse files Browse the repository at this point in the history
  • Loading branch information
scwhittle authored Jan 16, 2024
1 parent 308c38e commit dd5bbb2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static String fetchMetadata(String key) {
int timeoutMillis = 5000;
final HttpParams httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
HttpConnectionParams.setSoTimeout(httpParams, timeoutMillis);
String ret = "";
try {
HttpClient client = new DefaultHttpClient(httpParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,25 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Metadata class for BigQueryIO. i.e. to use as BQ job labels. */
final class BigQueryIOMetadata {

/** Job id used as a BigQuery job label; {@code null} when no usable Dataflow job id was found. */
private final @Nullable String beamJobId;

/** Job name used as a BigQuery job label; {@code null} when no usable Dataflow job id was found. */
private final @Nullable String beamJobName;

/** Worker id used as a BigQuery job label; {@code null} when no usable Dataflow job id was found. */
private final @Nullable String beamWorkerId;

/**
 * Cached metadata instance. The value is computed by {@code refreshInstance()} on first use and
 * recomputed once the previous value is more than 5 minutes old.
 */
static final Supplier<BigQueryIOMetadata> INSTANCE =
    Suppliers.memoizeWithExpiration(BigQueryIOMetadata::refreshInstance, 5, TimeUnit.MINUTES);

private BigQueryIOMetadata(
@Nullable String beamJobId, @Nullable String beamJobName, @Nullable String beamWorkerId) {
Expand All @@ -47,25 +53,21 @@ private BigQueryIOMetadata(
* being used.
*/
public static BigQueryIOMetadata create() {
  // Serve the memoized instance. The GCE metadata lookups are performed by refreshInstance()
  // when the cached value is (re)computed, so nothing is fetched here on each call.
  return INSTANCE.get();
}

private static BigQueryIOMetadata refreshInstance() {
String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
// If a Dataflow job id is returned on GCE metadata. Then it means
// this program is running on a Dataflow GCE VM.
boolean isDataflowRunner = !dataflowJobId.isEmpty();

String beamJobId = null;
String beamJobName = null;
String beamWorkerId = null;
if (isDataflowRunner) {
if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
beamJobId = dataflowJobId;
beamJobName = dataflowJobName;
beamWorkerId = dataflowWorkerId;
}
if (dataflowJobId.isEmpty() || !BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
return new BigQueryIOMetadata(null, null, null);
}
return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId);

return new BigQueryIOMetadata(
dataflowJobId,
GceMetadataUtil.fetchDataflowJobName(),
GceMetadataUtil.fetchDataflowWorkerId());
}

public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {
Expand Down

0 comments on commit dd5bbb2

Please sign in to comment.