-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: rashar <[email protected]> Signed-off-by: RohanAshar <[email protected]> Co-authored-by: rashar <[email protected]> Co-authored-by: Keran Yang <[email protected]>
- Loading branch information
1 parent
3c2ec6f
commit 7997724
Showing
29 changed files
with
1,403 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 53 additions & 0 deletions
53
examples/src/main/java/io/numaproj/numaflow/examples/batchmap/flatmap/BatchFlatMap.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package io.numaproj.numaflow.examples.batchmap.flatmap; | ||
|
||
import io.numaproj.numaflow.batchmapper.BatchMapper; | ||
import io.numaproj.numaflow.batchmapper.BatchResponse; | ||
import io.numaproj.numaflow.batchmapper.BatchResponses; | ||
import io.numaproj.numaflow.batchmapper.Datum; | ||
import io.numaproj.numaflow.batchmapper.DatumIterator; | ||
import io.numaproj.numaflow.batchmapper.Message; | ||
import io.numaproj.numaflow.batchmapper.Server; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public class BatchFlatMap extends BatchMapper { | ||
@Override | ||
public BatchResponses processMessage(DatumIterator datumStream) { | ||
BatchResponses batchResponses = new BatchResponses(); | ||
while (true) { | ||
Datum datum = null; | ||
try { | ||
datum = datumStream.next(); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
continue; | ||
} | ||
// null means the iterator is closed so we are good to break the loop. | ||
if (datum == null) { | ||
break; | ||
} | ||
try { | ||
String msg = new String(datum.getValue()); | ||
String[] strs = msg.split(","); | ||
BatchResponse batchResponse = new BatchResponse(datum.getId()); | ||
for (String str : strs) { | ||
batchResponse.append(new Message(str.getBytes())); | ||
} | ||
batchResponses.append(batchResponse); | ||
} catch (Exception e) { | ||
batchResponses.append(new BatchResponse(datum.getId())); | ||
} | ||
} | ||
return batchResponses; | ||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
Server server = new Server(new BatchFlatMap()); | ||
|
||
// Start the server | ||
server.start(); | ||
|
||
// wait for the server to shutdown | ||
server.awaitTermination(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
18 changes: 18 additions & 0 deletions
18
src/main/java/io/numaproj/numaflow/batchmapper/BatchMapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
/** | ||
* BatchMapper exposes method for performing batch map operation. | ||
* Implementations should override the processMessage method | ||
* which will be used for processing the input messages | ||
*/ | ||
|
||
public abstract class BatchMapper { | ||
/** | ||
* method which will be used for processing messages. Please implement the interface to ensure that each message generates a corresponding BatchResponse object with a matching ID. | ||
* | ||
* @param datumStream current message to be processed | ||
* | ||
* @return BatchResponses which contains output from batch map | ||
*/ | ||
public abstract BatchResponses processMessage(DatumIterator datumStream); | ||
} |
45 changes: 45 additions & 0 deletions
45
src/main/java/io/numaproj/numaflow/batchmapper/BatchResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
import lombok.Getter; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* BatchResponse is used to collect and manage a batch of Message objects. | ||
*/ | ||
public class BatchResponse { | ||
@Getter | ||
private final String id; | ||
private final List<Message> messages; | ||
|
||
/** | ||
* Constructs a BatchResponse with a specified ID. | ||
* | ||
* @param id the unique identifier for this batch response | ||
*/ | ||
public BatchResponse(String id) { | ||
this.id = id; | ||
this.messages = new ArrayList<>(); | ||
} | ||
|
||
/** | ||
* Appends a Message to the batch. | ||
* | ||
* @param msg the Message to be added to the batch | ||
* @return the current BatchResponse instance for method chaining | ||
*/ | ||
public BatchResponse append(Message msg) { | ||
this.messages.add(msg); | ||
return this; | ||
} | ||
|
||
/** | ||
* Retrieves the list of Messages in the batch. | ||
* | ||
* @return the list of Messages | ||
*/ | ||
public List<Message> getItems() { | ||
return messages; | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
src/main/java/io/numaproj/numaflow/batchmapper/BatchResponses.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* BatchResponses is used to send a response from the batch map functions. | ||
* It contains a list of BatchResponse objects. | ||
*/ | ||
public class BatchResponses { | ||
private final List<BatchResponse> batchResponses; | ||
|
||
/** | ||
* Constructs an empty BatchResponses object. | ||
*/ | ||
public BatchResponses() { | ||
this.batchResponses = new ArrayList<>(); | ||
} | ||
|
||
/** | ||
* Appends a BatchResponse to the list of batchResponses. | ||
* | ||
* @param batchResponse the BatchResponse to be added | ||
* @return the current BatchResponses object | ||
*/ | ||
public BatchResponses append(BatchResponse batchResponse) { | ||
this.batchResponses.add(batchResponse); | ||
return this; | ||
} | ||
|
||
/** | ||
* Retrieves the list of BatchResponse objects. | ||
* | ||
* @return the list of BatchResponse objects | ||
*/ | ||
public List<BatchResponse> getItems() { | ||
return batchResponses; | ||
} | ||
} |
20 changes: 20 additions & 0 deletions
20
src/main/java/io/numaproj/numaflow/batchmapper/Constants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
class Constants { | ||
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; | ||
|
||
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/batchmap.sock"; | ||
|
||
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"; | ||
|
||
public static final int DEFAULT_PORT = 50051; | ||
|
||
public static final String DEFAULT_HOST = "localhost"; | ||
|
||
public static final String SUCCESS = "SUCCESS"; | ||
|
||
public static final String MAP_MODE_KEY = "MAP_MODE"; | ||
|
||
public static final String MAP_MODE = "batch-map"; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
import java.time.Instant; | ||
import java.util.Map; | ||
|
||
/** | ||
* Datum contains methods to get the payload information. | ||
*/ | ||
public interface Datum { | ||
/** | ||
* method to get the payload keys | ||
* | ||
* @return returns the datum keys. | ||
*/ | ||
String[] getKeys(); | ||
|
||
/** | ||
* method to get the payload value | ||
* | ||
* @return returns the payload value in byte array | ||
*/ | ||
byte[] getValue(); | ||
|
||
/** | ||
* method to get the event time of the payload | ||
* | ||
* @return returns the event time of the payload | ||
*/ | ||
Instant getEventTime(); | ||
|
||
/** | ||
* method to get the watermark information | ||
* | ||
* @return returns the watermark | ||
*/ | ||
Instant getWatermark(); | ||
|
||
/** | ||
* method to get the ID for the Payload | ||
* | ||
* @return returns the ID | ||
*/ | ||
String getId(); | ||
|
||
/** | ||
* method to get the headers information of the payload | ||
* | ||
* @return returns the headers in the form of key value pair | ||
*/ | ||
Map<String, String> getHeaders(); | ||
} |
20 changes: 20 additions & 0 deletions
20
src/main/java/io/numaproj/numaflow/batchmapper/DatumIterator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
|
||
/** | ||
* An iterator over a collection of {@link Datum} elements. | ||
* Passed to {@link BatchMapper#processMessage(DatumIterator)} method. | ||
*/ | ||
public interface DatumIterator { | ||
|
||
/** | ||
* Returns the next element in the iterator | ||
* This method blocks until an element becomes available in the queue. | ||
* When EOF_DATUM is received, this method will return null and the iterator will be closed. | ||
* | ||
* @return the next element in the iterator, null if EOF_DATUM is received or the iterator is already closed | ||
* | ||
* @throws InterruptedException if the thread is interrupted while waiting for the next element | ||
*/ | ||
Datum next() throws InterruptedException; | ||
} |
44 changes: 44 additions & 0 deletions
44
src/main/java/io/numaproj/numaflow/batchmapper/DatumIteratorImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.LinkedBlockingDeque; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** | ||
* A thread-safe implementation of {@link DatumIterator}, backed by a blocking queue. | ||
*/ | ||
@Slf4j | ||
class DatumIteratorImpl implements DatumIterator { | ||
private final BlockingQueue<Datum> blockingQueue = new LinkedBlockingDeque<>(); | ||
private final AtomicBoolean closed = new AtomicBoolean(false); | ||
private final AtomicInteger counter = new AtomicInteger(0); // Keep Track of number of requests | ||
|
||
@Override | ||
public Datum next() throws InterruptedException { | ||
// if the iterator is closed, return null | ||
if (closed.get()) { | ||
return null; | ||
} | ||
Datum datum = blockingQueue.take(); | ||
// if EOF is received, close the iterator and return null | ||
if (datum == HandlerDatum.EOF_DATUM) { | ||
closed.set(true); | ||
return null; | ||
} | ||
return datum; | ||
} | ||
|
||
// blocking call, waits until the write operation is successful | ||
public void writeMessage(Datum datum) throws InterruptedException { | ||
blockingQueue.put(datum); | ||
counter.incrementAndGet(); | ||
} | ||
|
||
public int getCount() { | ||
return counter.get(); | ||
} | ||
|
||
} |
37 changes: 37 additions & 0 deletions
37
src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package io.numaproj.numaflow.batchmapper; | ||
|
||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
/** | ||
* GRPCConfig is used to provide configurations for map gRPC server. | ||
*/ | ||
@Getter | ||
@Builder(builderMethodName = "newBuilder") | ||
public class GRPCConfig { | ||
@Builder.Default | ||
private String socketPath = Constants.DEFAULT_SOCKET_PATH; | ||
|
||
@Builder.Default | ||
private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; | ||
|
||
@Builder.Default | ||
private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; | ||
|
||
@Builder.Default | ||
private int port = Constants.DEFAULT_PORT; | ||
|
||
private boolean isLocal; | ||
|
||
/** | ||
* Static method to create default GRPCConfig. | ||
*/ | ||
static GRPCConfig defaultGrpcConfig() { | ||
return GRPCConfig.newBuilder() | ||
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) | ||
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) | ||
.isLocal(System.getenv("NUMAFLOW_POD") | ||
== null) // if NUMAFLOW_POD is not set, then we are not running using numaflow | ||
.socketPath(Constants.DEFAULT_SOCKET_PATH).build(); | ||
} | ||
} |
Oops, something went wrong.