Skip to content

Commit

Permalink
Refactor BlockStreamService to extract BlockAccessService
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Oct 18, 2024
1 parent bee9d6d commit 5b096ec
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.server;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.grpc.BlockStreamService;
import com.hedera.block.server.metrics.MetricsService;
import com.swirlds.config.api.Configuration;
import dagger.Binds;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ private Constants() {}
*/
@NonNull public static final String LOGGING_PROPERTIES = "logging.properties";

/** Constant mapped to the name of the service in the .proto file */
/** Constant mapped to the name of the BlockStream service in the .proto file */
@NonNull public static final String SERVICE_NAME_BLOCK_STREAM = "BlockStreamService";

/** Constant mapped to the name of the BlockAccess service in the .proto file */
@NonNull public static final String SERVICE_NAME_BLOCK_ACCESS = "BlockAccessService";

/** Constant mapped to the publishBlockStream service method name in the .proto file */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.grpc;

import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME;
import static com.hedera.block.server.Translator.fromPbj;
import static com.hedera.block.server.Translator.toPbj;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksNotFound;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksRetrieved;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;

import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.hedera.block.server.Constants;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SingleBlockRequest;
import com.hedera.hapi.block.SingleBlockResponseCode;
import com.hedera.hapi.block.protoc.BlockService;
import com.hedera.hapi.block.protoc.SingleBlockResponse;
import com.hedera.hapi.block.stream.Block;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;
import java.io.IOException;
import java.util.Optional;
import javax.inject.Inject;

