Skip to content

Commit

Permalink
creating draft PR for refactoring JobStatus request. unfinished
Browse files Browse the repository at this point in the history
  • Loading branch information
Whitney Deng committed Oct 17, 2024
1 parent d29340a commit 408514f
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.HttpConstants.*;
import static com.linkedin.venice.HttpConstants.HTTP_GET;
import static com.linkedin.venice.VeniceConstants.*;
import static com.linkedin.venice.VeniceConstants.CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.*;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;

import com.linkedin.venice.acl.AclException;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.exceptions.VeniceException;
import java.security.cert.X509Certificate;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import spark.Request;


public class VeniceControllerAccessControlService {
private static final Logger LOGGER = LogManager.getLogger(VeniceControllerAccessControlService.class);

private static final String USER_UNKNOWN = "USER_UNKNOWN";
private static final String STORE_UNKNOWN = "STORE_UNKNOWN";

// A singleton of acl check function against store resource
private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck GET_ACCESS_TO_STORE =
(cert, resourceName, aclClient) -> aclClient.hasAccess(cert, resourceName, HTTP_GET);
// A singleton of acl check function against topic resource
private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck WRITE_ACCESS_TO_TOPIC =
(cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Write");

private static final com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck READ_ACCESS_TO_TOPIC =
(cert, resourceName, aclClient) -> aclClient.hasAccessToTopic(cert, resourceName, "Read");

private final boolean sslEnabled;
private final Optional<DynamicAccessController> accessController;

/**
* Default constructor for different controller request routes.
*
* TODO: once Venice Admin allowlist proposal is approved, we can transfer the allowlist to all routes
* through this constructor; make sure Nuage is also in the allowlist so that they can create stores
* @param accessController the access client that check whether a certificate can access a resource
*/
public VeniceControllerAccessControlService(boolean sslEnabled, Optional<DynamicAccessController> accessController) {
this.sslEnabled = sslEnabled;
this.accessController = accessController;
}

/**
* Check whether the user certificate in request has access to the store specified in
* the request.
*/
private boolean hasAccess(
Request request,
com.linkedin.venice.controller.server.AbstractRoute.ResourceAclCheck aclCheckFunction) {
if (!isAclEnabled()) {
/**
* Grant access if it's not required to check ACL.
*/
return true;
}
X509Certificate certificate = getCertificate(request);

String storeName = request.queryParams(NAME);
/**
* Currently Nuage only supports adding GET/POST methods for a store resource
* TODO: Feature request for Nuage to support other method like PUT or customized methods
* like WRITE, UPDATE, ADMIN etc.
*/
try {
if (!aclCheckFunction.apply(certificate, storeName, accessController.get())) {
// log the abused users
LOGGER.warn(
"Client {} [host:{} IP:{}] doesn't have access to store {}",
certificate.getSubjectX500Principal().toString(),
request.host(),
request.ip(),
storeName);
return false;
}
} catch (AclException e) {
LOGGER.error(
"Error while parsing certificate from client {} [host:{} IP:{}]",
certificate.getSubjectX500Principal().toString(),
request.host(),
request.ip(),
e);
return false;
}
return true;
}

/**
* Check whether the user has "Write" method access to the related version topics.
*/
protected boolean hasWriteAccessToTopic(Request request) {
return hasAccess(request, WRITE_ACCESS_TO_TOPIC);
}

/**
* Check whether the user has "Read" method access to the related version topics.
*/
protected boolean hasReadAccessToTopic(Request request) {
return hasAccess(request, READ_ACCESS_TO_TOPIC);
}

/**
* Get principal Id from request.
*/
protected String getPrincipalId(Request request) {
if (!isSslEnabled()) {
LOGGER.warn("SSL is not enabled. No certificate could be extracted from request.");
return USER_UNKNOWN;
}
X509Certificate certificate = getCertificate(request);
if (isAclEnabled()) {
try {
return accessController.get().getPrincipalId(certificate);
} catch (Exception e) {
LOGGER.error("Error when retrieving principal Id from request", e);
return USER_UNKNOWN;
}
} else {
return certificate.getSubjectX500Principal().getName();
}
}

/**
* Check whether the user has "GET" method access to the related store resource.
*
* Notice: currently we don't have any controller request that necessarily requires "GET" ACL to store;
* ACL is not checked for requests that want to get metadata of a store/job.
*/
protected boolean hasAccessToStore(Request request) {
return hasAccess(request, GET_ACCESS_TO_STORE);
}

/**
* Check whether the user is within the admin users allowlist.
*/
protected boolean isAllowListUser(Request request) {
if (!isAclEnabled()) {
/**
* Grant access if it's not required to check ACL.
* {@link accessController} will be empty if ACL is not enabled.
*/
return true;
}
X509Certificate certificate = getCertificate(request);

String storeName = request.queryParamOrDefault(NAME, STORE_UNKNOWN);
return accessController.get().isAllowlistUsers(certificate, storeName, HTTP_GET);
}

/**
* @return whether SSL is enabled
*/
protected boolean isSslEnabled() {
return sslEnabled;
}

/**
* @return whether ACL check is enabled.
*/
protected boolean isAclEnabled() {
/**
* {@link accessController} will be empty if ACL is not enabled.
*/
return accessController.isPresent();
}

/**
* Helper function to get certificate out of Spark request
*/
protected static X509Certificate getCertificate(Request request) {
HttpServletRequest rawRequest = request.raw();
Object certificateObject = rawRequest.getAttribute(CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME);
if (certificateObject == null) {
throw new VeniceException("Client request doesn't contain certificate for store: " + request.queryParams(NAME));
}
return ((X509Certificate[]) certificateObject)[0];
}

/**
* A function that would check whether a principal has access to a resource.
*/
@FunctionalInterface
interface ResourceAclCheck {
boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
throws AclException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.linkedin.venice.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.server.endpoint.JobStatusRequest;
import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
import com.linkedin.venice.controller.stats.SparkServerStats;
import com.linkedin.venice.controllerapi.ControllerRoute;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import spark.embeddedserver.EmbeddedServers;


public class VeniceControllerApiHandler {
private final int port;
private final Admin admin;
private final boolean enforceSSL;
private final boolean sslEnabled;
private final boolean checkReadMethodForKafka;
private final Optional<SSLConfig> sslConfig;
private final Optional<DynamicAccessController> accessController;

protected static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
final private Map<String, SparkServerStats> statsMap;
final private SparkServerStats nonclusterSpecificStats;

private static String REQUEST_START_TIME = "startTime";
private static String REQUEST_SUCCEED = "succeed";

private final List<ControllerRoute> disabledRoutes;

private final boolean disableParentRequestTopicForStreamPushes;
private final PubSubTopicRepository pubSubTopicRepository;

// Use this for access controls and other security related checks
VeniceControllerAccessControlService veniceControllerAccessControlService;

public VeniceControllerApiHandler(
int port,
Admin admin,
MetricsRepository metricsRepository,
Set<String> clusters,
boolean enforceSSL,
Optional<SSLConfig> sslConfig,
boolean checkReadMethodForKafka,
Optional<DynamicAccessController> accessController,
List<ControllerRoute> disabledRoutes,
VeniceProperties jettyConfigOverrides,
boolean disableParentRequestTopicForStreamPushes,
PubSubTopicRepository pubSubTopicRepository) {
this.port = port;
this.enforceSSL = enforceSSL;
this.sslEnabled = sslConfig.isPresent();
this.sslConfig = sslConfig;
this.checkReadMethodForKafka = checkReadMethodForKafka;
this.accessController = accessController;
// Note: admin is passed in as a reference. The expectation is the source of the admin will
// close it so we don't close it in stopInner()
this.admin = admin;
statsMap = new HashMap<>(clusters.size());
String statsPrefix = sslEnabled ? "secure_" : "";
for (String cluster: clusters) {
statsMap.put(
cluster,
new SparkServerStats(metricsRepository, cluster + "." + statsPrefix + "controller_spark_server"));
}
nonclusterSpecificStats = new SparkServerStats(metricsRepository, "." + statsPrefix + "controller_spark_server");
EmbeddedServers.add(EmbeddedServers.Identifiers.JETTY, new VeniceSparkServerFactory(jettyConfigOverrides));

this.disabledRoutes = disabledRoutes;
this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
this.pubSubTopicRepository = pubSubTopicRepository;
this.veniceControllerAccessControlService = new VeniceControllerAccessControlService(sslEnabled, accessController);
}

public JobStatusQueryResponse populateJobStatus(JobStatusRequest jobStatusRequest) {
JobStatusQueryResponse responseObject = new JobStatusQueryResponse();

String store = jobStatusRequest.getStore();
int versionNumber = jobStatusRequest.getVersionNumber();
String cluster = jobStatusRequest.getCluster();
String incrementalPushVersion = jobStatusRequest.getIncrementalPushVersion();
String region = jobStatusRequest.getRegion();
String targetedRegions = jobStatusRequest.getTargetedRegions();

String kafkaTopicName = Version.composeKafkaTopic(store, versionNumber);
Admin.OfflinePushStatusInfo offlineJobStatus = admin.getOffLinePushStatus(
cluster,
kafkaTopicName,
Optional.ofNullable(incrementalPushVersion),
region,
targetedRegions);
responseObject.setStatus(offlineJobStatus.getExecutionStatus().toString());
responseObject.setStatusUpdateTimestamp(offlineJobStatus.getStatusUpdateTimestamp());
responseObject.setStatusDetails(offlineJobStatus.getStatusDetails());
responseObject.setExtraInfo(offlineJobStatus.getExtraInfo());
responseObject.setExtraInfoUpdateTimestamp(offlineJobStatus.getExtraInfoUpdateTimestamp());
responseObject.setExtraDetails(offlineJobStatus.getExtraDetails());
responseObject.setUncompletedPartitions(offlineJobStatus.getUncompletedPartitions());

responseObject.setCluster(cluster);
responseObject.setName(store);
responseObject.setVersion(versionNumber);
return responseObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected static X509Certificate getCertificate(Request request) {
* A function that would check whether a principal has access to a resource.
*/
@FunctionalInterface
interface ResourceAclCheck {
public interface ResourceAclCheck {
boolean apply(X509Certificate clientCert, String resource, DynamicAccessController accessController)
throws AclException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.AuditInfo;
import com.linkedin.venice.controller.VeniceControllerApiHandler;
import com.linkedin.venice.controller.spark.VeniceSparkServerFactory;
import com.linkedin.venice.controller.stats.SparkServerStats;
import com.linkedin.venice.controllerapi.ControllerRoute;
Expand Down Expand Up @@ -169,6 +170,7 @@ public class AdminSparkServer extends AbstractVeniceService {

private final boolean disableParentRequestTopicForStreamPushes;
private final PubSubTopicRepository pubSubTopicRepository;
private final VeniceControllerApiHandler veniceControllerApiHandler;

public AdminSparkServer(
int port,
Expand Down Expand Up @@ -206,6 +208,19 @@ public AdminSparkServer(
this.disabledRoutes = disabledRoutes;
this.disableParentRequestTopicForStreamPushes = disableParentRequestTopicForStreamPushes;
this.pubSubTopicRepository = pubSubTopicRepository;
this.veniceControllerApiHandler = new VeniceControllerApiHandler(
port,
admin,
metricsRepository,
clusters,
enforceSSL,
sslConfig,
checkReadMethodForKafka,
accessController,
disabledRoutes,
jettyConfigOverrides,
disableParentRequestTopicForStreamPushes,
pubSubTopicRepository);
}

@Override
Expand Down Expand Up @@ -280,7 +295,7 @@ public boolean startInner() throws Exception {
// Build all different routes
ControllerRoutes controllerRoutes = new ControllerRoutes(sslEnabled, accessController, pubSubTopicRepository);
StoresRoutes storesRoutes = new StoresRoutes(sslEnabled, accessController, pubSubTopicRepository);
JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController);
JobRoutes jobRoutes = new JobRoutes(sslEnabled, accessController, veniceControllerApiHandler);
SkipAdminRoute skipAdminRoute = new SkipAdminRoute(sslEnabled, accessController);
CreateVersion createVersion = new CreateVersion(
sslEnabled,
Expand Down
Loading

0 comments on commit 408514f

Please sign in to comment.