Skip to content

Commit

Permalink
feat: udsink bidirectional streaming (#141)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Sep 26, 2024
1 parent babc8e1 commit f68f9f2
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 119 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
139 changes: 77 additions & 62 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package io.numaproj.numaflow.sinker;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.numaproj.numaflow.sink.v1.SinkGrpc.getSinkFnMethod;

@Slf4j
class Service extends SinkGrpc.SinkImplBase {
// sinkTaskExecutor is the executor for the sinker. It is a fixed size thread pool
Expand All @@ -24,12 +22,6 @@ class Service extends SinkGrpc.SinkImplBase {
private final ExecutorService sinkTaskExecutor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
private final long SHUTDOWN_TIME = 30;


private final Sinker sinker;

public Service(Sinker sinker) {
Expand All @@ -41,25 +33,60 @@ public Service(Sinker sinker) {
*/
@Override
public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(StreamObserver<SinkOuterClass.SinkResponse> responseObserver) {
if (this.sinker == null) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getSinkFnMethod(),
responseObserver);
}
return new StreamObserver<>() {
private boolean startOfStream = true;
private CompletableFuture<ResponseList> result;
private DatumIteratorImpl datumStream;
private boolean handshakeDone = false;

DatumIteratorImpl datumStream = new DatumIteratorImpl();
@Override
public void onNext(SinkOuterClass.SinkRequest request) {
// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SinkOuterClass.SinkResponse.newBuilder()
.setHandshake(request.getHandshake())
.build());
handshakeDone = true;
return;
}

Future<ResponseList> result = sinkTaskExecutor.submit(() -> this.sinker.processMessages(
datumStream));
// Create a DatumIterator to write the messages to the sinker
// and start the sinker if it is the start of the stream
if (startOfStream) {
datumStream = new DatumIteratorImpl();
result = CompletableFuture.supplyAsync(
() -> sinker.processMessages(datumStream),
sinkTaskExecutor);
startOfStream = false;
}

return new StreamObserver<SinkOuterClass.SinkRequest>() {
@Override
public void onNext(SinkOuterClass.SinkRequest d) {
try {
datumStream.writeMessage(constructHandlerDatum(d));
} catch (InterruptedException e) {
Thread.interrupted();
onError(e);
if (request.getStatus().getEot()) {
// End of transmission, write EOF datum to the stream
// Wait for the result and send the response back to the client
datumStream.writeMessage(HandlerDatum.EOF_DATUM);

ResponseList responses = result.join();
responses.getResponses().forEach(response -> {
SinkOuterClass.SinkResponse sinkResponse = buildResponse(response);
responseObserver.onNext(sinkResponse);
});

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
} else {
datumStream.writeMessage(constructHandlerDatum(request));
}
} catch (Exception e) {
log.error("Encountered error in sinkFn - {}", e.getMessage());
responseObserver.onError(e);
}
}

Expand All @@ -71,26 +98,23 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
SinkOuterClass.SinkResponse response = SinkOuterClass.SinkResponse
.newBuilder()
.build();
try {
datumStream.writeMessage(HandlerDatum.EOF_DATUM);
// wait until the sink handler returns, result.get() is a blocking call
ResponseList responses = result.get();
// construct responseList from responses
response = buildResponseList(responses);

} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
onError(e);
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}

private SinkOuterClass.SinkResponse buildResponse(Response response) {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
return SinkOuterClass.SinkResponse.newBuilder()
.setResult(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build())
.build();
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
Expand All @@ -104,37 +128,28 @@ public void isReady(

private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
return new HandlerDatum(
d.getKeysList().toArray(new String[0]),
d.getValue().toByteArray(),
d.getRequest().getKeysList().toArray(new String[0]),
d.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
d.getWatermark().getSeconds(),
d.getWatermark().getNanos()),
d.getRequest().getWatermark().getSeconds(),
d.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
d.getEventTime().getSeconds(),
d.getEventTime().getNanos()),
d.getId(),
d.getHeadersMap()
d.getRequest().getEventTime().getSeconds(),
d.getRequest().getEventTime().getNanos()),
d.getRequest().getId(),
d.getRequest().getHeadersMap()
);
}

public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
var responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
responses.getResponses().forEach(response -> {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
responseBuilder.addResults(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setStatus(status)
.build());
});
return responseBuilder.build();
}

// shuts down the executor service which is used for reduce
public void shutDown() {
this.sinkTaskExecutor.shutdown();
try {
// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
long SHUTDOWN_TIME = 30;

if (!sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("Sink executor did not terminate in the specified time.");
List<Runnable> droppedTasks = sinkTaskExecutor.shutdownNow();
Expand Down
49 changes: 33 additions & 16 deletions src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ public Client(String host, int port) {
* @return response from the server as a ResponseList
*/
public ResponseList sendRequest(DatumIterator datumIterator) {
CompletableFuture<SinkOuterClass.SinkResponse> future = new CompletableFuture<>();
List<SinkOuterClass.SinkResponse> responses = new ArrayList<>();
CompletableFuture<List<SinkOuterClass.SinkResponse>> future = new CompletableFuture<>();

StreamObserver<SinkOuterClass.SinkResponse> responseObserver = new StreamObserver<>() {
@Override
public void onNext(SinkOuterClass.SinkResponse response) {
future.complete(response);
responses.add(response);
}

@Override
Expand All @@ -127,16 +129,19 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
if (!future.isDone()) {
future.completeExceptionally(new RuntimeException(
"Server completed without a response"));
}
future.complete(responses);
}
};

StreamObserver<SinkOuterClass.SinkRequest> requestObserver = sinkStub.sinkFn(
responseObserver);

// send handshake request
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder()
.setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build())
.build());

// send actual requests
while (true) {
Datum datum = null;
try {
Expand All @@ -148,7 +153,8 @@ public void onCompleted() {
if (datum == null) {
break;
}
SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder()
SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request
.newBuilder()
.addAllKeys(
datum.getKeys()
== null ? new ArrayList<>() : List.of(datum.getKeys()))
Expand All @@ -168,28 +174,39 @@ public void onCompleted() {
.putAllHeaders(
datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders())
.build();
requestObserver.onNext(request);
requestObserver.onNext(SinkOuterClass.SinkRequest
.newBuilder()
.setRequest(request)
.build());
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());

requestObserver.onCompleted();

SinkOuterClass.SinkResponse response;
List<SinkOuterClass.SinkResponse> outputResponses;
try {
response = future.get();
outputResponses = future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}

ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) {
if (result.getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result.getId()));
} else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
for (SinkOuterClass.SinkResponse result : outputResponses) {
if (result.getHandshake().getSot()) {
continue;
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
.getId()));
} else if (result.getResult().getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
result.getId()));
result.getResult().getId()));
} else {
responseListBuilder.addResponse(Response.responseFailure(
result.getId(), result.getErrMsg()));
result.getResult().getId(), result.getResult().getErrMsg()));
}
}