/**
* The BlockAccessService class provides a gRPC service to access blocks.
*
* <p>This service provides a unary gRPC method to retrieve a single block by block number.
*/
public class BlockAccessService implements GrpcService {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final ServiceStatus serviceStatus;
private final BlockReader<Block> blockReader;
private final MetricsService metricsService;

/**
* Constructs a new BlockAccessService instance with the given dependencies.
*
* @param serviceStatus used to query the service status
* @param blockReader used to retrieve blocks
* @param metricsService used to observe metrics
*/
@Inject
public BlockAccessService(
@NonNull ServiceStatus serviceStatus,
@NonNull BlockReader<Block> blockReader,
@NonNull MetricsService metricsService) {
this.serviceStatus = serviceStatus;
this.blockReader = blockReader;
this.metricsService = metricsService;
}

@Override
public Descriptors.FileDescriptor proto() {
return BlockService.getDescriptor();

Check warning on line 77 in server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java#L77

Added line #L77 was not covered by tests
}

@Override
public String serviceName() {
return Constants.SERVICE_NAME_BLOCK_ACCESS;

Check warning on line 82 in server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/com/hedera/block/server/grpc/BlockAccessService.java#L82

Added line #L82 was not covered by tests
}

@Override
public void update(Routing routing) {
routing.unary(SINGLE_BLOCK_METHOD_NAME, this::protocSingleBlock);
}

void protocSingleBlock(
@NonNull final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest,
@NonNull final StreamObserver<SingleBlockResponse> singleBlockResponseStreamObserver) {
LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method");

try {
final SingleBlockRequest pbjSingleBlockRequest =
toPbj(SingleBlockRequest.PROTOBUF, singleBlockRequest.toByteArray());
singleBlock(pbjSingleBlockRequest, singleBlockResponseStreamObserver);
} catch (ParseException e) {
LOGGER.log(ERROR, "Error parsing protoc SingleBlockRequest: {0}", singleBlockRequest);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}
}

private void singleBlock(
@NonNull final SingleBlockRequest singleBlockRequest,
@NonNull
final StreamObserver<com.hedera.hapi.block.protoc.SingleBlockResponse>
singleBlockResponseStreamObserver) {

LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method");

if (serviceStatus.isRunning()) {
final long blockNumber = singleBlockRequest.blockNumber();
try {
final Optional<Block> blockOpt = blockReader.read(blockNumber);
if (blockOpt.isPresent()) {
LOGGER.log(DEBUG, "Successfully returning block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(
fromPbjSingleBlockSuccessResponse(blockOpt.get()));

metricsService.get(SingleBlocksRetrieved).increment();
} else {
LOGGER.log(DEBUG, "Block number {0} not found", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse());
metricsService.get(SingleBlocksNotFound).increment();
}
} catch (IOException e) {
LOGGER.log(ERROR, "Error reading block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
} catch (ParseException e) {
LOGGER.log(ERROR, "Error parsing block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}
} else {
LOGGER.log(ERROR, "Unary singleBlock gRPC method is not currently running");
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}

// Send the response
singleBlockResponseStreamObserver.onCompleted();
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvailableResponse() {
final com.hedera.hapi.block.SingleBlockResponse response =
com.hedera.hapi.block.SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE)
.build();

return fromPbj(response);
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotFoundResponse()
throws InvalidProtocolBufferException {
final com.hedera.hapi.block.SingleBlockResponse response =
com.hedera.hapi.block.SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_FOUND)
.build();

return fromPbj(response);
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse fromPbjSingleBlockSuccessResponse(
@NonNull final Block block) {
final com.hedera.hapi.block.SingleBlockResponse singleBlockResponse =
com.hedera.hapi.block.SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_SUCCESS)
.block(block)
.build();

return fromPbj(singleBlockResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,17 @@
* limitations under the License.
*/

package com.hedera.block.server;
package com.hedera.block.server.grpc;

import static com.hedera.block.server.Constants.CLIENT_STREAMING_METHOD_NAME;
import static com.hedera.block.server.Constants.SERVER_STREAMING_METHOD_NAME;
import static com.hedera.block.server.Constants.SERVICE_NAME_BLOCK_STREAM;
import static com.hedera.block.server.Constants.SINGLE_BLOCK_METHOD_NAME;
import static com.hedera.block.server.Translator.fromPbj;
import static com.hedera.block.server.Translator.toPbj;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksNotFound;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SingleBlocksRetrieved;
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;

import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.events.BlockNodeEventHandler;
Expand All @@ -40,26 +35,19 @@
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.SingleBlockRequest;
import com.hedera.hapi.block.SingleBlockResponse;
import com.hedera.hapi.block.SingleBlockResponseCode;
import com.hedera.hapi.block.SubscribeStreamResponse;
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.protoc.BlockService;
import com.hedera.hapi.block.stream.Block;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;
import java.io.IOException;
import java.time.Clock;
import java.util.Optional;
import javax.inject.Inject;

/**
* The BlockStreamService class defines the gRPC service for the block stream service. It provides
* the implementation for the bidirectional streaming, server streaming, and unary methods defined
* in the proto file.
* the implementation for the bidirectional streaming, server streaming as defined in the proto file.
*/
public class BlockStreamService implements GrpcService {

Expand Down Expand Up @@ -140,7 +128,6 @@ public String serviceName() {
public void update(@NonNull final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::protocPublishBlockStream);
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::protocSubscribeBlockStream);
routing.unary(SINGLE_BLOCK_METHOD_NAME, this::protocSingleBlock);
}

StreamObserver<com.hedera.hapi.block.protoc.PublishStreamRequest> protocPublishBlockStream(
Expand Down Expand Up @@ -198,62 +185,6 @@ void protocSubscribeBlockStream(
}
}

void protocSingleBlock(
@NonNull final com.hedera.hapi.block.protoc.SingleBlockRequest singleBlockRequest,
@NonNull
final StreamObserver<com.hedera.hapi.block.protoc.SingleBlockResponse>
singleBlockResponseStreamObserver) {
LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method");

try {
final SingleBlockRequest pbjSingleBlockRequest =
toPbj(SingleBlockRequest.PROTOBUF, singleBlockRequest.toByteArray());
singleBlock(pbjSingleBlockRequest, singleBlockResponseStreamObserver);
} catch (ParseException e) {
LOGGER.log(ERROR, "Error parsing protoc SingleBlockRequest: {0}", singleBlockRequest);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}
}

private void singleBlock(
@NonNull final SingleBlockRequest singleBlockRequest,
@NonNull
final StreamObserver<com.hedera.hapi.block.protoc.SingleBlockResponse>
singleBlockResponseStreamObserver) {

LOGGER.log(DEBUG, "Executing Unary singleBlock gRPC method");

if (serviceStatus.isRunning()) {
final long blockNumber = singleBlockRequest.blockNumber();
try {
final Optional<Block> blockOpt = blockReader.read(blockNumber);
if (blockOpt.isPresent()) {
LOGGER.log(DEBUG, "Successfully returning block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(
fromPbjSingleBlockSuccessResponse(blockOpt.get()));

metricsService.get(SingleBlocksRetrieved).increment();
} else {
LOGGER.log(DEBUG, "Block number {0} not found", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotFoundResponse());
metricsService.get(SingleBlocksNotFound).increment();
}
} catch (IOException e) {
LOGGER.log(ERROR, "Error reading block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
} catch (ParseException e) {
LOGGER.log(ERROR, "Error parsing block number: {0}", blockNumber);
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}
} else {
LOGGER.log(ERROR, "Unary singleBlock gRPC method is not currently running");
singleBlockResponseStreamObserver.onNext(buildSingleBlockNotAvailableResponse());
}

// Send the response
singleBlockResponseStreamObserver.onCompleted();
}

// TODO: Fix this error type once it's been standardized in `hedera-protobufs`
// this should not be success
@NonNull
Expand All @@ -266,37 +197,4 @@ private void singleBlock(

return fromPbj(response);
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotAvailableResponse() {
final SingleBlockResponse response =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_AVAILABLE)
.build();

return fromPbj(response);
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse buildSingleBlockNotFoundResponse()
throws InvalidProtocolBufferException {
final SingleBlockResponse response =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_NOT_FOUND)
.build();

return fromPbj(response);
}

@NonNull
static com.hedera.hapi.block.protoc.SingleBlockResponse fromPbjSingleBlockSuccessResponse(
@NonNull final Block block) {
final SingleBlockResponse singleBlockResponse =
SingleBlockResponse.newBuilder()
.status(SingleBlockResponseCode.READ_BLOCK_SUCCESS)
.block(block)
.build();

return fromPbj(singleBlockResponse);
}
}
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
exports com.hedera.block.server.persistence;
exports com.hedera.block.server.notifier;
exports com.hedera.block.server.service;
exports com.hedera.block.server.grpc;

requires com.hedera.block.stream;
requires com.google.protobuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.hedera.block.server.grpc.BlockStreamService;
import com.hedera.block.server.health.HealthService;
import com.hedera.block.server.service.ServiceStatus;
import io.helidon.webserver.WebServer;
Expand Down
Loading

0 comments on commit 5b096ec

Please sign in to comment.