Skip to content

Commit

Permalink
Set the operation timeout on the call context for each callable
Browse files Browse the repository at this point in the history
  • Loading branch information
djyau committed Oct 25, 2024
1 parent e9b4b12 commit 74b9ae5
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,13 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext().
withOption(deadlineKey, (long) settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -578,7 +583,13 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -697,7 +708,13 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -956,7 +973,13 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -1225,7 +1248,13 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -1305,7 +1334,13 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

/**
Expand Down Expand Up @@ -1391,7 +1426,18 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {
new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span);

return new ExecuteQueryCallable(
traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext);
traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings
.readRowsSettings()
.getRetrySettings()
.getTotalTimeout()
.toMillis())),
requestContext);
}

/**
Expand All @@ -1404,7 +1450,13 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand All @@ -1423,7 +1475,13 @@ public Map<String, String> extract(PingAndWarmRequest request) {
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
deadlineKey,
(long)
settings.readRowsSettings().getRetrySettings().getTotalTimeout().toMillis()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public void grpcMessageSent() {
}

/**
* Record the deadline when the request is sent to the Bigtable server. This will be
* called in BuiltinMetricsTracer.
* Record the deadline when the request is sent to the Bigtable server. This will be called in
* BuiltinMetricsTracer.
*/
public void setRemainingDeadline(long deadline) {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class BigtableTracerStreamingCallable<RequestT, ResponseT>
private final ApiCallContext.Key<Long> deadlineKey;

public BigtableTracerStreamingCallable(
@Nonnull ServerStreamingCallable<RequestT, ResponseT> callable, ApiCallContext.Key<Long> deadlineKey) {
@Nonnull ServerStreamingCallable<RequestT, ResponseT> callable,
ApiCallContext.Key<Long> deadlineKey) {
this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set");
this.deadlineKey = deadlineKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public class BigtableTracerUnaryCallable<RequestT, ResponseT>
private final UnaryCallable<RequestT, ResponseT> innerCallable;
private final ApiCallContext.Key<Long> deadlineKey;

public BigtableTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable, ApiCallContext.Key<Long> deadlineKey) {
public BigtableTracerUnaryCallable(
@Nonnull UnaryCallable<RequestT, ResponseT> innerCallable,
ApiCallContext.Key<Long> deadlineKey) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
this.deadlineKey = deadlineKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ public void grpcMessageSent() {
}

/*
In GrpcClientCalls we set the timeout in ApiContext on CallOptions. The timeout in ApiContext will be the attempt
timeout for the first few requests or the remaining operation timeout after retries and back offs.
*/
In GrpcClientCalls we set the timeout in ApiContext on CallOptions. The timeout in ApiContext will be the attempt
timeout for the first few requests or the remaining operation timeout after retries and back offs.
*/
@Override
public void setRemainingDeadline(long deadline) {
this.deadline = deadline - operationTimer.elapsed(TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public static BuiltinMetricsTracerFactory create(
remainingDeadlineHistogram =
meter
.histogramBuilder(REMAINING_DEADLINE_NAME)
.setDescription("The remaining deadline when the request is sent to grpc. This will either be the attempt timeout for the first few retries, or the reamining deadline from operation timeout after retries and back offs.")
.setDescription(
"The remaining deadline when the request is sent to grpc. This will either be the attempt timeout for the first few retries, or the reamining deadline from operation timeout after retries and back offs.")
.setUnit(MILLISECOND)
.build();
connectivityErrorCounter =
Expand Down Expand Up @@ -143,7 +144,7 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
firstResponseLatenciesHistogram,
clientBlockingLatenciesHistogram,
applicationBlockingLatenciesHistogram,
remainingDeadlineHistogram,
remainingDeadlineHistogram,
connectivityErrorCounter,
retryCounter);
}
Expand Down

0 comments on commit 74b9ae5

Please sign in to comment.