diff --git a/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java b/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java new file mode 100644 index 00000000..6c1f7786 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java @@ -0,0 +1,28 @@ +package io.numaproj.numaflow.reducer; + +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * ActorRequest is to store the request sent to ReduceActors. + */ +@Getter +@AllArgsConstructor +class ActorRequest { + ReduceOuterClass.ReduceRequest request; + + // TODO - do we need to include window information in the id? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + // we will revisit this one later. + public String getUniqueIdentifier() { + return String.join( + Constants.DELIMITER, + this.getRequest().getPayload().getKeysList().toArray(new String[0])); + } + + public String[] getKeySet() { + return this.getRequest().getPayload().getKeysList().toArray(new String[0]); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java index 5820356a..78dc44c4 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java @@ -10,6 +10,15 @@ @Getter @AllArgsConstructor class ActorResponse { - String[] keys; ReduceOuterClass.ReduceResponse response; + + // TODO - do we need to include window information in the id? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + // we will revisit this one later. + public String getUniqueIdentifier() { + return String.join( + Constants.DELIMITER, + this.getResponse().getResult().getKeysList().toArray(new String[0])); + } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java index 542c8a8f..612160e0 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java @@ -7,12 +7,10 @@ @AllArgsConstructor class HandlerDatum implements Datum { - private byte[] value; private Instant watermark; private Instant eventTime; - @Override public Instant getWatermark() { return this.watermark; @@ -27,5 +25,4 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } - } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index 1065c7b5..aeb2253e 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -4,6 +4,7 @@ import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -19,7 +20,6 @@ @Slf4j @AllArgsConstructor class ReduceActor extends AbstractActor { - private String[] keys; private Metadata md; private Reducer groupBy; @@ -44,23 +44,53 @@ private void invokeHandler(HandlerDatum handlerDatum) { private void getResult(String eof) { MessageList resultMessages = this.groupBy.getOutput(keys, md); // send the result back to sender(parent actor) - getSender().tell(buildDatumListResponse(resultMessages), getSelf()); + resultMessages.getMessages().forEach(message -> { + getSender().tell(buildActorResponse(message), getSelf()); + }); + // send a response back with EOF set to true, indicating the reducer has finished the data aggregation. + getSender().tell(buildEOFActorResponse(), getSelf()); } - private ActorResponse buildDatumListResponse(MessageList messageList) { + private ActorResponse buildActorResponse(Message message) { ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); - messageList.getMessages().forEach(message -> { - responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder() - .setValue(ByteString.copyFrom(message.getValue())) - .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList( - message.getKeys())) - .addAllTags(message.getTags() == null ? new ArrayList<>() : List.of( - message.getTags())) - .build()); - - }); - return new ActorResponse(this.keys, responseBuilder.build()); + // set the window using the actor metadata. + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + responseBuilder.setEOF(false); + // set the result. + responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result + .newBuilder() + .setValue(ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() + == null ? new ArrayList<>():Arrays.asList(message.getKeys())) + .addAllTags( + message.getTags() == null ? new ArrayList<>():List.of(message.getTags())) + .build()); + return new ActorResponse(responseBuilder.build()); } + private ActorResponse buildEOFActorResponse() { + ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + responseBuilder.setEOF(true); + // set a dummy result with the keys. + responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result + .newBuilder() + .addAllKeys(List.of(this.keys)) + .build()); + return new ActorResponse(responseBuilder.build()); + } } - diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index ee4c53f3..ec3b8750 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -22,7 +22,6 @@ /** * ReduceSupervisorActor actor distributes the messages to actors and handles failure. */ - @Slf4j class ReduceSupervisorActor extends AbstractActor { private final ReducerFactory reducerFactory; @@ -79,7 +78,7 @@ public void postStop() { public Receive createReceive() { return ReceiveBuilder .create() - .match(ReduceOuterClass.ReduceRequest.class, this::invokeActors) + .match(ActorRequest.class, this::invokeActors) .match(String.class, this::sendEOF) .match(ActorResponse.class, this::responseListener) .build(); @@ -90,19 +89,18 @@ public Receive createReceive() { if there is no actor for an incoming set of keys, create a new actor track all the child actors using actors map */ - private void invokeActors(ReduceOuterClass.ReduceRequest datumRequest) { - String[] keys = datumRequest.getKeysList().toArray(new String[0]); - String keyStr = String.join(Constants.DELIMITER, keys); - if (!actorsMap.containsKey(keyStr)) { + private void invokeActors(ActorRequest actorRequest) { + String[] keys = actorRequest.getKeySet(); + String uniqueId = actorRequest.getUniqueIdentifier(); + if (!actorsMap.containsKey(uniqueId)) { Reducer reduceHandler = reducerFactory.createReducer(); ActorRef actorRef = getContext() .actorOf(ReduceActor.props(keys, md, reduceHandler)); - - actorsMap.put(keyStr, actorRef); + actorsMap.put(uniqueId, actorRef); } - HandlerDatum handlerDatum = constructHandlerDatum(datumRequest); - actorsMap.get(keyStr).tell(handlerDatum, getSelf()); + HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload()); + actorsMap.get(uniqueId).tell(handlerDatum, getSelf()); } private void sendEOF(String EOF) { @@ -119,24 +117,25 @@ private void responseListener(ActorResponse actorResponse) { if there are no entries in the map, that means processing is done we can close the stream. */ - responseObserver.onNext(actorResponse.getResponse()); - actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys())); - if (actorsMap.isEmpty()) { - responseObserver.onCompleted(); - getContext().getSystem().stop(getSelf()); + if (actorResponse.getResponse().getEOF()) { + actorsMap.remove(actorResponse.getUniqueIdentifier()); + if (actorsMap.isEmpty()) { + responseObserver.onCompleted(); + getContext().getSystem().stop(getSelf()); + } } } - private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest datumRequest) { + private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) { return new HandlerDatum( - datumRequest.getValue().toByteArray(), + payload.getValue().toByteArray(), Instant.ofEpochSecond( - datumRequest.getWatermark().getSeconds(), - datumRequest.getWatermark().getNanos()), + payload.getWatermark().getSeconds(), + payload.getWatermark().getNanos()), Instant.ofEpochSecond( - datumRequest.getEventTime().getSeconds(), - datumRequest.getEventTime().getNanos()) + payload.getEventTime().getSeconds(), + payload.getEventTime().getNanos()) ); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 164c50d1..769901bf 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -67,7 +67,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on socket path: " + + grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/reducer/Service.java b/src/main/java/io/numaproj/numaflow/reducer/Service.java index 37048679..c60a23e7 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Service.java @@ -89,12 +89,12 @@ public StreamObserver reduceFn(final StreamObser responseObserver)); - return new StreamObserver() { + return new StreamObserver<>() { @Override public void onNext(ReduceOuterClass.ReduceRequest datum) { // send the message to parent actor, which takes care of distribution. if (!supervisorActor.isTerminated()) { - supervisorActor.tell(datum, ActorRef.noSender()); + supervisorActor.tell(new ActorRequest(datum), ActorRef.noSender()); } else { responseObserver.onError(new Throwable("Supervisor actor was terminated")); } @@ -110,7 +110,6 @@ public void onError(Throwable throwable) { public void onCompleted() { // indicate the end of input to the supervisor supervisorActor.tell(Constants.EOF, ActorRef.noSender()); - } }; } diff --git a/src/main/proto/reduce/v1/reduce.proto b/src/main/proto/reduce/v1/reduce.proto index e1afc890..93a257b0 100644 --- a/src/main/proto/reduce/v1/reduce.proto +++ b/src/main/proto/reduce/v1/reduce.proto @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto"; package reduce.v1; service Reduce { - // ReduceFn applies a reduce function to a request stream. + // ReduceFn applies a reduce function to a stream of reduce requests and sends reduce response back in a streaming fashion. rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse); // IsReady is the heartbeat endpoint for gRPC. @@ -19,22 +19,59 @@ service Reduce { * ReduceRequest represents a request element. */ message ReduceRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; + // WindowOperation represents a window operation. + // For Aligned windows, OPEN, APPEND and CLOSE events are sent. + message WindowOperation { + enum Event { + OPEN = 0; + CLOSE = 1; + APPEND = 4; + } + + Event event = 1; + repeated Window windows = 2; + } + + // Payload represents a payload element. + message Payload { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + } + + Payload payload = 1; + WindowOperation operation = 2; +} + +// Window represents a window. +// Since the client doesn't track keys, window doesn't have a keys field. +message Window { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; + string slot = 3; } /** * ReduceResponse represents a response element. */ message ReduceResponse { + // FIXME: put all fields(window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types + // Result represents a result element. It contains the result of the reduce function. message Result { repeated string keys = 1; bytes value = 2; repeated string tags = 3; } - repeated Result results = 1; + + Result result = 1; + + // window represents a window to which the result belongs. + Window window = 2; + + // EOF represents the end of the response for a window. + // When it's set to true, the platform considers the response as an indicator which doesn't contain real data. + bool EOF = 3; } /** diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceOutputStreamObserver.java b/src/test/java/io/numaproj/numaflow/reducer/ReduceOutputStreamObserver.java index 7184d111..f230689c 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceOutputStreamObserver.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ReduceOutputStreamObserver.java @@ -4,22 +4,25 @@ import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; +/** + * This is a dummy implementation of reduce output stream observer for testing purpose. + */ @Slf4j public class ReduceOutputStreamObserver implements StreamObserver { public AtomicReference completed = new AtomicReference<>(false); - public AtomicReference resultDatum = new AtomicReference<>( - ReduceOuterClass.ReduceResponse.newBuilder().build()); + public AtomicReference> resultDatum = new AtomicReference<>( + new ArrayList<>()); public Throwable t; @Override - public void onNext(ReduceOuterClass.ReduceResponse datum) { - resultDatum.set(resultDatum - .get() - .toBuilder() - .addAllResults(datum.getResultsList()) - .build()); + public void onNext(ReduceOuterClass.ReduceResponse response) { + List receivedResponses = resultDatum.get(); + receivedResponses.add(response); + resultDatum.set(receivedResponses); } @Override diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java index ceda441f..e33cbd21 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java @@ -9,22 +9,21 @@ import org.junit.Test; import java.time.Instant; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ReduceSupervisorActorTest { @Test - public void invokeSingleActor() throws RuntimeException { + public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { final ActorSystem actorSystem = ActorSystem.create("test-system-1"); CompletableFuture completableFuture = new CompletableFuture(); - String reduceKey = "reduce-key"; - ReduceOuterClass.ReduceRequest.Builder requestBuilder = ReduceOuterClass.ReduceRequest. - newBuilder().addKeys(reduceKey); - ActorRef shutdownActor = actorSystem .actorOf(ReduceShutdownActor .props(completableFuture)); @@ -39,24 +38,40 @@ public void invokeSingleActor() throws RuntimeException { .props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver)); for (int i = 1; i <= 10; i++) { - ReduceOuterClass.ReduceRequest reduceRequest = requestBuilder - .addKeys("reduce-test") - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) - .build(); + ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // all reduce requests share same set of keys. + .addAllKeys(Arrays.asList("key-1", "key-2")) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); supervisor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); + // the observer should receive 2 messages, one is the aggregated result, the other is the EOF response. + assertEquals(2, outputStreamObserver.resultDatum.get().size()); + assertEquals("10", outputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getValue() + .toStringUtf8()); + assertEquals(true, outputStreamObserver.resultDatum + .get() + .get(1) + .getEOF()); } catch (InterruptedException | ExecutionException e) { fail("Expected the future to complete without exception"); } } @Test - public void invokeMultipleActors() throws RuntimeException { + public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException { final ActorSystem actorSystem = ActorSystem.create("test-system-2"); CompletableFuture completableFuture = new CompletableFuture(); @@ -67,26 +82,41 @@ public void invokeMultipleActors() throws RuntimeException { Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); + ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); ActorRef supervisor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestReducerFactory(), md, shutdownActor, - new ReduceOutputStreamObserver())); + outputStreamObserver) + ); for (int i = 1; i <= 10; i++) { - ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest + ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest .newBuilder() - .addKeys("reduce-test" + i) - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) - .build(); + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // each request contain a unique set of keys. + .addAllKeys(Arrays.asList("shared-key", "unique-key-" + i)) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); supervisor.tell(reduceRequest, ActorRef.noSender()); } supervisor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); + // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF. + assertEquals(20, outputStreamObserver.resultDatum.get().size()); + for (int i = 0; i < 20; i++) { + ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum + .get() + .get(i); + assertTrue(response.getResult().getValue().toStringUtf8().equals("1") + || response.getEOF()); + } } catch (InterruptedException | ExecutionException e) { fail("Expected the future to complete without exception"); } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java index d97a5bda..2efe8259 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java @@ -21,14 +21,14 @@ import org.junit.Rule; import org.junit.Test; +import java.util.concurrent.atomic.AtomicReference; + import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class ServerErrTest { - - public static final Metadata.Key DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of( WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER); @@ -42,7 +42,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( @@ -89,27 +88,32 @@ public void tearDown() throws Exception { } @Test - public void TestReducerErr() { - String reduceKey = "reduce-key"; - + public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowable() { Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); - //create an output stream observer + // create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); + // we need to maintain a reference to any exceptions thrown inside the thread, otherwise even if the assertion failed in the thread, + // the test can still succeed. + AtomicReference exceptionInThread = new AtomicReference<>(); Thread t = new Thread(() -> { while (outputStreamObserver.t == null) { try { Thread.sleep(100); } catch (InterruptedException e) { - e.printStackTrace(); + exceptionInThread.set(e); } } - assertEquals( - "UNKNOWN: java.lang.RuntimeException: unknown exception", - outputStreamObserver.t.getMessage()); + try { + assertEquals( + "UNKNOWN: java.lang.RuntimeException: unknown exception", + outputStreamObserver.t.getMessage()); + } catch (Throwable e) { + exceptionInThread.set(e); + } }); t.start(); @@ -121,8 +125,11 @@ public void TestReducerErr() { for (int i = 1; i <= 10; i++) { ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest .newBuilder() - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) - .addKeys(reduceKey) + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + .addKeys("reduce-key") + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) .build(); inputStreamObserver.onNext(reduceRequest); } @@ -132,7 +139,11 @@ public void TestReducerErr() { try { t.join(); } catch (InterruptedException e) { - fail("Thread interrupted"); + fail("Thread got interrupted before test assertion."); + } + // Fail the test if any exception caught in the thread + if (exceptionInThread.get() != null) { + fail("Assertion failed in the thread: " + exceptionInThread.get().getMessage()); } } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java index b3544359..2abfc13a 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java @@ -21,9 +21,13 @@ import org.junit.Rule; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ServerTest { public static final Metadata.Key DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of( @@ -87,15 +91,14 @@ public void tearDown() throws Exception { } @Test - public void TestReducerWithOneKey() { + public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() { String reduceKey = "reduce-key"; - Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); - //create an output stream observer + // create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); StreamObserver inputStreamObserver = ReduceGrpc @@ -105,8 +108,11 @@ public void TestReducerWithOneKey() { for (int i = 1; i <= 10; i++) { ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest.newBuilder() - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) - .addKeys(reduceKey) + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .addAllKeys(Arrays.asList(reduceKey)) + .build()) .build(); inputStreamObserver.onNext(request); } @@ -118,29 +124,36 @@ public void TestReducerWithOneKey() { ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); while (!outputStreamObserver.completed.get()) ; - assertEquals(1, outputStreamObserver.resultDatum.get().getResultsCount()); + // Expect 2 responses, one containing the aggregated data and the other indicating EOF. + assertEquals(2, outputStreamObserver.resultDatum.get().size()); assertEquals( expectedKeys, outputStreamObserver.resultDatum .get() - .getResults(0) + .get(0) + .getResult() .getKeysList() .toArray(new String[0])); assertEquals( expectedValue, - outputStreamObserver.resultDatum.get().getResults(0).getValue()); + outputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getValue()); + assertTrue(outputStreamObserver.resultDatum.get().get(1).getEOF()); } @Test - public void TestReducerWithMultipleKey() { + public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() { String reduceKey = "reduce-key"; - int keyCount = 100; + int keyCount = 3; Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); - //create an output stream observer + // create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); StreamObserver inputStreamObserver = ReduceGrpc @@ -148,15 +161,17 @@ public void TestReducerWithMultipleKey() { .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); - // send messages with 100 different keys + // send messages with keyCount different keys for (int j = 0; j < keyCount; j++) { for (int i = 1; i <= 10; i++) { - ReduceOuterClass.ReduceRequest inputDatum = ReduceOuterClass.ReduceRequest + ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest .newBuilder() - .addKeys(reduceKey + j) - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder() + .addAllKeys(Arrays.asList(reduceKey + j)) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) .build(); - inputStreamObserver.onNext(inputDatum); + inputStreamObserver.onNext(request); } } @@ -166,10 +181,11 @@ public void TestReducerWithMultipleKey() { ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); while (!outputStreamObserver.completed.get()) ; - ReduceOuterClass.ReduceResponse result = outputStreamObserver.resultDatum.get(); - assertEquals(100, result.getResultsCount()); - for (int i = 0; i < keyCount; i++) { - assertEquals(expectedValue, result.getResults(0).getValue()); - } + List result = outputStreamObserver.resultDatum.get(); + // the outputStreamObserver should have observed 2*keyCount responses, because for each key set, one response for the aggregated result, the other for EOF. + assertEquals(keyCount * 2, result.size()); + result.forEach(response -> { + assertTrue(response.getResult().getValue().equals(expectedValue) || response.getEOF()); + }); } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java index b0c41ce6..b3bc7e7d 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java @@ -22,11 +22,12 @@ public class ShutDownActorTest { @Test public void testFailure() { final ActorSystem actorSystem = ActorSystem.create("test-system-1"); - CompletableFuture completableFuture = new CompletableFuture(); + CompletableFuture completableFuture = new CompletableFuture<>(); String reduceKey = "reduce-key"; - ReduceOuterClass.ReduceRequest.Builder requestBuilder = ReduceOuterClass.ReduceRequest. - newBuilder().addKeys(reduceKey); + ReduceOuterClass.ReduceRequest.Payload.Builder payloadBuilder = ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + .addKeys(reduceKey); ActorRef shutdownActor = actorSystem .actorOf(ReduceShutdownActor @@ -43,10 +44,12 @@ public void testFailure() { shutdownActor, new ReduceOutputStreamObserver())); - ReduceOuterClass.ReduceRequest reduceRequest = requestBuilder - .addKeys("reduce-test") - .setValue(ByteString.copyFromUtf8(String.valueOf(1))) - .build(); + ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest.newBuilder() + .setPayload(payloadBuilder + .addKeys("reduce-test") + .setValue(ByteString.copyFromUtf8(String.valueOf(1))) + .build()) + .build()); supervisor.tell(reduceRequest, ActorRef.noSender()); try {