Expand Down
33 changes: 23 additions & 10 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@ public Service(Sourcer sourcer) {
@Override
public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

@Override
public void onNext(SourceOuterClass.ReadRequest request) {
// if the request is a handshake, send handshake response.
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the read requests
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SourceOuterClass.ReadResponse.newBuilder()
.setHandshake(request.getHandshake())
.setStatus(SourceOuterClass.ReadResponse.Status.newBuilder()
.setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS)
.build())
.build());
handshakeDone = true;
return;
}

ReadRequestImpl readRequest = new ReadRequestImpl(
request.getRequest().getNumRecords(),
Duration.ofMillis(request.getRequest().getTimeoutInMs()));
Expand Down Expand Up @@ -89,16 +96,22 @@ public void onCompleted() {
@Override
public StreamObserver<SourceOuterClass.AckRequest> ackFn(final StreamObserver<SourceOuterClass.AckResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

@Override
public void onNext(SourceOuterClass.AckRequest request) {
// if the request is a handshake, send a handshake response
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the ack requests
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SourceOuterClass.AckResponse.newBuilder()
.setHandshake(request.getHandshake())
.setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess(
Empty.newBuilder().build()))
.build());
return;
handshakeDone = true;
}

SourceOuterClass.Offset offset = request.getRequest().getOffset();
Expand Down
Loading

0 comments on commit f68f9f2

Please sign in to comment.