From 408514fd35081e14495d3f44969627d4e77ec8f5 Mon Sep 17 00:00:00 2001
From: Whitney Deng
Date: Thu, 17 Oct 2024 09:12:12 -0700
Subject: [PATCH] creating draft PR for refactoring JobStatus request. unfinished

---
Review notes (this section is ignored when the patch is applied):
* Fixed the misspelled identifier "jobStatusRequesqt" in JobRoutes.jobStatus.
* Restored generic type arguments (Optional/Map/List/Set) so the new classes do not rely on raw types.
* Still unfinished, as the subject says:
  - com.linkedin.venice.controller.server.endpoint.JobStatusRequest is imported but not added by this patch.
  - JobRoutesTest still calls the two-argument JobRoutes constructor and the old seven-argument
    populateJobStatus, and reads the private field JobRoutes.veniceControllerApiHandler.
  - VeniceControllerAccessControlService declares its own ResourceAclCheck but only uses
    AbstractRoute.ResourceAclCheck; one of the two should go.

 .../VeniceControllerAccessControlService.java | 200 ++++++++++++++++++
 .../VeniceControllerApiHandler.java           | 127 ++++++++++++
 .../controller/server/AbstractRoute.java      |   2 +-
 .../controller/server/AdminSparkServer.java   |  17 +-
 .../venice/controller/server/JobRoutes.java   |  62 ++----
 .../controller/server/JobRoutesTest.java      |   4 +-
 6 files changed, 365 insertions(+), 47 deletions(-)
 create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerAccessControlService.java
 create mode 100644 services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerApiHandler.java

diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerAccessControlService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerAccessControlService.java
new file mode 100644
index 0000000000..7996b7022b
--- /dev/null
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerAccessControlService.java
@@ -0,0 +1,200 @@
+package com.linkedin.venice.controller;
+
+import static com.linkedin.venice.HttpConstants.*;
+import static com.linkedin.venice.HttpConstants.HTTP_GET;
+import static com.linkedin.venice.VeniceConstants.*;
+import static com.linkedin.venice.VeniceConstants.CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.*;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;
+
+import com.linkedin.venice.acl.AclException;
+import com.linkedin.venice.acl.DynamicAccessController;
+import com.linkedin.venice.exceptions.VeniceException;
+import java.security.cert.X509Certificate;
+import java.util.Optional;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import spark.Request;
+
+
+/**
+ * Performs the SSL/ACL related checks for controller requests: client certificate extraction, store and topic
+ * ACL checks, and the admin allowlist check. When no {@link DynamicAccessController} is configured, ACL checks
+ * are skipped and access is granted.
+ */
+public class VeniceControllerAccessControlService {
+  private static final Logger LOGGER = LogManager.getLogger(VeniceControllerAccessControlService.class);
+
+  private static final String USER_UNKNOWN = "USER_UNKNOWN";
+  private static final String STORE_UNKNOWN = "STORE_UNKNOWN";
+
+  // A singleton of acl check function against store resource
+  private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck GET_ACCESS_TO_STORE =
+      (cert, resourceName, aclClient) -> aclClient.hasAccess(cert, resourceName, HTTP_GET);
+  // A singleton of acl check function against topic resource
+  private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck WRITE_ACCESS_TO_TOPIC =
+      (cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Write");
+
+  private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck READ_ACCESS_TO_TOPIC =
+      (cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Read");
+
+  private final boolean sslEnabled;
+  private final Optional<DynamicAccessController> accessController;
+
+  /**
+   * Default constructor for different controller request routes.
+   *
+   * TODO: once Venice Admin allowlist proposal is approved, we can transfer the allowlist to all routes
+   * through this constructor; make sure Nuage is also in the allowlist so that they can create stores
+   * @param accessController the access client that check whether a certificate can access a resource
+   */
+  public VeniceControllerAccessControlService(boolean sslEnabled, Optional<DynamicAccessController> accessController) {
+    this.sslEnabled = sslEnabled;
+    this.accessController = accessController;
+  }
+
+  /**
+   * Check whether the user certificate in request has access to the store specified in
+   * the request.
+   */
+  private boolean hasAccess(
+      Request request,
+      com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck aclCheckFunction) {
+    if (!isAclEnabled()) {
+      /**
+       * Grant access if it's not required to check ACL.
+       */
+      return true;
+    }
+    X509Certificate certificate = getCertificate(request);
+
+    String storeName = request.queryParams(NAME);
+    /**
+     * Currently Nuage only supports adding GET/POST methods for a store resource
+     * TODO: Feature request for Nuage to support other method like PUT or customized methods
+     * like WRITE, UPDATE, ADMIN etc.
+     */
+    try {
+      if (!aclCheckFunction.apply(certificate, storeName, accessController.get())) {
+        // log the abused users
+        LOGGER.warn(
+            "Client {} [host:{} IP:{}] doesn't have access to store {}",
+            certificate.getSubjectX500Principal().toString(),
+            request.host(),
+            request.ip(),
+            storeName);
+        return false;
+      }
+    } catch (AclException e) {
+      LOGGER.error(
+          "Error while parsing certificate from client {} [host:{} IP:{}]",
+          certificate.getSubjectX500Principal().toString(),
+          request.host(),
+          request.ip(),
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check whether the user has "Write" method access to the related version topics.
+   */
+  protected boolean hasWriteAccessToTopic(Request request) {
+    return hasAccess(request, WRITE_ACCESS_TO_TOPIC);
+  }
+
+  /**
+   * Check whether the user has "Read" method access to the related version topics.
+   */
+  protected boolean hasReadAccessToTopic(Request request) {
+    return hasAccess(request, READ_ACCESS_TO_TOPIC);
+  }
+
+  /**
+   * Get principal Id from request.
+   */
+  protected String getPrincipalId(Request request) {
+    if (!isSslEnabled()) {
+      LOGGER.warn("SSL is not enabled. No certificate could be extracted from request.");
+      return USER_UNKNOWN;
+    }
+    X509Certificate certificate = getCertificate(request);
+    if (isAclEnabled()) {
+      try {
+        return accessController.get().getPrincipalId(certificate);
+      } catch (Exception e) {
+        LOGGER.error("Error when retrieving principal Id from request", e);
+        return USER_UNKNOWN;
+      }
+    } else {
+      return certificate.getSubjectX500Principal().getName();
+    }
+  }
+
+  /**
+   * Check whether the user has "GET" method access to the related store resource.
+   *
+   * Notice: currently we don't have any controller request that necessarily requires "GET" ACL to store;
+   * ACL is not checked for requests that want to get metadata of a store/job.
+   */
+  protected boolean hasAccessToStore(Request request) {
+    return hasAccess(request, GET_ACCESS_TO_STORE);
+  }
+
+  /**
+   * Check whether the user is within the admin users allowlist.
+   */
+  protected boolean isAllowListUser(Request request) {
+    if (!isAclEnabled()) {
+      /**
+       * Grant access if it's not required to check ACL.
+       * {@link accessController} will be empty if ACL is not enabled.
+       */
+      return true;
+    }
+    X509Certificate certificate = getCertificate(request);
+
+    String storeName = request.queryParamOrDefault(NAME, STORE_UNKNOWN);
+    return accessController.get().isAllowlistUsers(certificate, storeName, HTTP_GET);
+  }
+
+  /**
+   * @return whether SSL is enabled
+   */
+  protected boolean isSslEnabled() {
+    return sslEnabled;
+  }
+
+  /**
+   * @return whether ACL check is enabled.
+   */
+  protected boolean isAclEnabled() {
+    /**
+     * {@link accessController} will be empty if ACL is not enabled.
+     */
+    return accessController.isPresent();
+  }
+
+  /**
+   * Helper function to get certificate out of Spark request
+   */
+  protected static X509Certificate getCertificate(Request request) {
+    HttpServletRequest rawRequest = request.raw();
+    Object certificateObject = rawRequest.getAttribute(CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME);
+    if (certificateObject == null) {
+      throw new VeniceException("Client request doesn't contain certificate for store: " + request.queryParams(NAME));
+    }
+    return ((X509Certificate[]) certificateObject)[0];
+  }
+
+  /**
+   * A function that would check whether a principal has access to a resource.
+   */
+  @FunctionalInterface
+  interface ResourceAclCheck {
+    boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
+        throws AclException;
+  }
+}
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerApiHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerApiHandler.java
new file mode 100644
index 0000000000..adc64d4a64
--- /dev/null
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerApiHandler.java
@@ -0,0 +1,127 @@
+package com.linkedin.venice.controller;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.venice.SSLConfig;
+import com.linkedin.venice.acl.DynamicAccessController;
+import com.linkedin.venice.controller.server.endpoint.JobStatusRequest;
+import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
+import com.linkedin.venice.controller.stats.SparkServerStats;
+import com.linkedin.venice.controllerapi.ControllerRoute;
+import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.utils.ObjectMapperFactory;
+import com.linkedin.venice.utils.VeniceProperties;
+import io.tehuti.metrics.MetricsRepository;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import spark.embeddedserver.EmbeddedServers;
+
+
+/**
+ * Handler for controller API requests. Draft: so far only the job status query, previously implemented in
+ * {@code JobRoutes#populateJobStatus}, has been moved here.
+ */
+public class VeniceControllerApiHandler {
+  private final int port;
+  private final Admin admin;
+  private final boolean enforceSSL;
+  private final boolean sslEnabled;
+  private final boolean checkReadMethodForKafka;
+  private final Optional<SSLConfig> sslConfig;
+  private final Optional<DynamicAccessController> accessController;
+
+  protected static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
+  private final Map<String, SparkServerStats> statsMap;
+  private final SparkServerStats nonclusterSpecificStats;
+
+  private static final String REQUEST_START_TIME = "startTime";
+  private static final String REQUEST_SUCCEED = "succeed";
+
+  private final List<ControllerRoute> disabledRoutes;
+
+  private final boolean disableParentRequestTopicForStreamPushes;
+  private final PubSubTopicRepository pubSubTopicRepository;
+
+  // Use this for access controls and other security related checks
+  private final VeniceControllerAccessControlService veniceControllerAccessControlService;
+
+  public VeniceControllerApiHandler(
+      int port,
+      Admin admin,
+      MetricsRepository metricsRepository,
+      Set<String> clusters,
+      boolean enforceSSL,
+      Optional<SSLConfig> sslConfig,
+      boolean checkReadMethodForKafka,
+      Optional<DynamicAccessController> accessController,
+      List<ControllerRoute> disabledRoutes,
+      VeniceProperties jettyConfigOverrides,
+      boolean disableParentRequestTopicForStreamPushes,
+      PubSubTopicRepository pubSubTopicRepository) {
+    this.port = port;
+    this.enforceSSL = enforceSSL;
+    this.sslEnabled = sslConfig.isPresent();
+    this.sslConfig = sslConfig;
+    this.checkReadMethodForKafka = checkReadMethodForKafka;
+    this.accessController = accessController;
+    // Note: admin is passed in as a reference. The expectation is the source of the admin will
+    // close it so we don't close it in stopInner()
+    this.admin = admin;
+    statsMap = new HashMap<>(clusters.size());
+    String statsPrefix = sslEnabled ? "secure_" : "";
+    for (String cluster: clusters) {
+      statsMap.put(
+          cluster,
+          new SparkServerStats(metricsRepository, cluster + "." + statsPrefix + "controller_spark_server"));
+    }
+    nonclusterSpecificStats = new SparkServerStats(metricsRepository, "." + statsPrefix + "controller_spark_server");
+    EmbeddedServers.add(EmbeddedServers.Identifiers.JETTY, new VeniceSparkServerFactory(jettyConfigOverrides));
+
+    this.disabledRoutes = disabledRoutes;
+    this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
+    this.pubSubTopicRepository = pubSubTopicRepository;
+    this.veniceControllerAccessControlService = new VeniceControllerAccessControlService(sslEnabled, accessController);
+  }
+
+  /**
+   * Looks up the offline push status for the version topic of the requested store version and copies it into a
+   * response.
+   *
+   * @param jobStatusRequest cluster, store, version number, incremental push version, region and targeted regions
+   * @return a {@link JobStatusQueryResponse} populated from {@link Admin.OfflinePushStatusInfo}
+   */
+  public JobStatusQueryResponse populateJobStatus(JobStatusRequest jobStatusRequest) {
+    JobStatusQueryResponse responseObject = new JobStatusQueryResponse();
+
+    String store = jobStatusRequest.getStore();
+    int versionNumber = jobStatusRequest.getVersionNumber();
+    String cluster = jobStatusRequest.getCluster();
+    String incrementalPushVersion = jobStatusRequest.getIncrementalPushVersion();
+    String region = jobStatusRequest.getRegion();
+    String targetedRegions = jobStatusRequest.getTargetedRegions();
+
+    String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber);
+    Admin.OfflinePushStatusInfo offlineJobStatus = admin.getOffLinePushStatus(
+        cluster,
+        kafkaTopicName,
+        Optional.ofNullable(incrementalPushVersion),
+        region,
+        targetedRegions);
+    responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString());
+    responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp());
+    responseObject.setStatusDetails(offlineJobStatus.getStatusDetails());
+    responseObject.setExtraInfo(offlineJobStatus.getExtraInfo());
+    responseObject.setExtraInfoUpdateTimestamp(offlineJobStatus.getExtraInfoUpdateTimestamp());
+    responseObject.setExtraDetails(offlineJobStatus.getExtraDetails());
+    responseObject.setUncompletedPartitions(offlineJobStatus.getUncompletedPartitions());
+
+    responseObject.setCluster(cluster);
+    responseObject.setName(store);
+    responseObject.setVersion(versionNumber);
+    return responseObject;
+  }
+}
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AbstractRoute.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AbstractRoute.java
index 9b30c4beed..12248ac9cc 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AbstractRoute.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AbstractRoute.java
@@ -183,7 +183,7 @@ protected static X509Certificate getCertificate(Request request) {
    * A function that would check whether a principal has access to a resource.
    */
   @FunctionalInterface
-  interface ResourceAclCheck {
+  public interface ResourceAclCheck {
     boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
         throws AclException;
   }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java
index aaa7e31ce5..b76d15c0f1 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java
@@ -110,6 +110,7 @@
 import com.linkedin.venice.acl.DynamicAccessController;
 import com.linkedin.venice.controller.Admin;
 import com.linkedin.venice.controller.AuditInfo;
+import com.linkedin.venice.controller.VeniceControllerApiHandler;
 import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
 import com.linkedin.venice.controller.stats.SparkServerStats;
 import com.linkedin.venice.controllerapi.ControllerRoute;
@@ -169,6 +170,7 @@ public class AdminSparkServer extends AbstractVeniceService {
 
   private final boolean disableParentRequestTopicForStreamPushes;
   private final PubSubTopicRepository pubSubTopicRepository;
+  private final VeniceControllerApiHandler veniceControllerApiHandler;
 
   public AdminSparkServer(
       int port,
@@ -206,6 +208,19 @@ public AdminSparkServer(
     this.disabledRoutes = disabledRoutes;
     this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
     this.pubSubTopicRepository = pubSubTopicRepository;
+    this.veniceControllerApiHandler = new VeniceControllerApiHandler(
+        port,
+        admin,
+        metricsRepository,
+        clusters,
+        enforceSSL,
+        sslConfig,
+        checkReadMethodForKafka,
+        accessController,
+        disabledRoutes,
+        jettyConfigOverrides,
+        disableParentRequestTopicForStreamPushes,
+        pubSubTopicRepository);
   }
 
   @Override
@@ -280,7 +295,7 @@ public boolean startInner() throws Exception {
     // Build all different routes
     ControllerRoutes controllerRoutes = new ControllerRoutes(sslEnabled, accessController, pubSubTopicRepository);
     StoresRoutes storesRoutes = new StoresRoutes(sslEnabled, accessController, pubSubTopicRepository);
-    JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController);
+    JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController, veniceControllerApiHandler);
     SkipAdminRoute skipAdminRoute = new SkipAdminRoute(sslEnabled, accessController);
     CreateVersion createVersion = new CreateVersion(
         sslEnabled,
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java
index 31f55e5987..40083787f9 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/JobRoutes.java
@@ -15,6 +15,8 @@
 import com.linkedin.venice.HttpConstants;
 import com.linkedin.venice.acl.DynamicAccessController;
 import com.linkedin.venice.controller.Admin;
+import com.linkedin.venice.controller.VeniceControllerApiHandler;
+import com.linkedin.venice.controller.server.endpoint.JobStatusRequest;
 import com.linkedin.venice.controllerapi.ControllerResponse;
 import com.linkedin.venice.controllerapi.IncrementalPushVersionsResponse;
 import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
@@ -38,9 +40,14 @@ public class JobRoutes extends AbstractRoute {
   private static final Logger LOGGER = LogManager.getLogger(JobRoutes.class);
   private final InternalAvroSpecificSerializer<PushJobDetails> pushJobDetailsSerializer =
       AvroProtocolDefinition.PUSH_JOB_DETAILS.getSerializer();
+  private final VeniceControllerApiHandler veniceControllerApiHandler;
 
-  public JobRoutes(boolean sslEnabled, Optional<DynamicAccessController> accessController) {
+  public JobRoutes(
+      boolean sslEnabled,
+      Optional<DynamicAccessController> accessController,
+      VeniceControllerApiHandler veniceControllerApiHandler) {
     super(sslEnabled, accessController);
+    this.veniceControllerApiHandler = veniceControllerApiHandler;
   }
 
   /**
@@ -53,20 +60,17 @@ public Route jobStatus(Admin admin) {
       try {
         // No ACL check for getting job metadata
         AdminSparkServer.validateParams(request, JOB.getParams(), admin);
-        String cluster = request.queryParams(CLUSTER);
-        String store = request.queryParams(NAME);
-        int versionNumber = Utils.parseIntFromString(request.queryParams(VERSION), VERSION);
-        String incrementalPushVersion = AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION);
-        String targetedRegions = request.queryParams(TARGETED_REGIONS);
-        String region = AdminSparkServer.getOptionalParameterValue(request, FABRIC);
-        responseObject = populateJobStatus(
-            cluster,
-            store,
-            versionNumber,
-            admin,
-            Optional.ofNullable(incrementalPushVersion),
-            region,
-            targetedRegions);
+
+        JobStatusRequest jobStatusRequest = new JobStatusRequest();
+        jobStatusRequest.setCluster(request.queryParams(CLUSTER));
+        jobStatusRequest.setStore(request.queryParams(NAME));
+        jobStatusRequest.setVersionNumber(Utils.parseIntFromString(request.queryParams(VERSION), VERSION));
+        jobStatusRequest
+            .setIncrementalPushVersion(AdminSparkServer.getOptionalParameterValue(request, INCREMENTAL_PUSH_VERSION));
+        jobStatusRequest.setTargetedRegions(request.queryParams(TARGETED_REGIONS));
+        jobStatusRequest.setRegion(AdminSparkServer.getOptionalParameterValue(request, FABRIC));
+
+        responseObject = veniceControllerApiHandler.populateJobStatus(jobStatusRequest);
       } catch (Throwable e) {
         responseObject.setError(e);
         AdminSparkServer.handleError(e, request, response);
@@ -75,34 +79,6 @@ public Route jobStatus(Admin admin) {
     };
   }
 
-  JobStatusQueryResponse populateJobStatus(
-      String cluster,
-      String store,
-      int versionNumber,
-      Admin admin,
-      Optional<String> incrementalPushVersion,
-      String region,
-      String targetedRegions) {
-    JobStatusQueryResponse responseObject = new JobStatusQueryResponse();
-
-    String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber);
-
-    Admin.OfflinePushStatusInfo offlineJobStatus =
-        admin.getOffLinePushStatus(cluster, kafkaTopicName, incrementalPushVersion, region, targetedRegions);
-    responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString());
-    responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp());
-    responseObject.setStatusDetails(offlineJobStatus.getStatusDetails());
-    responseObject.setExtraInfo(offlineJobStatus.getExtraInfo());
-    responseObject.setExtraInfoUpdateTimestamp(offlineJobStatus.getExtraInfoUpdateTimestamp());
-    responseObject.setExtraDetails(offlineJobStatus.getExtraDetails());
-    responseObject.setUncompletedPartitions(offlineJobStatus.getUncompletedPartitions());
-
-    responseObject.setCluster(cluster);
-    responseObject.setName(store);
-    responseObject.setVersion(versionNumber);
-    return responseObject;
-  }
-
   /**
    * @see Admin#killOfflinePush(String, String, boolean)
    */
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java
index f3ce901a72..6c03c5dc0d 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/JobRoutesTest.java
@@ -34,8 +34,8 @@ public void testPopulateJobStatus() {
     String store = Utils.getUniqueString("store");
     int version = 5;
     JobRoutes jobRoutes = new JobRoutes(false, Optional.empty());
-    JobStatusQueryResponse response =
-        jobRoutes.populateJobStatus(cluster, store, version, mockAdmin, Optional.empty(), null, null);
+    JobStatusQueryResponse response = jobRoutes.veniceControllerApiHandler
+        .populateJobStatus(cluster, store, version, mockAdmin, Optional.empty(), null, null);
 
     Map<String, String> extraInfo = response.getExtraInfo();
     LOGGER.info("extraInfo: {}", extraInfo);