Skip to content

Commit

Permalink
chore: apply latest reducer proto to the reducer SDK (#90)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Jan 12, 2024
1 parent 615d49b commit d4e6d35
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 116 deletions.
28 changes: 28 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * ActorRequest stores the request sent to ReduceActors and exposes the keys
 * carried in the request payload.
 */
@Getter
@AllArgsConstructor
class ActorRequest {
    ReduceOuterClass.ReduceRequest request;

    /**
     * Copies the keys of the request payload into a new array.
     *
     * @return the payload keys, in the order they appear in the request
     */
    public String[] getKeySet() {
        return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
    }

    // TODO - do we need to include window information in the id?
    // For the aligned reducer there is always a single window, but at the same time
    // we would like to stay consistent with the Go SDK implementation.
    // We will revisit this one later.
    /**
     * Builds an identifier for this request by joining the payload keys with
     * {@code Constants.DELIMITER}.
     *
     * @return the delimiter-joined payload keys
     */
    public String getUniqueIdentifier() {
        return String.join(Constants.DELIMITER, this.getKeySet());
    }
}
11 changes: 10 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
@Getter
@AllArgsConstructor
class ActorResponse {
String[] keys;
ReduceOuterClass.ReduceResponse response;

// TODO - do we need to include window information in the id?
// For the aligned reducer there is always a single window, but at the same time
// we would like to stay consistent with the Go SDK implementation.
// We will revisit this one later.
/**
 * Builds an identifier for this response by joining the keys of its result
 * with {@code Constants.DELIMITER}.
 *
 * @return the delimiter-joined result keys
 */
public String getUniqueIdentifier() {
    ReduceOuterClass.ReduceResponse.Result result = this.getResponse().getResult();
    return String.join(Constants.DELIMITER, result.getKeysList());
}
}
3 changes: 0 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;


@Override
public Instant getWatermark() {
return this.watermark;
Expand All @@ -27,5 +25,4 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

}
60 changes: 45 additions & 15 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -19,7 +20,6 @@
@Slf4j
@AllArgsConstructor
class ReduceActor extends AbstractActor {

private String[] keys;
private Metadata md;
private Reducer groupBy;
Expand All @@ -44,23 +44,53 @@ private void invokeHandler(HandlerDatum handlerDatum) {
private void getResult(String eof) {
MessageList resultMessages = this.groupBy.getOutput(keys, md);
// send the result back to sender(parent actor)
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
// send a response back with EOF set to true, indicating the reducer has finished the data aggregation.
getSender().tell(buildEOFActorResponse(), getSelf());
}

private ActorResponse buildDatumListResponse(MessageList messageList) {
private ActorResponse buildActorResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
messageList.getMessages().forEach(message -> {
responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(
message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(
message.getTags()))
.build());

});
return new ActorResponse(this.keys, responseBuilder.build());
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build());
}

/**
 * Builds the response that marks the end of this actor's output.
 * The response has EOF set to true, carries the window taken from the actor
 * metadata, and holds a placeholder result that contains only the actor's keys.
 *
 * @return an ActorResponse wrapping the EOF reduce response
 */
private ActorResponse buildEOFActorResponse() {
    // window boundaries come from the interval window in the actor metadata.
    Timestamp windowStart = Timestamp.newBuilder()
            .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
            .setNanos(this.md.getIntervalWindow().getStartTime().getNano())
            .build();
    Timestamp windowEnd = Timestamp.newBuilder()
            .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
            .setNanos(this.md.getIntervalWindow().getEndTime().getNano())
            .build();
    ReduceOuterClass.Window window = ReduceOuterClass.Window.newBuilder()
            .setStart(windowStart)
            .setEnd(windowEnd)
            .setSlot("slot-0")
            .build();

    // dummy result: only the keys are set, no value and no tags.
    ReduceOuterClass.ReduceResponse.Result keysOnlyResult =
            ReduceOuterClass.ReduceResponse.Result
                    .newBuilder()
                    .addAllKeys(List.of(this.keys))
                    .build();

    return new ActorResponse(
            ReduceOuterClass.ReduceResponse.newBuilder()
                    .setWindow(window)
                    .setEOF(true)
                    .setResult(keysOnlyResult)
                    .build());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
/**
* ReduceSupervisorActor actor distributes the messages to actors and handles failure.
*/

@Slf4j
class ReduceSupervisorActor extends AbstractActor {
private final ReducerFactory<? extends Reducer> reducerFactory;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void postStop() {
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(ReduceOuterClass.ReduceRequest.class, this::invokeActors)
.match(ActorRequest.class, this::invokeActors)
.match(String.class, this::sendEOF)
.match(ActorResponse.class, this::responseListener)
.build();
Expand All @@ -90,19 +89,18 @@ public Receive createReceive() {
if there is no actor for an incoming set of keys, create a new actor.
track all the child actors using the actors map.
*/
private void invokeActors(ReduceOuterClass.ReduceRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
String keyStr = String.join(Constants.DELIMITER, keys);
if (!actorsMap.containsKey(keyStr)) {
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));

actorsMap.put(keyStr, actorRef);
actorsMap.put(uniqueId, actorRef);
}

HandlerDatum handlerDatum = constructHandlerDatum(datumRequest);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
actorsMap.get(uniqueId).tell(handlerDatum, getSelf());
}

private void sendEOF(String EOF) {
Expand All @@ -119,24 +117,25 @@ private void responseListener(ActorResponse actorResponse) {
if there are no entries in the map, that means processing is
done and we can close the stream.
*/

responseObserver.onNext(actorResponse.getResponse());
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(actorResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest datumRequest) {
private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
return new HandlerDatum(
datumRequest.getValue().toByteArray(),
payload.getValue().toByteArray(),
Instant.ofEpochSecond(
datumRequest.getWatermark().getSeconds(),
datumRequest.getWatermark().getNanos()),
payload.getWatermark().getSeconds(),
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
datumRequest.getEventTime().getSeconds(),
datumRequest.getEventTime().getNanos())
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
);
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void start() throws Exception {
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"Server started, listening on socket path: "
+ grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
responseObserver));


return new StreamObserver<ReduceOuterClass.ReduceRequest>() {
return new StreamObserver<>() {
@Override
public void onNext(ReduceOuterClass.ReduceRequest datum) {
// send the message to parent actor, which takes care of distribution.
if (!supervisorActor.isTerminated()) {
supervisorActor.tell(datum, ActorRef.noSender());
supervisorActor.tell(new ActorRequest(datum), ActorRef.noSender());
} else {
responseObserver.onError(new Throwable("Supervisor actor was terminated"));
}
Expand All @@ -110,7 +110,6 @@ public void onError(Throwable throwable) {
public void onCompleted() {
// indicate the end of input to the supervisor
supervisorActor.tell(Constants.EOF, ActorRef.noSender());

}
};
}
Expand Down
49 changes: 43 additions & 6 deletions src/main/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto";
package reduce.v1;

service Reduce {
// ReduceFn applies a reduce function to a request stream.
// ReduceFn applies a reduce function to a stream of reduce requests and sends reduce response back in a streaming fashion.
rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
Expand All @@ -19,22 +19,59 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// FIXME: put all fields(window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
// When it's set to true, the platform considers the response as an indicator which doesn't contain real data.
bool EOF = 3;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* This is a dummy implementation of reduce output stream observer for testing purpose.
*/
@Slf4j
public class ReduceOutputStreamObserver implements StreamObserver<ReduceOuterClass.ReduceResponse> {
public AtomicReference<Boolean> completed = new AtomicReference<>(false);
public AtomicReference<ReduceOuterClass.ReduceResponse> resultDatum = new AtomicReference<>(
ReduceOuterClass.ReduceResponse.newBuilder().build());
public AtomicReference<List<ReduceOuterClass.ReduceResponse>> resultDatum = new AtomicReference<>(
new ArrayList<>());
public Throwable t;

@Override
public void onNext(ReduceOuterClass.ReduceResponse datum) {
resultDatum.set(resultDatum
.get()
.toBuilder()
.addAllResults(datum.getResultsList())
.build());
public void onNext(ReduceOuterClass.ReduceResponse response) {
List<ReduceOuterClass.ReduceResponse> receivedResponses = resultDatum.get();
receivedResponses.add(response);
resultDatum.set(receivedResponses);
}

@Override
Expand Down
Loading

0 comments on commit d4e6d35

Please sign in to comment.