Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 11, 2024
1 parent 1f20979 commit 07f977b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {
ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload();
String[] keys = payload.getKeysList().toArray(new String[0]);
// TODO - do we need to include window information in the keyStr?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
String keyStr = String.join(Constants.DELIMITER, keys);
if (!actorsMap.containsKey(keyStr)) {
Reducer reduceHandler = reducerFactory.createReducer();
Expand Down Expand Up @@ -122,6 +124,8 @@ private void responseListener(ActorResponse actorResponse) {

responseObserver.onNext(actorResponse.getResponse());
// TODO - do we need to include window information for aligned windows?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -27,6 +26,7 @@

import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
import static org.junit.Assert.assertEquals;

public class ServerTest {
public static final Metadata.Key<String> DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of(
Expand Down Expand Up @@ -123,16 +123,16 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
while (!outputStreamObserver.completed.get()) ;

Assert.assertEquals(1, outputStreamObserver.resultDatum.get().size());
Assert.assertEquals(
assertEquals(1, outputStreamObserver.resultDatum.get().size());
assertEquals(
expectedKeys,
outputStreamObserver.resultDatum
.get()
.get(0)
.getResult()
.getKeysList()
.toArray(new String[0]));
Assert.assertEquals(
assertEquals(
expectedValue,
outputStreamObserver.resultDatum
.get()
Expand Down Expand Up @@ -180,9 +180,9 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed keyCount responses, each of which has value 55.
Assert.assertEquals(keyCount, result.size());
assertEquals(keyCount, result.size());
result.forEach(response -> {
Assert.assertEquals(expectedValue, response.getResult().getValue());
assertEquals(expectedValue, response.getResult().getValue());
});
}
}

0 comments on commit 07f977b

Please sign in to comment.