Skip to content

Commit

Permalink
added implementation of peridic refresh for source control metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed May 6, 2024
1 parent b3b2cc1 commit 382f9d1
Show file tree
Hide file tree
Showing 22 changed files with 1,226 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -214,6 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -258,6 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -315,6 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.store.DefaultOwnerStore;
import org.apache.twill.filesystem.LocationFactory;

Expand Down Expand Up @@ -185,6 +186,10 @@ protected void configure() {
bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class);
expose(MetadataAdmin.class);

//TODO(adrika): cleanup this dependency later
// This is needed because there is a transitive dependency on source control operation
// runner which is not actually being used here
install(new SourceControlModule());
bindPreviewRunner(binder());
expose(PreviewRunner.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

/**
* Represents a response containing a list of source control metadata records, next page token and
* last refresh time.
*/

public class ListSourceControlMetadataResponse {

private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;

/**
* Constructs a ListSourceControlMetadataResponse object.
*
* @param apps The list of source control metadata records.
* @param nextPageToken The token for fetching the next page of results.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
return nextPageToken;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;

/**
* Represents a response containing a single source control metadata record and last refresh time.
*/
public class SingleSourceControlMetadataResponse {

private final SourceControlMetadataRecord app;
private final Long lastRefreshTime;

/**
* Constructs a SingleSourceControlMetadataResponse object.
*
* @param app The source control metadata record.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public SingleSourceControlMetadataResponse(SourceControlMetadataRecord app,
Long lastRefreshTime) {
this.app = app;
this.lastRefreshTime = lastRefreshTime;
}

public SourceControlMetadataRecord getApp() {
return app;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,6 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request,
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.app.store.ApplicationFilter;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanApplicationsRequest;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.app.store.SingleSourceControlMetadataResponse;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.ArtifactAlreadyExistsException;
import io.cdap.cdap.common.BadRequestException;
Expand Down Expand Up @@ -129,7 +131,6 @@ public class AppLifecycleHttpHandler extends AbstractAppLifecycleHttpHandler {
* Key in json paginated applications list response.
*/
public static final String APP_LIST_PAGINATED_KEY = "applications";
public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps";

/**
* Runtime program service for running and managing programs.
Expand Down Expand Up @@ -319,27 +320,29 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe
@QueryParam("filter") String filter
) throws Exception {
validateNamespace(namespaceId);

JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
jsonListResponder.send(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null : record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
apps.add(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand All @@ -357,10 +360,11 @@ public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder
@PathParam("namespace-id") final String namespaceId,
@PathParam("app-id") final String appName) throws Exception {
validateApplicationId(namespaceId, appName);

responder.sendJson(HttpResponseStatus.OK,
GSON.toJson(applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName))));
SourceControlMetadataRecord app = applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName));
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
SingleSourceControlMetadataResponse response = new SingleSourceControlMetadataResponse(app, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

private ScanApplicationsRequest getScanRequest(String namespaceId, String artifactVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
Expand Down Expand Up @@ -59,6 +59,8 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -156,29 +158,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (NotFoundException e) {
responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService) {
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
}

/**
Expand Down Expand Up @@ -199,7 +203,8 @@ protected void startUp() throws Exception {
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start(),
sourceControlMetadataMigrationService.start()
sourceControlMetadataMigrationService.start(),
sourceControlMetadataRefreshService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -262,6 +267,7 @@ protected void shutDown() throws Exception {
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
sourceControlMetadataMigrationService.stopAndWait();
sourceControlMetadataRefreshService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Loading

0 comments on commit 382f9d1

Please sign in to comment.