-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bidirectional streaming source #138
Conversation
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Assertions.fail("Failed to stop server"); | ||
} | ||
} | ||
// FIXME: once tester kit changes are done for bidirectional streaming source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open an issue in numaflow repo to track this?
for (Offset offset : request.getOffsets()) { | ||
messages.remove(Longs.fromByteArray(offset.getValue())); | ||
} | ||
messages.remove(offset); | ||
} | ||
|
||
@Override | ||
public long getPending() { | ||
// pending messages will be zero for a simple source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the comment as well.
@Test | ||
public void testLong() { | ||
Long l = 1L; | ||
byte[] bytes = Longs.toByteArray(l); | ||
|
||
Long x = Longs.fromByteArray(bytes); | ||
Assertions.assertEquals(l, x); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to unit test a library. Please remove it.
import io.numaproj.numaflow.sourcer.Message; | ||
import io.numaproj.numaflow.sourcer.Offset; | ||
import io.numaproj.numaflow.sourcer.SourcerTestKit; | ||
import org.junit.Ignore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this import being used?
.offsets(offsets).build(); | ||
simpleSource.ack(ackRequest); | ||
|
||
for (Offset offset : offsets) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please also update the testPending case.
/** | ||
* AckRequest request for acknowledging messages. | ||
*/ | ||
public interface AckRequest { | ||
/** | ||
* @return the list of offsets to be acknowledged | ||
* @return the offsets to be acknowledged |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @return the offsets to be acknowledged | |
* @return the offset to be acknowledged |
@Override | ||
public void onNext(SourceOuterClass.ReadRequest request) { | ||
// if the request is a handshake, send handshake response. | ||
if (request.hasHandshake() && request.getHandshake().getSot()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we unit test this branch? Same for ack func below.
future.complete(true); | ||
} | ||
}); | ||
StreamObserver<SourceOuterClass.ReadRequest> readRequestStreamObserver = sourceStub.readFn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can exclude SourceTestKit from codcov.
Signed-off-by: Yashash H L <[email protected]>
No description provided.