Skip to content

Commit

Permalink
chore: factor out all of the unary callable logic into a helper
Browse files Browse the repository at this point in the history
Change-Id: Idbb2727108857daa662f2e4690053cb6065ad054
  • Loading branch information
igorbernstein2 committed Nov 1, 2024
1 parent ecf4d3e commit 128b31a
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 679 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
Expand All @@ -39,6 +41,7 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand All @@ -55,22 +58,17 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ExecuteQueryRequest;
import com.google.bigtable.v2.ExecuteQueryResponse;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowRange;
Expand Down Expand Up @@ -141,8 +139,10 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand All @@ -159,6 +159,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -801,42 +802,14 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<MutateRowRequest>() {
@Override
public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
String tableName = mutateRowRequest.getTableName();
String authorizedViewName = mutateRowRequest.getAuthorizedViewName();
if (tableName.isEmpty()) {
tableName =
NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
}
return ImmutableMap.of(
"table_name",
tableName,
"app_profile_id",
mutateRowRequest.getAppProfileId());
}
})
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(withBigtableTracer, settings.mutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
return createUnaryCallable(
BigtableGrpc.getMutateRowMethod(),
req ->
composeRequestParams(
req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
settings.mutateRowSettings(),
req -> req.toProto(requestContext),
resp -> null);
}

/**
Expand Down Expand Up @@ -1056,44 +1029,14 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
String methodName = "CheckAndMutateRow";
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getCheckAndMutateRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<CheckAndMutateRowRequest>() {
@Override
public Map<String, String> extract(
CheckAndMutateRowRequest checkAndMutateRowRequest) {
String tableName = checkAndMutateRowRequest.getTableName();
String authorizedViewName =
checkAndMutateRowRequest.getAuthorizedViewName();
if (tableName.isEmpty()) {
tableName =
NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
}
return ImmutableMap.of(
"table_name",
tableName,
"app_profile_id",
checkAndMutateRowRequest.getAppProfileId());
}
})
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
return createUnaryCallable(
BigtableGrpc.getCheckAndMutateRowMethod(),
req ->
composeRequestParams(
req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
settings.checkAndMutateRowSettings(),
req -> req.toProto(requestContext),
CheckAndMutateRowResponse::getPredicateMatched);
}

/**
Expand All @@ -1107,39 +1050,16 @@ public Map<String, String> extract(
* </ul>
*/
private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable() {
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadModifyWriteRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<ReadModifyWriteRowRequest>() {
@Override
public Map<String, String> extract(ReadModifyWriteRowRequest request) {
String tableName = request.getTableName();
String authorizedViewName = request.getAuthorizedViewName();
if (tableName.isEmpty()) {
tableName =
NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
}
return ImmutableMap.of(
"table_name", tableName, "app_profile_id", request.getAppProfileId());
}
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
DefaultRowAdapter rowAdapter = new DefaultRowAdapter();

return createUnaryCallable(
BigtableGrpc.getReadModifyWriteRowMethod(),
req ->
composeRequestParams(
req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
settings.readModifyWriteRowSettings(),
req -> req.toProto(requestContext),
resp -> rowAdapter.createRowFromProto(resp.getRow()));
}

/**
Expand Down Expand Up @@ -1404,6 +1324,57 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private Map<String, String> composeRequestParams(
String appProfileId, String tableName, String authorizedViewName) {
if (tableName.isEmpty()) {
tableName = NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
}
return ImmutableMap.of("table_name", tableName, "app_profile_id", appProfileId);
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallable(
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
RequestParamsExtractor<BaseReqT> headerParamsFn,
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {

UnaryCallable<BaseReqT, BaseRespT> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<BaseReqT, BaseRespT>newBuilder()
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(headerParamsFn)
.build(),
callSettings.getRetryableCodes());

UnaryCallable<BaseReqT, BaseRespT> withStatsHeaders = new StatsHeadersUnaryCallable<>(base);

UnaryCallable<BaseReqT, BaseRespT> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<BaseReqT, BaseRespT> retrying =
withRetries(withBigtableTracer, settings.mutateRowSettings());

UnaryCallable<ReqT, RespT> transformed =
new UnaryCallable<ReqT, RespT>() {
@Override
public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
ApiFuture<BaseRespT> f =
retrying.futureCall(requestTransformer.apply(reqT), apiCallContext);
return ApiFutures.transform(
f, responseTranformer::apply, MoreExecutors.directExecutor());
}
};

UnaryCallable<ReqT, RespT> traced =
new TracedUnaryCallable<>(
transformed,
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
Expand Down

This file was deleted.

Loading

0 comments on commit 128b31a

Please sign in to comment.