Skip to content

Commit

Permalink
[GOBBLIN-2082] Change setpermissioncommitstep to pre-create folders b…
Browse files Browse the repository at this point in the history
…efore commit (#3966)

Change setpermissioncommitstep to pre-create folders so that recursive rename wont run into race conditions
  • Loading branch information
Will-Lo authored Jun 10, 2024
1 parent d8d8e58 commit 8c3f326
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -51,7 +50,7 @@
/**
* A dataset that based on Manifest. We expect the Manifest contains the list of all the files for this dataset.
* At first phase, we only support copy across different clusters to the same location. (We can add more feature to support rename in the future)
* We will delete the file on target if it's listed in the manifest and not exist on source when {@link ManifestBasedDataset.DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
* We will delete the file on target if it's listed in the manifest and not exist on source when {@link ManifestBasedDataset#DELETE_FILE_NOT_EXIST_ON_SOURCE} set to be true
*/
@Slf4j
public class ManifestBasedDataset implements IterableCopyableDataset {
Expand Down Expand Up @@ -162,9 +161,9 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
}
}
Properties props = new Properties();
props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep setPermissionCommitStep = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), setPermissionCommitStep, 1));
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep setPermissionCommitStep = new CreateAndSetDirectoryPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), setPermissionCommitStep, 1));

if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;


Expand Down Expand Up @@ -177,9 +176,9 @@ protected Collection<? extends CopyEntity> getCopyableFilesImpl(CopyConfiguratio

if (this.useNewPreserveLogic) {
Properties props = new Properties();
props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep step = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), step, 1));
props.setProperty(CreateAndSetDirectoryPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep step = new CreateAndSetDirectoryPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
}

return copyEntities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import org.apache.gobblin.data.management.copy.OwnerAndPermission;

/**
* An implementation of {@link CommitStep} for setting any file permissions.
* An implementation of {@link CommitStep} for creating directories and their associated permissions before commit
* Necessary when creating large file paths e.g. Manifest distcp where multiple threads are creating directories at the same time,
* which can lead to some race conditions described in {@link org.apache.gobblin.util.HadoopUtils#unsafeRenameIfNotExists(FileSystem, Path, Path)}
* Current implementation only sets permissions, but it is capable of setting owner and group as well.
*/
@Slf4j
public class SetPermissionCommitStep implements CommitStep {
public class CreateAndSetDirectoryPermissionCommitStep implements CommitStep {
@Getter
Map<String, OwnerAndPermission> pathAndPermissions;
private final URI fsUri;
Expand All @@ -47,7 +49,7 @@ public class SetPermissionCommitStep implements CommitStep {
public static final String DEFAULT_STOP_ON_ERROR = "false";
private boolean isCompleted = false;

public SetPermissionCommitStep(FileSystem targetFs, Map<String, OwnerAndPermission> pathAndPermissions,
public CreateAndSetDirectoryPermissionCommitStep(FileSystem targetFs, Map<String, OwnerAndPermission> pathAndPermissions,
Properties props) {
this.pathAndPermissions = pathAndPermissions;
this.fsUri = targetFs.getUri();
Expand All @@ -66,9 +68,13 @@ public void execute() throws IOException {
for (Map.Entry<String, OwnerAndPermission> entry : pathAndPermissions.entrySet()) {
Path path = new Path(entry.getKey());
try {
log.info("Setting permission {} on path {}", entry.getValue().getFsPermission(), path);
fs.setPermission(path, entry.getValue().getFsPermission());
// TODO : we can also set owner and group here.
if (!fs.exists(path)) {
log.info("Creating path {} with permission {}", path, entry.getValue().getFsPermission());
fs.mkdirs(path, entry.getValue().getFsPermission());
} else {
log.info("Setting permission {} on existing path {}", entry.getValue().getFsPermission(), path);
fs.setPermission(path, entry.getValue().getFsPermission());
}
} catch (AccessControlException e) {
log.warn("Error while setting permission on " + path, e);
if (this.stopOnError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.gobblin.util.commit.CreateAndSetDirectoryPermissionCommitStep;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -92,8 +92,9 @@ public void testFindFiles() throws IOException, URISyntaxException {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
SetPermissionCommitStep step = (SetPermissionCommitStep) ((PostPublishStep) fileSet.getFiles().get(2)).getStep();
Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(2)).getStep() instanceof CreateAndSetDirectoryPermissionCommitStep);
CreateAndSetDirectoryPermissionCommitStep
step = (CreateAndSetDirectoryPermissionCommitStep) ((PrePublishStep) fileSet.getFiles().get(2)).getStep();

Assert.assertEquals(step.getPathAndPermissions().keySet().size(), 1); // SetPermissionCommitStep only applies to ancestors
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
Expand Down Expand Up @@ -136,7 +137,7 @@ public void testFindFilesWithDifferentPermissions() throws IOException, URISynta
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Assert.assertTrue(((PrePublishStep)fileSet.getFiles().get(2)).getStep() instanceof CreateAndSetDirectoryPermissionCommitStep);

}
}
Expand Down Expand Up @@ -196,7 +197,7 @@ public void testFindDatasetEmptyRoot() throws Exception {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 2); // 1 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(1)).getStep() instanceof SetPermissionCommitStep);
Assert.assertTrue(((PrePublishStep) fileSet.getFiles().get(1)).getStep() instanceof CreateAndSetDirectoryPermissionCommitStep);
}

private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, Path manifestPath, FileSystem manifestReadFs) throws IOException, URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@


/**
* Test for {@link SetPermissionCommitStep}.
* Test for {@link CreateAndSetDirectoryPermissionCommitStep}.
*/
@Test(groups = { "gobblin.commit" })
public class SetPermissionCommitStepTest {
public class CreateAndSetDirectoryPermissionCommitStepTest {
private static final String ROOT_DIR = "set-permission-commit-step-test";

private FileSystem fs;
private SetPermissionCommitStep step;
private CreateAndSetDirectoryPermissionCommitStep step;
Path dir1;
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);

Expand All @@ -53,13 +53,12 @@ public void setUp() throws IOException {
this.fs.delete(new Path(ROOT_DIR), true);

dir1 = new Path(ROOT_DIR, "dir1");
this.fs.mkdirs(dir1);

OwnerAndPermission ownerAndPermission = new OwnerAndPermission("owner", "group", permission);
Map<String, OwnerAndPermission> pathAndPermissions = new HashMap<>();
pathAndPermissions.put(dir1.toString(), ownerAndPermission);

this.step = new SetPermissionCommitStep(this.fs, pathAndPermissions, new Properties());
this.step = new CreateAndSetDirectoryPermissionCommitStep(this.fs, pathAndPermissions, new Properties());
}

@AfterClass
Expand All @@ -69,8 +68,8 @@ public void tearDown() throws IOException {

@Test
public void testExecute() throws IOException {
Assert.assertNotEquals(this.fs.getFileStatus(dir1).getPermission(), permission);
this.step.execute();
Assert.assertEquals(this.fs.exists(dir1), true);
Assert.assertEquals(this.fs.getFileStatus(dir1).getPermission(), permission);
}
}

0 comments on commit 8c3f326

Please sign in to comment.