diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/FatalGrpcException.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/FatalGrpcException.java new file mode 100644 index 00000000..15854069 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/FatalGrpcException.java @@ -0,0 +1,38 @@ +package com.hedera.pbj.grpc.helidon; + +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.http.Header; +import io.helidon.http.Status; +import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.Http2Headers; +import java.util.function.Consumer; + +class FatalGrpcException extends Exception { + final Consumer> headerCallback; + + FatalGrpcException(final @NonNull Consumer> headerCallback) { + this.headerCallback = headerCallback; + } + + FatalGrpcException(final @NonNull Header grpcStatus) { + this(w -> { + w.set(Http2Headers.STATUS_NAME, Status.OK_200.code()); + w.set(grpcStatus); + }); + } + + FatalGrpcException(final @NonNull Status status) { + this(w -> w.set(Http2Headers.STATUS_NAME, status.code())); + } + + FatalGrpcException(final @NonNull Status status, final @NonNull Header grpcStatus) { + this(w -> { + w.set(Http2Headers.STATUS_NAME, status.code()); + w.set(grpcStatus); + }); + } + + final Consumer> headerCallback() { + return headerCallback; + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java index 711108b0..ae555225 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java @@ -84,6 +84,7 @@ abstract class BuilderBase, PROT private Config config; private int maxMessageSize = 10240; private int maxResponseBufferSize = 10240; + private int maxIncomingBufferedMessages = 10; private String name; /** @@ -101,6 +102,7 @@ protected BuilderBase() { public BUILDER from(PbjConfig prototype) { maxMessageSize(prototype.maxMessageSize()); maxResponseBufferSize(prototype.maxResponseBufferSize()); + maxIncomingBufferedMessages(prototype.maxIncomingBufferedMessages()); name(prototype.name()); return self(); } @@ -114,6 +116,7 @@ public BUILDER from(PbjConfig prototype) { public BUILDER from(BuilderBase builder) { maxMessageSize(builder.maxMessageSize()); maxResponseBufferSize(builder.maxResponseBufferSize()); + maxIncomingBufferedMessages(builder.maxIncomingBufferedMessages()); builder.name().ifPresent(this::name); return self(); } @@ -131,6 +134,7 @@ public BUILDER config(Config config) { this.config = config; config.get("max-message-size").as(Integer.class).ifPresent(this::maxMessageSize); config.get("max-response-buffer-size").as(Integer.class).ifPresent(this::maxResponseBufferSize); + config.get("max-incoming-buffered-messages").as(Integer.class).ifPresent(this::maxIncomingBufferedMessages); return self(); } @@ -160,6 +164,19 @@ public BUILDER maxResponseBufferSize(int maxResponseBufferSize) { return self(); } + /** + * The maximum number of messages to buffer coming from the client until we start applying back pressure. + * Defaults to {@value #DEFAULT_MAX_INCOMING_BUFFERED_MESSAGES}. + * + * @param maxIncomingBufferedMessages the maximum number of incoming messages to buffer + * @return updated builder instance + * @see #maxIncomingBufferedMessages() + */ + public BUILDER maxIncomingBufferedMessages(int maxIncomingBufferedMessages) { + this.maxIncomingBufferedMessages = maxIncomingBufferedMessages; + return self(); + } + /** * * @@ -193,6 +210,16 @@ public int maxResponseBufferSize() { return maxResponseBufferSize; } + /** + * The maximum number of messages to buffer coming from the client until we start applying back pressure. + * Defaults to {@value #DEFAULT_MAX_INCOMING_BUFFERED_MESSAGES}. + * + * @return the max incoming messages that can be buffered + */ + public int maxIncomingBufferedMessages() { + return maxIncomingBufferedMessages; + } + /** * * @@ -244,6 +271,7 @@ protected static class PbjConfigImpl implements PbjConfig { private final int maxMessageSize; private final int maxResponseBufferSize; + private final int maxIncomingBufferedMessages; private final String name; /** @@ -254,6 +282,7 @@ protected static class PbjConfigImpl implements PbjConfig { protected PbjConfigImpl(BuilderBase builder) { this.maxMessageSize = builder.maxMessageSize(); this.maxResponseBufferSize = builder.maxResponseBufferSize(); + this.maxIncomingBufferedMessages = builder.maxIncomingBufferedMessages(); this.name = builder.name().get(); } @@ -267,6 +296,11 @@ public int maxResponseBufferSize() { return maxResponseBufferSize; } + @Override + public int maxIncomingBufferedMessages() { + return maxIncomingBufferedMessages; + } + @Override public String name() { return name; @@ -277,6 +311,7 @@ public String toString() { return "PbjConfig{" + "maxMessageSize=" + maxMessageSize + "," + "maxResponseBufferSize=" + maxResponseBufferSize + "," + + "maxIncomingBufferedMessages=" + maxIncomingBufferedMessages + "," + "name=" + name + "}"; } @@ -291,12 +326,13 @@ public boolean equals(Object o) { } return maxMessageSize == other.maxMessageSize() && maxResponseBufferSize == other.maxResponseBufferSize() + && maxIncomingBufferedMessages == other.maxIncomingBufferedMessages() && Objects.equals(name, other.name()); } @Override public int hashCode() { - return Objects.hash(maxMessageSize, maxResponseBufferSize, name); + return Objects.hash(maxMessageSize, maxResponseBufferSize, maxIncomingBufferedMessages, name); } } diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java index 02d6fc30..2ca3987b 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java @@ -9,6 +9,13 @@ @Prototype.Configured @Prototype.Provides(ProtocolConfig.class) interface PbjConfigBlueprint extends ProtocolConfig { + /** + * Default maximum number of messages to buffer coming from the client until we start applying back pressure. + * + * @see #maxIncomingBufferedMessages() + */ + int DEFAULT_MAX_INCOMING_BUFFERED_MESSAGES = 10; + /** * Default maximum message size in bytes ({@value}). * @@ -51,4 +58,10 @@ interface PbjConfigBlueprint extends ProtocolConfig { default String type() { return PbjProtocolProvider.CONFIG_NAME; } + + /** + * The maximum number of messages to buffer coming from the client until we start applying back pressure. + * Defaults to {@value #DEFAULT_MAX_INCOMING_BUFFERED_MESSAGES}. + */ + int maxIncomingBufferedMessages(); } diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 2f5a6374..adafe78c 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -1,11 +1,14 @@ package com.hedera.pbj.grpc.helidon; +import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING; import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING; import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_TIMEOUT; +import static com.hedera.pbj.runtime.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON; +import static com.hedera.pbj.runtime.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO; import static java.lang.System.Logger.Level.ERROR; +import static java.lang.System.Logger.Level.WARNING; import static java.util.Objects.requireNonNull; -import com.hedera.pbj.grpc.helidon.encoding.Encoding; import com.hedera.pbj.runtime.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; @@ -13,21 +16,22 @@ import io.helidon.http.Header; import io.helidon.http.HeaderNames; import io.helidon.http.HeaderValues; -import io.helidon.http.HttpMediaType; -import io.helidon.http.HttpPrologue; +import io.helidon.http.HttpMediaTypes; +import io.helidon.http.Status; import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.FlowControl; import io.helidon.http.http2.Http2Flag; import io.helidon.http.http2.Http2FrameData; import io.helidon.http.http2.Http2FrameHeader; import io.helidon.http.http2.Http2FrameTypes; import io.helidon.http.http2.Http2Headers; import io.helidon.http.http2.Http2RstStream; -import io.helidon.http.http2.Http2Settings; import io.helidon.http.http2.Http2StreamState; import io.helidon.http.http2.Http2StreamWriter; import io.helidon.http.http2.Http2WindowUpdate; import io.helidon.http.http2.StreamFlowControl; import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; +import java.net.http.HttpHeaders; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -43,38 +47,37 @@ */ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHandler { private static final System.Logger LOGGER = System.getLogger(PbjProtocolHandler.class.getName()); - private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached("grpc-encoding", "identity"); - - private static final HttpMediaType APPLICATION_GRPC = HttpMediaType.create("application/grpc"); - private static final HttpMediaType APPLICATION_GRPC_PROTO = HttpMediaType.create("application/grpc+proto"); - private static final HttpMediaType APPLICATION_GRPC_JSON = HttpMediaType.create("application/grpc+json"); + private static final String IDENTITY = "identity"; + private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached("grpc-encoding", IDENTITY); private static final String GRPC_TIMEOUT_REGEX = "(\\d{1,8})([HMSmun])"; private static final Pattern GRPC_TIMEOUT_PATTERN = Pattern.compile(GRPC_TIMEOUT_REGEX); // Helidon-specific fields related to the connection itself - private final HttpPrologue prologue; private final Http2Headers headers; private final Http2StreamWriter streamWriter; private final int streamId; - private final Http2Settings serverSettings; - private final Http2Settings clientSettings; private final StreamFlowControl flowControl; private Http2StreamState currentStreamState; - /** The service method that this connection was created for */ + /** The configurations to use for this handler */ + private final PbjConfig config; + /** + * The service method that this connection was created for. The route has information about the + * {@link ServiceInterface} and method to invoke. + */ private final PbjMethodRoute route; /** - * If there is a timeout defined for the request, then this detected is used to determine when the timeout - * deadline has been met. + * If there is a timeout defined for the request, then this detector is used to determine when the timeout + * deadline has been met. The detector runs on a background thread/timer. */ private final DeadlineDetector deadlineDetector; - /** A future representing the background task detecting deadlines. */ - private ScheduledFuture deadlineFuture = new NoopScheduledFuture(); - /** Whether the next incoming message is compressed. */ - private boolean isCompressed; - /** The encoding as determined by the grpc-encoding header. Will not be null. */ - private Encoding encoding; + /** + * A future representing the background task detecting deadlines. If there is a deadline, then this future will + * represent the task that will be executed when the deadline is reached. If there is no deadline, then we default + * to a non-null no-op future that exists in the infinite future. + */ + private ScheduledFuture deadlineFuture; /** The current index into {@link #entityBytes} into which data is to be read. */ private int entityBytesIndex = 0; /** @@ -85,151 +88,134 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa private BlockingQueue incomingMessages; /** Create a new instance */ - PbjProtocolHandler(final @NonNull HttpPrologue prologue, + PbjProtocolHandler(final @NonNull PbjConfig config, final @NonNull Http2Headers headers, final @NonNull Http2StreamWriter streamWriter, final int streamId, - final @NonNull Http2Settings serverSettings, - final @NonNull Http2Settings clientSettings, final @NonNull StreamFlowControl flowControl, final @NonNull Http2StreamState currentStreamState, final @NonNull PbjMethodRoute route, final @NonNull DeadlineDetector deadlineDetector) { - - this.prologue = requireNonNull(prologue); + this.config = requireNonNull(config); this.headers = requireNonNull(headers); this.streamWriter = requireNonNull(streamWriter); this.streamId = streamId; - this.serverSettings = requireNonNull(serverSettings); - this.clientSettings = requireNonNull(clientSettings); this.flowControl = requireNonNull(flowControl); this.currentStreamState = requireNonNull(currentStreamState); this.route = requireNonNull(route); this.deadlineDetector = requireNonNull(deadlineDetector); } - @Override - public void init() { - try { - // If the grpc-timeout header is present, determine when that timeout would occur. - final var timeout = headers.httpHeaders().value(GRPC_TIMEOUT); - if (timeout.isPresent()) { - final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout.get()); - if (matcher.matches()) { - final var num = Integer.parseInt(matcher.group(0)); - final var unit = matcher.group(1); - final var deadline = System.nanoTime() + num * switch (unit) { - case "H" -> 3600_000_000_000L; - case "M" -> 60_000_000_000L; - case "S" -> 1_000_000_000L; - case "m" -> 1_000_000L; - case "u" -> 1_000L; - case "n" -> 1L; - default -> throw new IllegalArgumentException("Invalid unit: " + unit); - }; - deadlineFuture = deadlineDetector.scheduleDeadline(deadline, () -> { - close(GrpcStatus.DEADLINE_EXCEEDED); - }); - } - } - - // Get the encoding to use. We always use one, even if it is just "identity". This implementation currently - // only supports receiving compressed / encoded messages, it always responds with "identity" messages. - // This could be modified in the future, there is no reason not to support compression. - final var encodingHeader = headers.httpHeaders().value(GRPC_ENCODING).orElse("identity"); - encoding = switch (encodingHeader) { - case "identity" -> Encoding.IDENTITY; - case "gzip" -> Encoding.GZIP; - default -> throw new IllegalArgumentException("Unsupported encoding: " + encodingHeader); + private ScheduledFuture scheduleDeadline(final @NonNull String timeout) { + final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout); + if (matcher.matches()) { + final var num = Integer.parseInt(matcher.group(0)); + final var unit = matcher.group(1); + final var deadline = System.nanoTime() + num * switch (unit) { + case "H" -> 3600_000_000_000L; + case "M" -> 60_000_000_000L; + case "S" -> 1_000_000_000L; + case "m" -> 1_000_000L; + case "u" -> 1_000L; + case "n" -> 1L; + default -> throw new IllegalArgumentException("Invalid unit: " + unit); }; + return deadlineDetector.scheduleDeadline(deadline, this::close); + } - // We know the content type has been set and starts with "application/grpc", otherwise this handler would - // not have been called. But we don't know whether it is "application/grpc" or "application/grpc+proto" or - // "application/grpc+json". Normalize "application/grpc" to "application/grpc+proto", and otherwise just - // pass whatever the content type is along to the service handler. Maybe it will support something - final var contentType = headers.httpHeaders().contentType().orElseThrow(); - final var normalizedContentType = contentType.equals(APPLICATION_GRPC) ? APPLICATION_GRPC_PROTO : contentType; - final var contentSubType = normalizedContentType.subtype(); - final var contentTypeExt = contentSubType.substring(contentSubType.indexOf('+') + 1); - - // todo Extract any custom metadata to pass along as well - - // Create the "options" to make available to the service handler. These options are used by the service - // handler to decide on the best way to parse or handle the request. - final var options = new ServiceInterface.RequestOptions() { - @Override - public boolean isProtobuf() { - return contentTypeExt.equals(ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO); - } - - @Override - public boolean isJson() { - return contentTypeExt.equals(ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON); - } + return new NoopScheduledFuture(); + } - @Override - public String contentType() { - return contentTypeExt; + /** + * Called at the very beginning of the request, before any data has arrived. At this point we can look at the + * request headers and determine whether we have a valid request, and do any other initialization we need ot. + */ + @Override + public void init() { + try { + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of + // 415 (Unsupported Media Type). This will prevent other HTTP/2 clients from interpreting a gRPC error + // response, which uses status 200 (OK), as successful. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + // In addition, "application/grpc" is interpreted as "application/grpc+proto". + final var httpHeaders = headers.httpHeaders(); + final var ct = httpHeaders.contentType().orElse(HttpMediaTypes.PLAINTEXT_UTF_8).text(); + final var contentType = switch(ct) { + case "application/grpc", "application/grpc+proto" -> APPLICATION_GRPC_PROTO; + case "application/grpc+json" -> APPLICATION_GRPC_JSON; + default -> { + if (ct.startsWith("application/grpc")) { + yield ct; + } + throw new FatalGrpcException(Status.UNSUPPORTED_MEDIA_TYPE_415); } }; - incomingMessages = new ArrayBlockingQueue<>(10); // TODO Take from config - route.service().open(options, route.method(), incomingMessages, new ServiceInterface.ResponseCallback() { - @Override - public void start() { - // todo ignoring headers, just sending required response headers - WritableHeaders writable = WritableHeaders.create(); - writable.set(HeaderNames.CONTENT_TYPE, normalizedContentType.text()); // Respond with the same content type we received - writable.set(GRPC_ENCODING_IDENTITY); - - Http2Headers http2Headers = Http2Headers.create(writable); - http2Headers.status(io.helidon.http.Status.OK_200); - streamWriter.writeHeaders(http2Headers, - streamId, - Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), - flowControl.outbound()); - } - - @Override - public void send(Bytes response) { - try { - final int length = (int) response.length(); - BufferData bufferData = BufferData.create(5 + length); - bufferData.write(0); - bufferData.writeUnsignedInt32(length); - bufferData.write(response.toByteArray()); - - // todo flags based on method type - // end flag should be sent when last message is sent (or just rst stream if we cannot determine this) - - Http2FrameHeader header = Http2FrameHeader.create(bufferData.available(), - Http2FrameTypes.DATA, - Http2Flag.DataFlags.create(0), - streamId); - - streamWriter.writeData(new Http2FrameData(header, bufferData), flowControl.outbound()); - } catch (Exception e) { - LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); - } - - } + // This implementation currently only supports "identity" and "gzip" compression. This implementation + // only supports receiving compressed / encoded messages, it always responds with "identity" messages. + // This could be extended in the future. Ideally we'd respond with the same compression that was sent to us. + // + // As per the documentation: + // If a client message is compressed by an algorithm that is not supported by a server, the message will + // result in an UNIMPLEMENTED error status on the server. The server will include a grpc-accept-encoding + // header [in] the response which specifies the algorithms that the server accepts. + final var encodingHeader = httpHeaders.value(GRPC_ENCODING).orElse(IDENTITY); + if (!IDENTITY.equals(encodingHeader)) { + throw new FatalGrpcException(h -> { + h.set(Http2Headers.STATUS_NAME, Status.OK_200.code()); + h.set(GrpcStatus.UNIMPLEMENTED); + h.set(GRPC_ACCEPT_ENCODING, IDENTITY); + }); + } - @Override - public void close() { - // If the deadline has not already been reached, then go ahead and cancel it. - deadlineFuture.cancel(false); - // If the deadline was not canceled, then it means it was already done before we got here, - // which means a separate close has already happened, so we cannot close again. - if (!deadlineFuture.isCancelled()) { - PbjProtocolHandler.this.close(GrpcStatus.OK); - } - } - }); - } catch (Throwable e) { - LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", e); - throw e; + // If the grpc-timeout header is present, determine when that timeout would occur, or default to a future + // that is so far in the future it will never happen. + final var timeout = httpHeaders.value(GRPC_TIMEOUT); + deadlineFuture = timeout.isPresent() + ? scheduleDeadline(timeout.get()) + : new NoopScheduledFuture(); + + // Future: Should we support custom metadata, we would extract it here and pass it along via "options". + + // Create the "options" to make available to the ServiceInterface. These options are used to decide on the + // best way to parse or handle the request. + final var options = new Options( + contentType.equals(APPLICATION_GRPC_PROTO), + contentType.equals(APPLICATION_GRPC_JSON), + contentType); + + // Call the ServiceInterface to let it know of the new connection. + incomingMessages = new ArrayBlockingQueue<>(config.maxIncomingBufferedMessages()); + route.service().open(options, route.method(), incomingMessages, new Callback()); + + // Send the headers to the client. This is the first thing we do, and it is done before any data is sent. + final var responseHeaders = WritableHeaders.create(); + responseHeaders.set(HeaderNames.CONTENT_TYPE, contentType); // Respond with the same content type we received + responseHeaders.set(GRPC_ENCODING_IDENTITY); + responseHeaders.set(GrpcStatus.OK); + final var http2Headers = Http2Headers.create(responseHeaders); + http2Headers.status(io.helidon.http.Status.OK_200); + streamWriter.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), + flowControl.outbound()); + } catch (final FatalGrpcException grpcException) { + // The request was bad. We need to respond with the appropriate status code and close the stream. + // This doesn't involve any logging (but could involve some metrics, so we track the number of failed + // requests). + // FUTURE Increment a metric counter for failed requests and of different types of failures + final WritableHeaders writable = WritableHeaders.create(); + grpcException.headerCallback().accept(writable); + final var http2Headers = Http2Headers.create(writable); + streamWriter.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), + FlowControl.Outbound.NOOP); + currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; + } catch (final Exception unknown) { + LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown); + throw new RuntimeException(unknown); } - } @Override @@ -239,12 +225,12 @@ public Http2StreamState streamState() { @Override public void rstStream(Http2RstStream rstStream) { -// listener.onComplete(); + // Nothing to do } @Override public void windowUpdate(Http2WindowUpdate update) { - + // Nothing to do } /** @@ -259,7 +245,7 @@ public void data(Http2FrameHeader header, BufferData data) { // First chunk of data contains the compression flag and the length of the message if (entityBytes == null) { // Read whether this message is compressed. We do not currently support compression. - isCompressed = (data.read() == 1); + final var isCompressed = (data.read() == 1); if (isCompressed) { // TODO Proper logging and error handling throw new IllegalArgumentException("Compression is not supported"); @@ -269,7 +255,7 @@ public void data(Http2FrameHeader header, BufferData data) { // where the attacker sends us a very large length and exhausts our memory, we have a maximum // message size configuration setting. Using that, we can detect attempts to exhaust our memory. final long length = data.readUnsignedInt32(); - if (length > PbjConfigBlueprint.DEFAULT_MAX_MESSAGE_SIZE) { // TODO Needs proper config + if (length > PbjConfigBlueprint.DEFAULT_MAX_MESSAGE_SIZE) { // TODO Proper logging and error handling throw new IllegalArgumentException("Message size exceeds maximum allowed size: " + length + " > " + PbjConfigBlueprint.DEFAULT_MAX_MESSAGE_SIZE); @@ -306,19 +292,51 @@ public void data(Http2FrameHeader header, BufferData data) { if (header.flags(Http2FrameTypes.DATA).endOfStream()) { entityBytesIndex = 0; entityBytes = null; -// listener.onHalfClose(); currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; } - } catch (Exception e) { + } catch (final InterruptedException ie) { + // Thrown when the thread is interrupted while waiting to put a message into the queue. We'll log this + // but continue the next time around the loop. + LOGGER.log(WARNING, "Interrupted while waiting to put message into queue", ie); + Thread.currentThread().interrupt(); + } catch (final Exception e) { LOGGER.log(ERROR, "Failed to process grpc request: " + data.debugDataHex(true), e); } } - private void close(Header status) { - WritableHeaders writable = WritableHeaders.create(); - writable.set(status); + /** + * Sends the given message to the client + * @param response The bytes to send. + */ + public void onSendMessage(final @NonNull Bytes response) { + try { + final int length = (int) response.length(); + final var bufferData = BufferData.create(5 + length); + bufferData.write(0); // 0 means no compression + bufferData.writeUnsignedInt32(length); + bufferData.write(response.toByteArray()); + + final var header = Http2FrameHeader.create(bufferData.available(), + Http2FrameTypes.DATA, + Http2Flag.DataFlags.create(0), + streamId); + + streamWriter.writeData(new Http2FrameData(header, bufferData), flowControl.outbound()); + } catch (final Exception e) { + LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); + } + } - Http2Headers http2Headers = Http2Headers.create(writable); + private synchronized void close() { + final var responseHeaders = WritableHeaders.create(); + // Canceling a future that has already completed has no effect. So by canceling here, we are saying: + // "If you have not yet executed, never execute. If you have already executed, then just ignore me". + // The "isCancelled" flag is set if the future was canceled before it was executed. + deadlineFuture.cancel(false); + // If the deadline was canceled, then we have not yet responded to the client. So the response is OK. On the + // other hand, if th deadline was NOT canceled, then the deadline was exceeded. + responseHeaders.set(deadlineFuture.isCancelled() ? GrpcStatus.OK : GrpcStatus.DEADLINE_EXCEEDED); + final var http2Headers = Http2Headers.create(responseHeaders); streamWriter.writeHeaders(http2Headers, streamId, Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), @@ -326,15 +344,41 @@ private void close(Header status) { currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; } + private final class Callback implements ServiceInterface.ResponseCallback { + @Override + public void send(final @NonNull Bytes response) { + PbjProtocolHandler.this.onSendMessage(response); + } + + @Override + public void close() { + PbjProtocolHandler.this.close(); + } + } + + /** + * Simple implementation of the {@link ServiceInterface.RequestOptions} interface. + */ + private record Options(boolean isProtobuf, boolean isJson, String contentType) + implements ServiceInterface.RequestOptions { + } + + /** + * A {@link ScheduledFuture} that does nothing. This is used when there is no deadline set for the request. + * A new instance of this must be created (or we need a "reset" method) for each {@link PbjProtocolHandler} + * instance, because it can become "corrupted" if canceled from any particular call. + */ private static final class NoopScheduledFuture extends CompletableFuture implements ScheduledFuture { @Override - public long getDelay(TimeUnit unit) { + public long getDelay(final @NonNull TimeUnit unit) { return 0; } @Override - public int compareTo(Delayed o) { - return 0; + public int compareTo(final @NonNull Delayed o) { + // Since all NoopScheduledFuture instances have "0" as the delay, any other Delayed instance with a non-0 + // delay will come after this one. + return (int) (o.getDelay(TimeUnit.NANOSECONDS)); } } } diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java index 8a3aa1ae..db333ff5 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java @@ -1,5 +1,6 @@ package com.hedera.pbj.grpc.helidon; +import edu.umd.cs.findbugs.annotations.NonNull; import io.helidon.webserver.ProtocolConfigs; import io.helidon.webserver.http2.spi.Http2SubProtocolProvider; import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; @@ -13,24 +14,29 @@ public class PbjProtocolProvider implements Http2SubProtocolProvider /** * Default constructor required by Java {@link java.util.ServiceLoader}. * - * @deprecated please do not use directly outside of testing, this is reserved for Java {@link java.util.ServiceLoader} + * @deprecated please do not use directly outside of testing, this is reserved for Java + * {@link java.util.ServiceLoader} */ @Deprecated public PbjProtocolProvider() { + // requires deprecated annotation so as to avoid accidental use } @Override + @NonNull public String protocolType() { return CONFIG_NAME; } @Override + @NonNull public Class protocolConfigType() { return PbjConfig.class; } @Override - public Http2SubProtocolSelector create(PbjConfig config, ProtocolConfigs configs) { - return new PbjProtocolSelector(); + @NonNull + public Http2SubProtocolSelector create(final @NonNull PbjConfig config, final @NonNull ProtocolConfigs configs) { + return new PbjProtocolSelector(config); } } \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java index 13224dd4..278fba4f 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java @@ -2,7 +2,9 @@ import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING; import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING; +import static java.util.Objects.requireNonNull; +import edu.umd.cs.findbugs.annotations.NonNull; import io.helidon.http.HeaderNames; import io.helidon.http.HttpPrologue; import io.helidon.http.Method; @@ -27,12 +29,14 @@ */ public class PbjProtocolSelector implements Http2SubProtocolSelector { private final DeadlineDetector deadlineDetector; + private final PbjConfig config; /** * Create a new PBJ based grpc protocol selector (default). Access restricted to be package-private so as * to limit instantiation to the {@link PbjProtocolProvider}. */ - PbjProtocolSelector() { + PbjProtocolSelector(final @NonNull PbjConfig config) { + this.config = requireNonNull(config); deadlineDetector = new DeadlineDetector() { private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); @@ -53,8 +57,8 @@ public SubProtocolResult subProtocol(ConnectionContext ctx, Http2Headers headers, Http2StreamWriter streamWriter, int streamId, - Http2Settings serverSettings, - Http2Settings clientSettings, + Http2Settings serverSettings, // unused + Http2Settings clientSettings, // unused StreamFlowControl flowControl, Http2StreamState currentStreamState, Router router) { @@ -65,22 +69,10 @@ public SubProtocolResult subProtocol(ConnectionContext ctx, return NOT_SUPPORTED; } - // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of - // 415 (Unsupported Media Type). This will prevent other HTTP/2 clients from interpreting a gRPC error - // response, which uses status 200 (OK), as successful. - // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md - final var httpHeaders = headers.httpHeaders(); - final var contentType = httpHeaders.value(HeaderNames.CONTENT_TYPE).orElse(""); - if (!contentType.startsWith("application/grpc")) { - return new SubProtocolResult(true, - new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> - h.set(Http2Headers.STATUS_NAME, Status.UNSUPPORTED_MEDIA_TYPE_415.code()))); - } - // Look up the route based on the path. If that route does not exist, we return a 200 OK response with // a gRPC status of NOT_FOUND. - PbjRouting routing = router.routing(PbjRouting.class, PbjRouting.EMPTY); - PbjMethodRoute route = routing.findRoute(prologue); + final var routing = router.routing(PbjRouting.class, PbjRouting.EMPTY); + final var route = routing.findRoute(prologue); if (route == null) { return new SubProtocolResult(true, new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> { @@ -89,27 +81,12 @@ public SubProtocolResult subProtocol(ConnectionContext ctx, })); } - // This implementation currently only supports "identity" and "gzip" compression. As per the documentation: - // If a client message is compressed by an algorithm that is not supported by a server, the message will result - // in an UNIMPLEMENTED error status on the server. The server will include a grpc-accept-encoding header [in] - // the response which specifies the algorithms that the server accepts. - final var encoding = httpHeaders.value(GRPC_ENCODING).orElse("identity"); - if (!"identity".equals(encoding) && !"gzip".equals(encoding)) { - return new SubProtocolResult(true, - new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> { - h.set(GrpcStatus.UNIMPLEMENTED); - h.set(GRPC_ACCEPT_ENCODING, "identity,gzip"); - })); - } - // This looks like a valid call! We will return a new PbjProtocolHandler to handle the request. return new SubProtocolResult(true, - new PbjProtocolHandler(prologue, + new PbjProtocolHandler(config, headers, streamWriter, streamId, - serverSettings, - clientSettings, flowControl, currentStreamState, route, diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java deleted file mode 100644 index 8c96289d..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -public interface Encoding { - GzipEncoding GZIP = new GzipEncoding(); - IdentityEncoding IDENTITY = new IdentityEncoding(); - - byte[] decode(byte[] data) throws Exception; -} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java deleted file mode 100644 index af0e004e..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -import java.io.ByteArrayInputStream; -import java.util.zip.GZIPInputStream; - -public class GzipEncoding implements Encoding { - @Override - public byte[] decode(byte[] data) throws Exception { - return new GZIPInputStream(new ByteArrayInputStream(data)).readAllBytes(); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java deleted file mode 100644 index 59e17663..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -public class IdentityEncoding implements Encoding { - @Override - public byte[] decode(byte[] data) { - return data; - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java b/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java index e7a469d8..bf33fcfa 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java @@ -4,6 +4,7 @@ requires com.hedera.pbj.runtime; requires io.helidon.webserver; requires io.helidon.webserver.http2; + requires java.net.http; exports com.hedera.pbj.grpc.helidon; diff --git a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json index 172fa629..c9337d5b 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json +++ b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json @@ -1 +1,27 @@ -[{"module":"com.hedera.pbj.grpc.helidon","types":[{"annotatedType":"com.hedera.pbj.grpc.helidon.PbjConfig","type":"com.hedera.pbj.grpc.helidon.PbjConfig","producers":["com.hedera.pbj.grpc.helidon.PbjConfig#create(io.helidon.common.config.Config)","com.hedera.pbj.grpc.helidon.PbjConfig#builder()"],"provides":["io.helidon.webserver.spi.ProtocolConfig"],"options":[{"defaultValue":"10240","description":"Maximum size of any message in bytes.\n Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}.\n\n @return the maximum number of bytes a single message can be","key":"max-message-size","method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxMessageSize(int)","type":"java.lang.Integer"},{"defaultValue":"10240","description":"Maximum size of the response buffer in bytes.\n Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}.\n\n @return the maximum number of bytes a response can be","key":"max-response-buffer-size","method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxResponseBufferSize(int)","type":"java.lang.Integer"}]}]}] \ No newline at end of file +[{"module":"com.hedera.pbj.grpc.helidon", + "types":[{ + "annotatedType":"com.hedera.pbj.grpc.helidon.PbjConfig", + "type":"com.hedera.pbj.grpc.helidon.PbjConfig", + "producers":["com.hedera.pbj.grpc.helidon.PbjConfig#create(io.helidon.common.config.Config)","com.hedera.pbj.grpc.helidon.PbjConfig#builder()"], + "provides":["io.helidon.webserver.spi.ProtocolConfig"], + "options":[{ + "defaultValue":"10240", + "description":"Maximum size of any message in bytes.\n Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}.\n\n @return the maximum number of bytes a single message can be", + "key":"max-message-size", + "method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxMessageSize(int)", + "type":"java.lang.Integer" + },{ + "defaultValue":"10240", + "description":"Maximum size of the response buffer in bytes.\n Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}.\n\n @return the maximum number of bytes a response can be", + "key":"max-response-buffer-size", + "method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxResponseBufferSize(int)", + "type":"java.lang.Integer" + },{ + "defaultValue":"10", + "description":"The maximum number of messages to buffer coming from the client until we start applying back pressure.", + "key":"max-incoming-buffered-messages", + "method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxIncomingBufferedMessages(int)", + "type":"java.lang.Integer" + }] + }] +}] \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java deleted file mode 100644 index 81db59cf..00000000 --- a/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package http; - -import io.helidon.webclient.api.WebClient; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import org.junit.jupiter.api.Test; - -public class HttpTest { - @Test - void simpleHttpCall() throws InterruptedException { - final var pool = Executors.newFixedThreadPool(100); - final var latch = new CountDownLatch(1000); - for (int i=0; i<1000; i++) { - pool.submit(() -> { - final var client = WebClient.builder().baseUri("http://localhost:8080").build(); - System.out.println(client.get().path("/greet").request().as(String.class)); - latch.countDown(); - }); - } - - latch.await(); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java index 0f3d342b..07b5b1a9 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java @@ -6,6 +6,7 @@ import com.hedera.hapi.node.transaction.TransactionResponse; import com.hedera.pbj.runtime.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -24,14 +25,17 @@ enum ConsensusMethod implements Method { TransactionResponse submitMessage(Transaction tx); Response getTopicInfo(Query q); + @NonNull default String serviceName() { return "ConsensusService"; } + @NonNull default String fullName() { return "proto.ConsensusService"; } + @NonNull default List methods() { return List.of( ConsensusMethod.createTopic, @@ -43,10 +47,10 @@ default List methods() { @Override default void open( - final /*@NonNull*/ RequestOptions options, - final /*@NonNull*/ Method method, - final /*@NonNull*/ BlockingQueue messages, - final /*@NonNull*/ ResponseCallback callback) { + final @NonNull RequestOptions options, + final @NonNull Method method, + final @NonNull BlockingQueue messages, + final @NonNull ResponseCallback callback) { final var m = (ConsensusMethod) method; Thread.ofVirtual().start(() -> { @@ -55,7 +59,6 @@ default void open( case ConsensusMethod.createTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() // What if it isn't JSON or PROTOBUF? ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -67,7 +70,6 @@ default void open( case ConsensusMethod.updateTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -79,7 +81,6 @@ default void open( case ConsensusMethod.deleteTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -89,9 +90,8 @@ default void open( callback.close(); } case ConsensusMethod.submitMessage -> { - // Unary method + // Unary method. final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -103,7 +103,6 @@ default void open( case ConsensusMethod.getTopicInfo -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Query.PROTOBUF.parse(message) : Query.JSON.parse(message); @@ -113,7 +112,7 @@ default void open( callback.close(); } } - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); callback.close(); } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java index e069c5df..0e85edeb 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java @@ -2,21 +2,27 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.hedera.hapi.node.base.ResponseCodeEnum; import com.hedera.hapi.node.base.Transaction; import com.hedera.hapi.node.transaction.Query; import com.hedera.hapi.node.transaction.Response; import com.hedera.hapi.node.transaction.TransactionResponse; import com.hedera.pbj.grpc.helidon.GrpcStatus; import com.hedera.pbj.grpc.helidon.PbjRouting; +import com.hedera.pbj.runtime.ParseException; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; import com.hedera.pbj.runtime.io.stream.WritableStreamingData; import io.helidon.common.media.type.MediaType; +import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.HeaderNames; import io.helidon.http.HttpMediaType; import io.helidon.http.Method; import io.helidon.webclient.api.WebClient; import io.helidon.webserver.WebServer; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -29,13 +35,20 @@ class PbjTest { private static final MediaType APPLICATION_RANDOM = HttpMediaType.create("application/random"); private static WebClient CLIENT; private static final String SUBMIT_MESSAGE_PATH = "/proto.ConsensusService/submitMessage"; + private static final String ECHO_PATH = "/proto.TestService/echo"; + private static final TransactionResponse SUCCESSFUL_TRANSACTION_RESPONSE = TransactionResponse.newBuilder() + .cost(100) + .nodeTransactionPrecheckCode(ResponseCodeEnum.SUCCESS) + .build(); @BeforeAll static void setup() { // Set up the server WebServer.builder() .port(8080) - .addRouting(PbjRouting.builder().service(new ConsensusServiceImpl())) + .addRouting(PbjRouting.builder() + .service(new ConsensusServiceImpl()) + .service(new CustomServiceImpl())) .build() .start(); @@ -53,11 +66,9 @@ static void setup() { // this functionality is strongly discouraged. gRPC does not go out of its way to break users that are using this // kind of override, but we do not actively support it, and some functionality (e.g., service config support) will // not work when the path is not of the form shown above. - // - // TESTS: - // - Verify the path is case-sensitive /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /** Verify the path is case-sensitive */ @Test void badCaseOnPathIsNotFound() { try (var response = CLIENT.post() @@ -76,11 +87,9 @@ void badCaseOnPathIsNotFound() { // SPEC: // // Only POST can be used for gRPC calls. - // - // TESTS: - // - Verify that only POST is supported /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /** Verify that only POST is supported */ @ParameterizedTest @ValueSource(strings = { "GET", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE" }) void mustUsePost(final String methodName) { @@ -106,15 +115,10 @@ void mustUsePost(final String methodName) { // uses status 200 (OK), as successful. // // TESTS: - // - Verify that the server responds with 415 when the Content-Type is not specified - // - Verify that the server responds with 415 when the Content-Type is not "application/grpc" - // - Verify that both "application/grpc+proto" and "application/grpc+json" are accepted - // - Verify that if "application/grpc" is used, it defaults to "application/grpc+proto" - // - Verify that the response is encoded as JSON when "application/grpc+json" is used - // - Verify that the response is encoded as protobuf when "application/grpc+proto" or "application/grpc" is used - // - Verify that a custom encoding "application/grpc+custom" can work + // - /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /** Verify that the server responds with 415 when the Content-Type is not specified */ @Test void contentTypeMustBeSet() { try (var response = CLIENT.post() @@ -126,6 +130,7 @@ void contentTypeMustBeSet() { } } + /** Verify that the server responds with 415 when the Content-Type does not start with "application/grpc" */ @Test void contentTypeMustStartWithApplicationGrpc() { try (var response = CLIENT.post() @@ -138,23 +143,62 @@ void contentTypeMustStartWithApplicationGrpc() { } } + /** Verify that "application/grpc+json" requests are accepted */ @Test - void contentTypeCanBeJSON() { + void contentTypeCanBeJSON() throws ParseException { try (var response = CLIENT.post() .protocolId("h2") .path(SUBMIT_MESSAGE_PATH) .contentType(APPLICATION_GRPC_JSON) .submit(messageBytesJson(Transaction.DEFAULT))) { - // TODO Assert that the response is also encoded as JSON + // TODO Verify the response is valid JSON + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().contentType().orElseThrow().text()) + .isEqualTo("application/grpc+json"); + + final var tx = decodeTransactionResponse(new ReadableStreamingData(response.inputStream())); + assertThat(tx).isEqualTo(SUCCESSFUL_TRANSACTION_RESPONSE); + } + } + + /** Verify that "application/grpc+proto" and "application/grpc" both support protobuf encoding */ + @ParameterizedTest + @ValueSource(strings = { "application/grpc+proto", "application/grpc" }) + void contentTypeCanBeProtobuf(final String contentType) throws ParseException { + try (var response = CLIENT.post() + .protocolId("h2") + .path(SUBMIT_MESSAGE_PATH) + .contentType(MediaTypes.create(contentType)) + .submit(messageBytes(Transaction.DEFAULT))) { + assertThat(response.status().code()).isEqualTo(200); - assertThat(response.headers().contentType().orElseThrow().text()).isEqualTo("application/grpc+json"); + assertThat(response.headers().contentType().orElseThrow().text()) + .isEqualTo("application/grpc+proto"); + + final var tx = decodeTransactionResponse(new ReadableStreamingData(response.inputStream())); + assertThat(tx).isEqualTo(SUCCESSFUL_TRANSACTION_RESPONSE); } } + /** Verify that a custom suffix of the content type is supported */ @Test - void contentTypeWithoutProtoDefaultsToProto() { + void contentTypeCanBeCustom() throws IOException { + try (var response = CLIENT.post() + .protocolId("h2") + .path(ECHO_PATH) + .contentType(MediaTypes.create("application/grpc+string")) + .submit(messageBytes("Hello, dude!".getBytes(StandardCharsets.UTF_8)))) { + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().contentType().orElseThrow().text()) + .isEqualTo("application/grpc+string"); + + // The first five bytes are framing -- compression + length + final var data = response.inputStream().readAllBytes(); + assertThat(new String(data, 5, data.length - 5, StandardCharsets.UTF_8)) + .isEqualTo("Hello, dude!"); + } } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -185,7 +229,7 @@ void contentTypeWithoutProtoDefaultsToProto() { // that same list as the metadata in the response. // // TESTS: - // - TBD + // - Not implemented /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -219,11 +263,28 @@ void contentTypeWithoutProtoDefaultsToProto() { // occurred. Compression contexts are NOT maintained over message boundaries, implementations must create a new // context for each message in the stream. If the Message-Encoding header is omitted then the Compressed-Flag must // be 0. - // - // TESTS: - // - TBD /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /** + * Verify that the server responds with grpc-accept-encoding and UNIMPLEMENTED if unsupported compression schemes + * are used. + */ + @ParameterizedTest + @ValueSource(strings = { "gzip", "deflate", "random" }) + void compressionNotSupported(final String grpcEncoding) { + try (var response = CLIENT.post() + .protocolId("h2") + .contentType(APPLICATION_GRPC_PROTO) + .path(SUBMIT_MESSAGE_PATH) + .header(HeaderNames.create("grpc-encoding"), grpcEncoding) + .submit(messageBytes(Transaction.DEFAULT))) { + + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().get(GrpcStatus.STATUS_NAME).values()).isEqualTo(GrpcStatus.UNIMPLEMENTED.values()); + assertThat(response.headers().get(HeaderNames.create("grpc-accept-encoding")).get()).isEqualTo("identity"); + } + } + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Unary Method Calls // @@ -247,8 +308,6 @@ void unaryCall() throws Exception { assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression (we didn't ask for it) final var responseLength = (int) rsd.readUnsignedInt(); - assertThat(responseLength).isPositive(); - final var responseData = new byte[responseLength]; rsd.readBytes(responseData); final var txr = TransactionResponse.PROTOBUF.parse(Bytes.wrap(responseData)); @@ -282,8 +341,15 @@ void unaryCall() throws Exception { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Utility methods /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - private byte[] messageBytes(Transaction tx) { - final var data = Transaction.PROTOBUF.toBytes(tx).toByteArray(); + private TransactionResponse decodeTransactionResponse(ReadableStreamingData rsd) throws ParseException { + assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression + final var responseLength = (int) rsd.readUnsignedInt(); + final var responseData = new byte[responseLength]; + rsd.readBytes(responseData); + return TransactionResponse.PROTOBUF.parse(Bytes.wrap(responseData)); + } + + private byte[] messageBytes(byte[] data) { final var out = new ByteArrayOutputStream(); final WritableStreamingData wsd = new WritableStreamingData(out); wsd.writeByte((byte) 0); @@ -292,40 +358,35 @@ private byte[] messageBytes(Transaction tx) { return out.toByteArray(); } + private byte[] messageBytes(Transaction tx) { + final var data = Transaction.PROTOBUF.toBytes(tx).toByteArray(); + return messageBytes(data); + } + private byte[] messageBytesJson(Transaction tx) { final var data = Transaction.JSON.toBytes(tx).toByteArray(); - final var out = new ByteArrayOutputStream(); - final WritableStreamingData wsd = new WritableStreamingData(out); - wsd.writeByte((byte) 0); - wsd.writeUnsignedInt(data.length); - wsd.writeBytes(data); - return out.toByteArray(); + return messageBytes(data); } private static final class ConsensusServiceImpl implements ConsensusService { @Override public TransactionResponse createTopic(Transaction tx) { - // TODO Test when one of these returns null!! - System.out.println("Creating topic"); - return TransactionResponse.DEFAULT; + throw new RuntimeException("Some kind of Runtime exception is thrown!"); } @Override public TransactionResponse updateTopic(Transaction tx) { - System.out.println("Updating topic"); - return TransactionResponse.DEFAULT; + return SUCCESSFUL_TRANSACTION_RESPONSE; } @Override public TransactionResponse deleteTopic(Transaction tx) { - System.out.println("Deleting topic"); - return TransactionResponse.DEFAULT; + return SUCCESSFUL_TRANSACTION_RESPONSE; } @Override public TransactionResponse submitMessage(Transaction tx) { - System.out.println("Submitting message"); - return TransactionResponse.DEFAULT; + return SUCCESSFUL_TRANSACTION_RESPONSE; } @Override @@ -334,4 +395,12 @@ public Response getTopicInfo(Query q) { return Response.DEFAULT; } } + + private static final class CustomServiceImpl implements TestService { + @Override + public String echo(String message) { + System.out.println("Echoing message: " + message); + return message; + } + } } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java new file mode 100644 index 00000000..ccf44741 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java @@ -0,0 +1,68 @@ +package pbj; + +import com.hedera.pbj.runtime.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public interface TestService extends ServiceInterface { + enum CustomMethod implements Method { + echoUnary, + echoBidi, + echoServerStream, + echoClientStream, + failUnary, + failBidi, + failServerStream, + failClientStream + } + + String echo(String message); + + @NonNull + default String serviceName() { + return "TestService"; + } + + @NonNull + default String fullName() { + return "proto.TestService"; + } + + @NonNull + default List methods() { + return Arrays.asList(CustomMethod.values()); + } + + @Override + default void open( + final @NonNull RequestOptions options, + final @NonNull Method method, + final @NonNull BlockingQueue messages, + final @NonNull ResponseCallback callback) { + + final var m = (CustomMethod) method; + Thread.ofVirtual().start(() -> { + try { + switch (m) { + case CustomMethod.echoUnary -> { + final var message = messages.take(); + final var ct = options.contentType(); + if (options.isJson() || options.isProtobuf() || !ct.equals("application/grpc+string")) { + throw new IllegalArgumentException("Only 'string' is allowed"); + } + + final var response = echo(message.asUtf8String()); + callback.send(Bytes.wrap(response)); + callback.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + callback.close(); + } + }); + } +} diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java index 5e4940ee..cf6b07f6 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java @@ -1,6 +1,7 @@ package com.hedera.pbj.runtime; import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -54,8 +55,9 @@ interface Method { } interface RequestOptions { - String APPLICATION_GRPC_PROTO = "proto"; - String APPLICATION_GRPC_JSON = "json"; + String APPLICATION_GRPC = "application/grpc"; + String APPLICATION_GRPC_PROTO = "application/grpc+proto"; + String APPLICATION_GRPC_JSON = "application/grpc+json"; boolean isProtobuf(); boolean isJson(); @@ -64,26 +66,19 @@ interface RequestOptions { /** * Through this interface the {@link ServiceInterface} implementation will send responses back to the client. - * The {@link #start()} method is called before any responses are sent, and the {@link #close()} method - * is called after all responses have been sent. + * The {@link #close()} method is called after all responses have been sent. * *

It is not common for an application to implement or use this interface. It is typically implemented by * a webserver to integrate PBJ into that server. */ interface ResponseCallback { - /** - * Called by the {@link ServiceInterface} implementation to before any responses have been sent to the client. - * This must be called before {@link #send(Bytes)} is called. - */ - void start(); - /** * Called to send a single response message to the client. For unary methods, this will be called once. For * server-side streaming or bidi-streaming, this may be called many times. * * @param response A response message to send to the client. */ - void send(/*@NonNull*/ Bytes response); + void send(@NonNull Bytes response); /** * Called to close the connection with the client, signaling that no more responses will be sent. @@ -92,11 +87,11 @@ interface ResponseCallback { } /** Gets the simple name of the service. For example, "HelloService". */ - /*@NonNull*/ String serviceName(); + @NonNull String serviceName(); /** Gets the full name of the service. For example, "example.HelloService". */ - /*@NonNull*/ String fullName(); + @NonNull String fullName(); /** Gets a list of each method in the service. This list may be empty but should never be null. */ - /*@NonNull*/ List methods(); + @NonNull List methods(); /** * Called by the webserver to open a new connection between the client and the service. This method may be called @@ -110,8 +105,8 @@ interface ResponseCallback { * @param callback A callback to send responses back to the client. */ void open( - /*@NonNull*/ RequestOptions opts, - /*@NonNull*/ Method method, - /*@NonNull*/ BlockingQueue messages, - /*@NonNull*/ ResponseCallback callback); + @NonNull RequestOptions opts, + @NonNull Method method, + @NonNull BlockingQueue messages, + @NonNull ResponseCallback callback); }