Skip to content

Commit

Permalink
chore: remodel unary callables as server streaming callables with an …
Browse files Browse the repository at this point in the history
…adapter at the end

Change-Id: I8708dff0e192d7647ef2cb361fc0992e1ddd2b24
  • Loading branch information
igorbernstein2 committed Nov 4, 2024
1 parent b40828c commit ec03559
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.AbstractApiFuture;
import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.common.base.Preconditions;
import io.grpc.Status;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
* UnaryCallable}. It is intended to be the outermost callable of a chain.
*
* <p>Responsibilities:
*
* <ul>
* <li>Operation level metrics
* <li>Configuring the default call context
* <li>Converting the result to a future
*/
class BigtableUnaryOperationCallable<ReqT, RespT> extends UnaryCallable<ReqT, RespT> {
private static final Logger LOGGER =
Logger.getLogger(BigtableUnaryOperationCallable.class.getName());

private final ServerStreamingCallable<ReqT, RespT> inner;
private final ApiCallContext defaultCallContext;
private final ApiTracerFactory tracerFactory;
private final SpanName spanName;
private final boolean allowNoResponse;

public BigtableUnaryOperationCallable(
ServerStreamingCallable<ReqT, RespT> inner,
ApiCallContext defaultCallContext,
ApiTracerFactory tracerFactory,
SpanName spanName,
boolean allowNoResponse) {
this.inner = inner;
this.defaultCallContext = defaultCallContext;
this.tracerFactory = tracerFactory;
this.spanName = spanName;
this.allowNoResponse = allowNoResponse;
}

@Override
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
apiCallContext = defaultCallContext.merge(apiCallContext);

ApiTracer apiTracer =
tracerFactory.newTracer(
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);

apiCallContext = apiCallContext.withTracer(apiTracer);

UnaryFuture f = new UnaryFuture(apiTracer, allowNoResponse);
inner.call(req, f, apiCallContext);
return f;
}

class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
private final ApiTracer tracer;
private final boolean allowNoResponse;

private StreamController controller;
private boolean responseReceived;
private @Nullable RespT response;

private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
this.allowNoResponse = allowNoResponse;
this.responseReceived = false;
}

@Override
public void onStart(StreamController controller) {
this.controller = controller;
controller.disableAutoInboundFlowControl();
// Request 2 to detect protocol bugs
controller.request(2);
}

/**
* Immediately cancel the future state and try to cancel the underlying operation. Will return
* false if the future is already resolved.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
controller.cancel();
return true;
}
return false;
}

@Override
public void onResponse(RespT resp) {
tracer.responseReceived();

if (responseReceived) {
String msg =
String.format(
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
spanName, resp, response);
LOGGER.log(Level.WARNING, msg);

InternalException error =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
if (setException(error)) {
tracer.operationFailed(error);
}

controller.cancel();
return;
}

responseReceived = true;
this.response = resp;
this.tracer.responseReceived();
}

@Override
public void onError(Throwable throwable) {
if (this.setException(throwable)) {
tracer.operationFailed(throwable);
} else if (isCancelled()) {
tracer.operationCancelled();
}
// The future might've been resolved due to double response
}

@Override
public void onComplete() {
// Sanity check server response counts
if (!allowNoResponse && !responseReceived) {
String msg = spanName + " unary operation completed without a response message";
InternalException e =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);

if (setException(e)) {
tracer.operationFailed(e);
}
return;
}

// check cancellation race
if (isCancelled()) {
tracer.operationCancelled();
return;
}

// try to resolve the future
if (this.set(response)) {
tracer.operationSucceeded();
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode.Code;
Expand Down Expand Up @@ -124,7 +126,6 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
Expand Down Expand Up @@ -155,6 +156,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -568,16 +570,22 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
.build(),
rowAdapter);

ReadRowsUserCallable<RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);

UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
ServerStreamingCallable<Query, RowT> readRowCallable =
new ServerStreamingCallable<Query, RowT>() {
@Override
public void call(
Query query, ResponseObserver<RowT> responseObserver, ApiCallContext apiCallContext) {
readRowsCallable.call(
query.limit(1).toProto(requestContext), responseObserver, apiCallContext);
}
};

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return new BigtableUnaryOperationCallable<>(
readRowCallable,
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
getSpanName("ReadRow"),
/*allowNoResponses=*/ true);
}

/**
Expand Down Expand Up @@ -1325,6 +1333,21 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {
if (EnhancedBigtableStubSettings.SKIP_TRAILERS) {
return createUnaryCallableNew(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
} else {
return createUnaryCallableNew(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
}
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableOld(
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
RequestParamsExtractor<BaseReqT> headerParamsFn,
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {

UnaryCallable<BaseReqT, BaseRespT> base =
GrpcRawCallableFactory.createUnaryCallable(
Expand Down Expand Up @@ -1361,6 +1384,50 @@ public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew(
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
RequestParamsExtractor<BaseReqT> headerParamsFn,
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {

ServerStreamingCallable<BaseReqT, BaseRespT> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<BaseReqT, BaseRespT>newBuilder()
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(headerParamsFn)
.build(),
callSettings.getRetryableCodes());

base = new StatsHeadersServerStreamingCallable<>(base);

base = new BigtableTracerStreamingCallable<>(base);

base = withRetries(base, convertUnaryToServerStreamingSettings(settings.mutateRowSettings()));

ServerStreamingCallable<ReqT, RespT> transformed =
new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer);

return new BigtableUnaryOperationCallable<>(
transformed,
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()),
/* allowNoResponse= */ false);
}

private static <ReqT, RespT>
ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
UnaryCallSettings<?, ?> unarySettings) {
return ServerStreamingCallSettings.<ReqT, RespT>newBuilder()
.setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
.setRetryableCodes(unarySettings.getRetryableCodes())
.setRetrySettings(unarySettings.getRetrySettings())
.setIdleTimeoutDuration(Duration.ZERO)
.setWaitTimeoutDuration(Duration.ZERO)
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -108,6 +109,11 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final boolean DIRECT_PATH_ENABLED =
Boolean.parseBoolean(System.getenv("CBT_ENABLE_DIRECTPATH"));

static final boolean SKIP_TRAILERS =
Optional.of(System.getenv("CBT_SKIP_HEADERS"))
.map(Boolean::parseBoolean)
.orElse(DIRECT_PATH_ENABLED);

private static final Set<Code> IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);

Expand Down
Loading

0 comments on commit ec03559

Please sign in to comment.