Skip to content

Commit

Permalink
feat: bidirectional streaming source (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Sep 22, 2024
1 parent 8aa30e9 commit 11e70d4
Show file tree
Hide file tree
Showing 38 changed files with 465 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.numaproj.numaflow.examples.mapstream.flatmapstream;

import io.numaproj.numaflow.mapstreamer.*;
import io.numaproj.numaflow.mapstreamer.Datum;
import io.numaproj.numaflow.mapstreamer.MapStreamer;
import io.numaproj.numaflow.mapstreamer.Message;
import io.numaproj.numaflow.mapstreamer.OutputObserver;
import io.numaproj.numaflow.mapstreamer.Server;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.numaproj.numaflow.sourcer.ReadRequest;
import io.numaproj.numaflow.sourcer.Server;
import io.numaproj.numaflow.sourcer.Sourcer;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.HashMap;
Expand All @@ -23,6 +24,7 @@
* acknowledges them when ack is called.
*/

@Slf4j
public class SimpleSource extends Sourcer {
private final Map<Long, Boolean> messages = new ConcurrentHashMap<>();
private long readIndex = 0;
Expand Down Expand Up @@ -70,16 +72,15 @@ public void read(ReadRequest request, OutputObserver observer) {

@Override
public void ack(AckRequest request) {
Long offset = Longs.fromByteArray(request.getOffset().getValue());
// remove the acknowledged messages from the map
for (Offset offset : request.getOffsets()) {
messages.remove(Longs.fromByteArray(offset.getValue()));
}
messages.remove(offset);
}

@Override
public long getPending() {
// pending messages will be zero for a simple source
return 0;
// number of messages not acknowledged yet
return messages.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction;
import io.numaproj.numaflow.examples.reduce.sum.SumFactory;
import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
import io.numaproj.numaflow.examples.source.simple.SimpleSource;
import io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction;
import io.numaproj.numaflow.mapper.MapperTestKit;
import io.numaproj.numaflow.mapper.Message;
Expand All @@ -14,7 +13,6 @@
import io.numaproj.numaflow.sinker.Response;
import io.numaproj.numaflow.sinker.ResponseList;
import io.numaproj.numaflow.sinker.SinkerTestKit;
import io.numaproj.numaflow.sourcer.SourcerTestKit;
import io.numaproj.numaflow.sourcetransformer.SourceTransformerTestKit;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
Expand All @@ -23,7 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -202,41 +199,42 @@ public void testSinkServerInvocation() {
// we can add the logic to verify if the messages were
// successfully written to the sink(could be a file, database, etc.)
}

@Test
@Order(5)
public void testSourceServerInvocation() {
SimpleSource simpleSource = new SimpleSource();

SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource);
try {
sourcerTestKit.startServer();
} catch (Exception e) {
Assertions.fail("Failed to start server");
}

// create a client to send requests to the server
SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client();
// create a test observer to receive messages from the server
SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver();
// create a read request with count 10 and timeout 1 second
SourcerTestKit.TestReadRequest testReadRequest = SourcerTestKit.TestReadRequest.builder()
.count(10).timeout(Duration.ofSeconds(1)).build();

try {
sourcerClient.sendReadRequest(testReadRequest, testObserver);
Assertions.assertEquals(10, testObserver.getMessages().size());
} catch (Exception e) {
Assertions.fail("Failed to send request to server");
}

try {
sourcerClient.close();
sourcerTestKit.stopServer();
} catch (InterruptedException e) {
Assertions.fail("Failed to stop server");
}
}
// FIXME: re-enable this test once the tester kit changes for the bidirectional streaming source are done
// @Ignore
// @Test
// @Order(5)
// public void testSourceServerInvocation() {
// SimpleSource simpleSource = new SimpleSource();
//
// SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource);
// try {
// sourcerTestKit.startServer();
// } catch (Exception e) {
// Assertions.fail("Failed to start server");
// }
//
// // create a client to send requests to the server
// SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client();
// // create a test observer to receive messages from the server
// SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver();
// // create a read request with count 10 and timeout 1 second
// SourcerTestKit.TestReadRequest testReadRequest = SourcerTestKit.TestReadRequest.builder()
// .count(10).timeout(Duration.ofSeconds(1)).build();
//
// try {
// sourcerClient.sendReadRequest(testReadRequest, testObserver);
// Assertions.assertEquals(10, testObserver.getMessages().size());
// } catch (Exception e) {
// Assertions.fail("Failed to send request to server");
// }
//
// try {
// sourcerClient.close();
// sourcerTestKit.stopServer();
// } catch (InterruptedException e) {
// Assertions.fail("Failed to stop server");
// }
// }

@Test
@Order(6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ public void test_ReadAndAck() {
for (Message message : testObserver.getMessages()) {
offsets.add(message.getOffset());
}
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
.offsets(offsets).build();
simpleSource.ack(ackRequest);

for (Offset offset : offsets) {
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
.offset(offset).build();
simpleSource.ack(ackRequest);
}

// Try reading 6 more messages
// Since the previous batch got acked, the data source should allow us to read more messages
Expand All @@ -54,9 +57,8 @@ public void test_ReadAndAck() {
@Test
public void testPending() {
SimpleSource simpleSource = new SimpleSource();
// simple source getPending always returns 0.
// since we haven't read any messages, the pending should be 0
Assertions.assertEquals(0, simpleSource.getPending());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public BatchResponse(String id) {
* Appends a Message to the batch.
*
* @param msg the Message to be added to the batch
*
* @return the current BatchResponse instance for method chaining
*/
public BatchResponse append(Message msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public BatchResponses() {
* Appends a BatchResponse to the list of batchResponses.
*
* @param batchResponse the BatchResponse to be added
*
* @return the current BatchResponses object
*/
public BatchResponses append(BatchResponse batchResponse) {
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void onNext(Batchmap.BatchMapRequest mapRequest) {
public void onError(Throwable throwable) {
// We close the stream and let the sender retry the messages
log.error("Error Encountered in batchMap Stream", throwable);
var status = Status.UNKNOWN.withDescription(throwable.getMessage()).withCause(throwable);
var status = Status.UNKNOWN
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
}

Expand All @@ -79,7 +81,10 @@ public void onCompleted() {
datumStream.getCount());
// Crash if the number of responses from the user doesn't match the number of input requests (excluding the EOF message)
if (responses.getItems().size() != datumStream.getCount() - 1) {
throw new RuntimeException("Number of results did not match expected " + (datumStream.getCount()-1) + " but got " + responses.getItems().size());
throw new RuntimeException("Number of results did not match expected " + (
datumStream.getCount() - 1) + " but got " + responses
.getItems()
.size());
}
buildAndStreamResponse(responses, responseObserver);
} catch (Exception e) {
Expand All @@ -90,7 +95,9 @@ public void onCompleted() {
};
}

private void buildAndStreamResponse(BatchResponses responses, StreamObserver<Batchmap.BatchMapResponse> responseObserver) {
private void buildAndStreamResponse(
BatchResponses responses,
StreamObserver<Batchmap.BatchMapResponse> responseObserver) {
responses.getItems().forEach(message -> {
List<Batchmap.BatchMapResponse.Result> batchMapResponseResult = new ArrayList<>();
message.getItems().forEach(res -> {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
class SessionReducerActor extends AbstractActor {
private final SessionReducer sessionReducer;
private final OutputStreamObserver outputStream;
// the session window the actor is working on
private Sessionreduce.KeyedWindow keyedWindow;
private final OutputStreamObserver outputStream;
// when set to true, it means this session is already closed.
private boolean isClosed = false;

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/numaproj/numaflow/sideinput/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ public class Constants {
static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sideinput.sock";

static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info";
static final String DEFAULT_HOST = "localhost";
static int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

static int DEFAULT_PORT = 50051;

static final String DEFAULT_HOST = "localhost";
}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.isLocal(System.getenv("NUMAFLOW_POD")
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sideinput/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(infoFilePath)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(containerType == null) // if ENV_UD_CONTAINER_TYPE is not set, then we are not running using numaflow
.isLocal(containerType
== null) // if ENV_UD_CONTAINER_TYPE is not set, then we are not running using numaflow
.socketPath(socketPath)
.build();
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void start() throws Exception {

log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public void onCompleted() {
.build() : Timestamp.newBuilder()
.setSeconds(datum.getWatermark().getEpochSecond())
.setNanos(datum.getWatermark().getNano()).build())
.putAllHeaders(datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders())
.putAllHeaders(
datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders())
.build();
requestObserver.onNext(request);
}
Expand Down
Loading

0 comments on commit 11e70d4

Please sign in to comment.