Skip to content

Commit

Permalink
Use Google protobuf for grpc tests, until #260 is resolved.
Browse files Browse the repository at this point in the history
  • Loading branch information
rbair23 committed Aug 21, 2024
1 parent f330c4f commit 5f48b4b
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 299 deletions.
1 change: 1 addition & 0 deletions pbj-core/gradle/modules.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
com.github.spotbugs.annotations=com.github.spotbugs:spotbugs-annotations
com.google.protobuf=com.google.protobuf:protobuf-java
com.google.protobuf.util=com.google.protobuf:protobuf-java-util
org.antlr.antlr4.runtime=org.antlr:antlr4-runtime
com.hedera.node.hapi=com.hedera.hashgraph:hapi
9 changes: 9 additions & 0 deletions pbj-core/pbj-grpc-helidon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@ testModuleInfo {
requires("io.helidon.webserver.http2")
requires("io.helidon.webclient.http2")
requires("com.hedera.node.hapi")
requires("com.google.protobuf.util")
requiresStatic("com.github.spotbugs.annotations")
}

tasks.named("compileJava") {
dependsOn(":pbj-runtime:jar")
}

protobuf {
// Configure the protoc executable
protoc {
// Download from repositories
artifact = "com.google.protobuf:protoc:3.21.10"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_TIMEOUT;
import static com.hedera.pbj.runtime.ServiceInterface.RequestOptions.APPLICATION_GRPC;
import static com.hedera.pbj.runtime.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON;
import static com.hedera.pbj.runtime.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO;
import static java.lang.System.Logger.Level.ERROR;
Expand All @@ -13,9 +14,12 @@
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.media.type.MediaType;
import io.helidon.common.media.type.MediaTypes;
import io.helidon.http.Header;
import io.helidon.http.HeaderNames;
import io.helidon.http.HeaderValues;
import io.helidon.http.HttpMediaType;
import io.helidon.http.HttpMediaTypes;
import io.helidon.http.Status;
import io.helidon.http.WritableHeaders;
Expand All @@ -39,6 +43,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.security.auth.callback.Callback;

/**
* Implementation of gRPC relying on PBJ. This class specifically contains the glue logic for bridging between
Expand All @@ -48,6 +53,11 @@
final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHandler {
private static final System.Logger LOGGER = System.getLogger(PbjProtocolHandler.class.getName());
private static final String IDENTITY = "identity";

private static final HttpMediaType APPLICATION_GRPC_MEDIA_TYPE = HttpMediaType.create(APPLICATION_GRPC);
private static final HttpMediaType APPLICATION_GRPC_PROTO_MEDIA_TYPE = HttpMediaType.create(APPLICATION_GRPC_PROTO);
private static final HttpMediaType APPLICATION_GRPC_JSON_MEDIA_TYPE = HttpMediaType.create(APPLICATION_GRPC_JSON);

private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached("grpc-encoding", IDENTITY);

private static final String GRPC_TIMEOUT_REGEX = "(\\d{1,8})([HMSmun])";
Expand Down Expand Up @@ -106,26 +116,6 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
this.deadlineDetector = requireNonNull(deadlineDetector);
}

private ScheduledFuture<?> scheduleDeadline(final @NonNull String timeout) {
final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout);
if (matcher.matches()) {
final var num = Integer.parseInt(matcher.group(0));
final var unit = matcher.group(1);
final var deadline = System.nanoTime() + num * switch (unit) {
case "H" -> 3600_000_000_000L;
case "M" -> 60_000_000_000L;
case "S" -> 1_000_000_000L;
case "m" -> 1_000_000L;
case "u" -> 1_000L;
case "n" -> 1L;
default -> throw new IllegalArgumentException("Invalid unit: " + unit);
};
return deadlineDetector.scheduleDeadline(deadline, this::close);
}

return new NoopScheduledFuture();
}

/**
* Called at the very beginning of the request, before any data has arrived. At this point we can look at the
* request headers and determine whether we have a valid request, and do any other initialization we need ot.
Expand All @@ -139,12 +129,13 @@ public void init() {
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
// In addition, "application/grpc" is interpreted as "application/grpc+proto".
final var httpHeaders = headers.httpHeaders();
final var ct = httpHeaders.contentType().orElse(HttpMediaTypes.PLAINTEXT_UTF_8).text();
final var contentTypeMediaType = httpHeaders.contentType().orElse(HttpMediaTypes.PLAINTEXT_UTF_8);
final var ct = contentTypeMediaType.text();
final var contentType = switch(ct) {
case "application/grpc", "application/grpc+proto" -> APPLICATION_GRPC_PROTO;
case "application/grpc+json" -> APPLICATION_GRPC_JSON;
case APPLICATION_GRPC, APPLICATION_GRPC_PROTO -> APPLICATION_GRPC_PROTO;
case APPLICATION_GRPC_JSON -> APPLICATION_GRPC_JSON;
default -> {
if (ct.startsWith("application/grpc")) {
if (ct.startsWith(APPLICATION_GRPC)) {
yield ct;
}
throw new FatalGrpcException(Status.UNSUPPORTED_MEDIA_TYPE_415);
Expand Down Expand Up @@ -190,7 +181,7 @@ public void init() {

// Send the headers to the client. This is the first thing we do, and it is done before any data is sent.
final var responseHeaders = WritableHeaders.create();
responseHeaders.set(HeaderNames.CONTENT_TYPE, contentType); // Respond with the same content type we received
responseHeaders.contentType(contentTypeMediaType);
responseHeaders.set(GRPC_ENCODING_IDENTITY);
responseHeaders.set(GrpcStatus.OK);
final var http2Headers = Http2Headers.create(responseHeaders);
Expand Down Expand Up @@ -335,7 +326,11 @@ private synchronized void close() {
deadlineFuture.cancel(false);
// If the deadline was canceled, then we have not yet responded to the client. So the response is OK. On the
// other hand, if th deadline was NOT canceled, then the deadline was exceeded.
responseHeaders.set(deadlineFuture.isCancelled() ? GrpcStatus.OK : GrpcStatus.DEADLINE_EXCEEDED);
// if (!deadlineFuture.isCancelled()) {
responseHeaders.set(GrpcStatus.OK);
// } else {
// responseHeaders.set(GrpcStatus.DEADLINE_EXCEEDED);
// }
final var http2Headers = Http2Headers.create(responseHeaders);
streamWriter.writeHeaders(http2Headers,
streamId,
Expand All @@ -344,6 +339,26 @@ private synchronized void close() {
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
}

private ScheduledFuture<?> scheduleDeadline(final @NonNull String timeout) {
final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout);
if (matcher.matches()) {
final var num = Integer.parseInt(matcher.group(0));
final var unit = matcher.group(1);
final var deadline = System.nanoTime() + num * switch (unit) {
case "H" -> 3600_000_000_000L;
case "M" -> 60_000_000_000L;
case "S" -> 1_000_000_000L;
case "m" -> 1_000_000L;
case "u" -> 1_000L;
case "n" -> 1L;
default -> throw new IllegalArgumentException("Invalid unit: " + unit);
};
return deadlineDetector.scheduleDeadline(deadline, this::close);
}

return new NoopScheduledFuture();
}

private final class Callback implements ServiceInterface.ResponseCallback {
@Override
public void send(final @NonNull Bytes response) {
Expand Down
121 changes: 0 additions & 121 deletions pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java

This file was deleted.

91 changes: 91 additions & 0 deletions pbj-core/pbj-grpc-helidon/src/test/java/pbj/GreeterService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package pbj;

import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import greeter.HelloReply;
import greeter.HelloReplyOuterClass;
import greeter.HelloRequest;
import greeter.HelloRequestOuterClass;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
* This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way to use the
* compiler. This will be fixed in a future release. So for now, we use Google's generated protobuf objects.
*/
public interface GreeterService extends ServiceInterface {
enum GreeterMethod implements Method {
sayHello,
sayHelloStreamReply,
sayHelloStreamRequest,
sayHelloStreamBidi
}

HelloReply sayHello(HelloRequest request);

@NonNull
default String serviceName() {
return "GreeterService";
}

@NonNull
default String fullName() {
return "greeter.GreeterService";
}

@NonNull
default List<Method> methods() {
return Arrays.asList(GreeterMethod.values());
}

@Override
default void open(
final @NonNull RequestOptions options,
final @NonNull Method method,
final @NonNull BlockingQueue<Bytes> messages,
final @NonNull ResponseCallback callback) {

final var m = (GreeterMethod) method;
Thread.ofVirtual().start(() -> {
try {
switch (m) {
case GreeterMethod.sayHello -> {
// Block waiting for the next message
final var message = messages.take();
// Parse the message into a HelloRequest
HelloRequest request;
if (options.isProtobuf()) {
request = HelloRequest.parseFrom(message.toByteArray());
} else if (options.isJson()) {
final var builder = HelloRequest.newBuilder();
JsonFormat.parser().merge(message.asUtf8String(), builder);
request = builder.build();
} else {
request = HelloRequest.newBuilder().setName(message.asUtf8String()).build();
}
// Call the service method
final var reply = sayHello(request);
// Convert the reply back into the appropriate format
Bytes replyBytes;
if (options.isProtobuf()) {
replyBytes = Bytes.wrap(reply.toByteArray());
} else if (options.isJson()) {
replyBytes = Bytes.wrap(JsonFormat.printer().print(reply));
} else {
replyBytes = Bytes.wrap(reply.getMessage().getBytes());
}
// Send back the reply and close the stream (unary).
callback.send(replyBytes);
callback.close();
}
}
} catch (Exception e) {
e.printStackTrace();
callback.close();
}
});
}
}
Loading

0 comments on commit 5f48b4b

Please sign in to comment.