Skip to content

Commit

Permalink
chore: adding partitions for user defined source (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Dec 14, 2023
1 parent 06b90a0 commit 2daf34c
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.examples.source.simple;

import com.google.common.primitives.Longs;
import io.numaproj.numaflow.sourcer.AckRequest;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.Offset;
Expand All @@ -8,8 +9,8 @@
import io.numaproj.numaflow.sourcer.Server;
import io.numaproj.numaflow.sourcer.Sourcer;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -41,9 +42,9 @@ public void read(ReadRequest request, OutputObserver observer) {
return;
}
// create a message with increasing offset
Offset offset = new Offset(ByteBuffer.allocate(4).putLong(readIndex).array(), "0");
Offset offset = new Offset(Longs.toByteArray(readIndex));
Message message = new Message(
ByteBuffer.allocate(4).putLong(readIndex).array(),
Long.toString(readIndex).getBytes(),
offset,
Instant.now());
// send the message to the observer
Expand All @@ -58,7 +59,7 @@ public void read(ReadRequest request, OutputObserver observer) {
public void ack(AckRequest request) {
// remove the acknowledged messages from the map
for (Offset offset : request.getOffsets()) {
messages.remove(ByteBuffer.wrap(offset.getValue()).getLong());
messages.remove(Longs.fromByteArray(offset.getValue()));
}
}

Expand All @@ -67,4 +68,9 @@ public long getPending() {
// pending messages will be zero for a simple source
return 0;
}

/**
 * Reports the partitions of this simple source by delegating to
 * {@code Sourcer.defaultPartitions()}.
 *
 * @return the list produced by {@code Sourcer.defaultPartitions()}
 */
@Override
public List<Integer> getPartitions() {
    final List<Integer> defaults = Sourcer.defaultPartitions();
    return defaults;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Sinker.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public abstract class Sinker {
* response list should be returned.
*
* @param datumStream stream of messages to be processed
* @return response list
*/
public abstract ResponseList processMessages(DatumIterator datumStream);
}
14 changes: 12 additions & 2 deletions src/main/java/io/numaproj/numaflow/sourcer/Offset.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,26 @@
@Setter
public class Offset {
private final byte[] value;
private final String partitionId;
private final Integer partitionId;

/**
* used to create Offset with value and partitionId.
*
* @param value offset value
* @param partitionId offset partitionId
*/
public Offset(byte[] value, String partitionId) {
public Offset(byte[] value, Integer partitionId) {
this.value = value;
this.partitionId = partitionId;
}

/**
 * Creates an Offset that carries only a value; the partitionId is taken from
 * the first entry of {@code Sourcer.defaultPartitions()}.
 *
 * @param value offset value
 */
public Offset(byte[] value) {
    // Delegate to the (value, partitionId) constructor so both fields are assigned in one place.
    this(value, Sourcer.defaultPartitions().get(0));
}
}
21 changes: 21 additions & 0 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,25 @@ public void isReady(
responseObserver.onNext(SourceOuterClass.ReadyResponse.newBuilder().setReady(true).build());
responseObserver.onCompleted();
}

/**
 * Handles the PartitionsFn RPC: asks the sourcer for its partitions and sends
 * them back in a single {@code PartitionsResponse}.
 *
 * @param request empty request message
 * @param responseObserver observer that receives the response (or the
 *         unimplemented status when no sourcer is configured)
 */
@Override
public void partitionsFn(
        Empty request,
        StreamObserver<SourceOuterClass.PartitionsResponse> responseObserver) {

    if (this.sourcer == null) {
        // Report the PartitionsFn method itself as unimplemented. The descriptor is what
        // ends up in the status description, so passing the PendingFn descriptor here
        // would point callers at the wrong RPC.
        io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(
                SourceGrpc.getPartitionsFnMethod(),
                responseObserver);
        return;
    }

    List<Integer> partitions = this.sourcer.getPartitions();

    // Wrap the partition list in the nested Result message expected by the proto contract.
    SourceOuterClass.PartitionsResponse.Result.Builder result =
            SourceOuterClass.PartitionsResponse.Result.newBuilder()
                    .addAllPartitions(partitions);

    responseObserver.onNext(
            SourceOuterClass.PartitionsResponse.newBuilder()
                    .setResult(result)
                    .build());
    responseObserver.onCompleted();
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.numaproj.numaflow.sourcer;

import java.util.Collections;
import java.util.List;

/**
* Sourcer exposes method for reading messages from source.
* Implementations should override the read method which will be used
Expand Down Expand Up @@ -30,4 +33,28 @@ public abstract class Sourcer {
* @return number of pending messages
*/
public abstract long getPending();

/**
 * Returns the partitions associated with the source. The platform uses them to determine
 * the partitions to which the watermark should be published. If the source doesn't have
 * partitions, {@code defaultPartitions()} can be used to return the default partitions.
 * In most cases {@code defaultPartitions()} is enough; a custom getPartitions() is needed
 * for sources like Kafka, where a single reader can read from multiple Kafka partitions.
 *
 * @return list of partitions
 */
public abstract List<Integer> getPartitions();

/**
 * Builds the default partition list for a source that has no partitions of its own,
 * intended for use inside a Sourcer's getPartitions() implementation.
 * The single entry is the value of the NUMAFLOW_REPLICA environment variable
 * (the pod replica index of the source), or 0 when that variable is not set.
 *
 * @return list containing exactly one partition
 */
public static List<Integer> defaultPartitions() {
    String replica = System.getenv("NUMAFLOW_REPLICA");
    if (replica == null) {
        replica = "0";
    }
    int replicaIndex = Integer.parseInt(replica);
    return Collections.singletonList(replicaIndex);
}
}
17 changes: 16 additions & 1 deletion src/main/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ service Source {
// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

// PartitionsFn returns the list of partitions for the user defined source.
rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse);

// IsReady is the heartbeat endpoint for user defined source gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}
Expand Down Expand Up @@ -120,6 +123,18 @@ message PendingResponse {
Result result = 1;
}

/*
 * PartitionsResponse is the response for the partitions request
 * (the message returned by the PartitionsFn rpc).
 */
message PartitionsResponse {
// Result wraps the partition list carried by the response.
message Result {
// Required field holding the list of partitions (int32 partition ids).
repeated int32 partitions = 1;
}
// Required field holding the result.
Result result = 1;
}

/*
* Offset is the offset of the datum.
*/
Expand All @@ -132,5 +147,5 @@ message Offset {
// Optional partition_id indicates which partition of the source the datum belongs to.
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
string partition_id = 2;
int32 partition_id = 2;
}
7 changes: 7 additions & 0 deletions src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class ServerErrTest {
Expand Down Expand Up @@ -124,6 +126,11 @@ public void ack(AckRequest request) {

}

/**
 * Returns the default partitions supplied by {@code Sourcer.defaultPartitions()}.
 *
 * @return the list produced by {@code Sourcer.defaultPartitions()}
 */
@Override
public List<Integer> getPartitions() {
    final List<Integer> defaults = Sourcer.defaultPartitions();
    return defaults;
}

@Override
public long getPending() {
throw new RuntimeException("unknown exception");
Expand Down
11 changes: 10 additions & 1 deletion src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public void TestSourcer() {
var pending = stub.pendingFn(Empty.newBuilder().build());
assertEquals(5, pending.getResult().getCount());

var partitions = stub.partitionsFn(Empty.newBuilder().build());
assertEquals(1, partitions.getResult().getPartitionsCount());
assertEquals(0, partitions.getResult().getPartitions(0));

// ack the 5 messages
var ackResponse = stub.ackFn(ackRequestBuilder.build());
assertEquals(Empty.newBuilder().build(), ackResponse.getResult().getSuccess());
Expand Down Expand Up @@ -137,7 +141,7 @@ public TestSourcer() {
for (int i = 0; i < 10; i++) {
messages.add(new Message(
ByteBuffer.allocate(4).putInt(i).array(),
new Offset(ByteBuffer.allocate(4).putInt(i).array(), "0"),
new Offset(ByteBuffer.allocate(4).putInt(i).array(), 0),
eventTime
));
eventTime = eventTime.plusMillis(1000L);
Expand All @@ -159,6 +163,11 @@ public void read(ReadRequest request, OutputObserver observer) {
}
}

/**
 * Returns the default partitions supplied by {@code Sourcer.defaultPartitions()}.
 *
 * @return the list produced by {@code Sourcer.defaultPartitions()}
 */
@Override
public List<Integer> getPartitions() {
    final List<Integer> defaults = Sourcer.defaultPartitions();
    return defaults;
}

@Override
public void ack(AckRequest request) {
for (Offset offset : request.getOffsets()) {
Expand Down

0 comments on commit 2daf34c

Please sign in to comment.