Skip to content

Commit

Permalink
fix: fix client blocking latency (#2346)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed
- [ ] All new data plane features have a completed end to end testing plan

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf authored Oct 16, 2024
1 parent be62968 commit 3801961
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;

/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
Expand All @@ -28,21 +25,15 @@
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final BigtableTracer tracer;

public BigtableGrpcStreamTracer(BigtableTracer tracer) {
this.tracer = tracer;
}

@Override
public void streamCreated(Attributes transportAttrs, Metadata headers) {
stopwatch.start();
}

@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS));
tracer.grpcMessageSent();
}

static class Factory extends ClientStreamTracer.Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,14 @@ public void setLocations(String zone, String cluster) {
// noop
}

@Deprecated
/** @deprecated {@link #grpcMessageSent()} is called instead. */
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

/** Called when the message is sent on a grpc channel. */
public void grpcMessageSent() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
private final Attributes baseAttributes;

private Long serverLatencies = null;
private final AtomicLong grpcMessageSentDelay = new AtomicLong(0);

// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
Expand Down Expand Up @@ -263,8 +264,8 @@ public void batchRequestThrottled(long throttledTimeNanos) {
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeNanos) {
totalClientBlockingTime.addAndGet(queuedTimeNanos);
public void grpcMessageSent() {
grpcMessageSentDelay.set(attemptTimer.elapsed(TimeUnit.NANOSECONDS));
}

@Override
Expand Down Expand Up @@ -351,6 +352,7 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
.put(STATUS_KEY, statusStr)
.build();

totalClientBlockingTime.addAndGet(grpcMessageSentDelay.get());
clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes);

attemptLatenciesHistogram.record(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,11 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
}
}

@Override
public void grpcMessageSent() {
for (BigtableTracer tracer : bigtableTracers) {
tracer.grpcMessageSent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,11 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingServerCall;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
Expand All @@ -95,6 +91,8 @@
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -104,6 +102,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -130,7 +129,7 @@ public class BuiltinMetricsTracerTest {
private static final long SLEEP_VARIABILITY = 15;
private static final String CLIENT_NAME = "java-bigtable/" + Version.VERSION;

private static final long CHANNEL_BLOCKING_LATENCY = 75;
private static final long CHANNEL_BLOCKING_LATENCY = 200;

@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();

Expand Down Expand Up @@ -196,35 +195,14 @@ public void sendHeaders(Metadata headers) {
}
};

ClientInterceptor clientInterceptor =
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void sendMessage(ReqT message) {
try {
Thread.sleep(CHANNEL_BLOCKING_LATENCY);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
super.sendMessage(message);
}
};
}
};

server = FakeServiceBuilder.create(fakeService).intercept(trailersInterceptor).start();

BigtableDataSettings settings =
BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId(PROJECT_ID)
.setInstanceId(INSTANCE_ID)
.setAppProfileId(APP_PROFILE_ID)
.setRefreshingChannel(false)
.build();
EnhancedBigtableStubSettings.Builder stubSettingsBuilder =
settings.getStubSettings().toBuilder();
Expand Down Expand Up @@ -264,7 +242,7 @@ public void sendMessage(ReqT message) {
if (oldConfigurator != null) {
builder = oldConfigurator.apply(builder);
}
return builder.intercept(clientInterceptor);
return builder.proxyDetector(new DelayProxyDetector());
});
stubSettingsBuilder.setTransportChannelProvider(channelProvider.build());

Expand Down Expand Up @@ -692,9 +670,8 @@ public void testQueuedOnChannelUnaryLatencies() {
.put(CLIENT_NAME_KEY, CLIENT_NAME)
.build();

long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3;
long actual = getAggregatedValue(clientLatency, attributes);
assertThat(actual).isAtLeast(expected);
assertThat(actual).isAtLeast(CHANNEL_BLOCKING_LATENCY);
}

@Test
Expand Down Expand Up @@ -838,4 +815,18 @@ public AtomicInteger getResponseCounter() {
return responseCounter;
}
}

class DelayProxyDetector implements ProxyDetector {

@Nullable
@Override
public ProxiedSocketAddress proxyFor(SocketAddress socketAddress) throws IOException {
try {
Thread.sleep(CHANNEL_BLOCKING_LATENCY);
} catch (InterruptedException e) {

}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,11 @@ public void testRequestBlockedOnChannel() {
verify(child3, times(1)).grpcChannelQueuedLatencies(5L);
verify(child4, times(1)).grpcChannelQueuedLatencies(5L);
}

@Test
public void testGrpcMessageSent() {
compositeTracer.grpcMessageSent();
verify(child3, times(1)).grpcMessageSent();
verify(child4, times(1)).grpcMessageSent();
}
}

0 comments on commit 3801961

Please sign in to comment.