storage: Add exec command to HBaseMain #TASK-4372

j-coll committed Jul 6, 2023
1 parent 8b37aee commit 9aa9345

Showing 4 changed files with 159 additions and 32 deletions.
Changed file 1 of 4:
@@ -122,24 +122,52 @@ protected static ObjectMap getArgsMap(String[] args, int firstIdx, String... key
         ObjectMap argsMap;
         argsMap = new ObjectMap();
         int i = firstIdx;
+        int offset;
         while (i < args.length) {
-            String key = args[i];
-            while (key.startsWith("-")) {
-                key = key.substring(1);
-            }
+            String inputKey = args[i];
+            offset = 1;
+            String key;
+            String value;
+            if (inputKey.startsWith("--")) {
+                key = inputKey.substring(2);
+                value = safeArg(args, i + 1);
+                offset++; // one extra value read
+            } else if (inputKey.startsWith("-")) {
+                key = inputKey.substring(1, 2);
+                value = inputKey.substring(2);
+                if (value.isEmpty()) {
+                    value = safeArg(args, i + 1);
+                    offset++; // one extra value read
+                }
+            } else {
+                throw new IllegalArgumentException("Unknown argument '" + inputKey + "'");
+            }
+            if (value == null || value.startsWith("-")) {
+                value = "true";
+                offset--; // extra value discarded
+            }
             if (!acceptedKeys.isEmpty()) {
                 if (!acceptedKeys.contains(key)) {
-                    throw new IllegalArgumentException("Unknown argument '" + args[i] + "'");
+                    throw new IllegalArgumentException("Unknown argument '" + inputKey + "'");
                 }
             }
-            String value = safeArg(args, i + 1);
-            if (value == null || value.startsWith("-")) {
-                argsMap.put(key, true);
-                i += 1;
+
+            if (key.equals("D")) {
+                ObjectMap dynamic = (ObjectMap) argsMap.computeIfAbsent(key, k -> new ObjectMap());
+                String[] split = value.split("=", 2);
+                if (split.length != 2) {
+                    throw new IllegalArgumentException("Expected '-D key=value'");
+                }
+                if (dynamic.put(split[0], split[1]) != null) {
+                    throw new IllegalArgumentException("Duplicated argument '-D " + value + "'");
+                }
             } else {
-                argsMap.put(key, value);
-                i += 2;
+                if (argsMap.put(key, value) != null) {
+                    throw new IllegalArgumentException("Duplicated param '" + inputKey + "'");
+                }
             }
+            i += offset;
+
         }
         return argsMap;
     }
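Note on the reworked parser: it accepts long options ("--key value"), short options ("-Kvalue" or "-K value"), valueless flags (which default to "true"), and repeated "-D key=value" dynamic properties collected into a nested map, and it now rejects duplicated arguments. A minimal illustration with hypothetical arguments (not taken from the commit):

    // Parses everything after the subcommand name (firstIdx = 1).
    // "--dryRun" is followed by nothing, so it becomes "true";
    // "-Dmapreduce.job.maps=8" lands in the nested "D" map.
    ObjectMap map = getArgsMap(
            new String[]{"export-snapshots", "--snapshot", "s1", "-Dmapreduce.job.maps=8", "--dryRun"}, 1);
    // map => {snapshot=s1, D={mapreduce.job.maps=8}, dryRun=true}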
Changed file 2 of 4:
@@ -10,14 +10,21 @@
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.tools.ant.types.Commandline;
 import org.opencb.commons.ProgressLogger;
 import org.opencb.commons.datastore.core.ObjectMap;
 import org.opencb.opencga.core.common.TimeUtils;
+import org.opencb.opencga.core.config.storage.StorageConfiguration;
 import org.opencb.opencga.storage.hadoop.utils.HBaseManager;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
+import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -36,6 +43,8 @@ public class HBaseMain extends AbstractMain {
     public static final String SNAPSHOT_TABLE = "snapshot-table";
     public static final String DELETE_SNAPSHOTS = "delete-snapshots";
     public static final String CLONE_SNAPSHOTS = "clone-snapshots";
+    public static final String EXPORT_SNAPSHOTS = "export-snapshots";
+    public static final String EXEC = "exec";
     public static final String DISABLE_TABLE = "disable-table";
     public static final String DROP_TABLE = "drop-table";
     public static final String ENABLE_TABLE = "enable-table";
@@ -137,6 +146,25 @@ public void run(String[] args) throws Exception {
                 );
                 break;
             }
+            case EXPORT_SNAPSHOTS: {
+                ObjectMap argsMap = getArgsMap(args, 1, "dryRun", "snapshot", "copy-to", "copy-to-local", "copy-from", "target",
+                        "mappers", "overwrite", "D");
+                exportSnapshot(null,
+                        argsMap.getString("snapshot"),
+                        argsMap.getString("copy-to"),
+                        argsMap.getBoolean("copy-to-local"),
+                        argsMap.getString("copy-from"),
+                        argsMap.getString("target"),
+                        argsMap.getString("mappers"),
+                        argsMap.getBoolean("overwrite"),
+                        argsMap.getBoolean("dryRun"),
+                        argsMap);
+                break;
+            }
+            case EXEC: {
+                exec(getArg(args, 1), Arrays.asList(args).subList(2, args.length));
+                break;
+            }
             case DISABLE_TABLE: {
                 ObjectMap argsMap = getArgsMap(args, 2, "dryRun");
                 disableTables(getArg(args, 1), argsMap.getBoolean("dryRun"));

@@ -186,6 +214,25 @@ public void run(String[] args) throws Exception {
+ "[--onExistingTables [fail|skip|drop] ]");
System.out.println(" Clone all snapshots into tables matching the regex. "
+ "Generated tables can have a table prefix change.");
// System.out.println(" " + EXPORT_SNAPSHOTS + " \n"
// + " --dryRun <arg> Dry run.\n"
// + " --snapshot <arg> Snapshot to restore.\n"
// + " --copyTo <arg> Remote destination hdfs://\n"
// + " --copyFrom <arg> Input folder hdfs:// (default hbase.rootdir)\n"
// + " --target <arg> Target name for the snapshot.\n"
//// + " --no-checksum-verify Do not verify checksum, use name+length only.\n"
//// + " --no-target-verify Do not verify the integrity of the exported snapshot.\n"
// + " --overwrite Rewrite the snapshot manifest if already exists.\n"
//// + " --chuser <arg> Change the owner of the files to the specified one.\n"
//// + " --chgroup <arg> Change the group of the files to the specified one.\n"
//// + " --chmod <arg> Change the permission of the files to the specified one.\n"
//// + " --bandwidth <arg> Limit bandwidth to this value in MB/second.\n"
// + " --mappers <arg> Number of mappers to use during the copy (mapreduce.job.maps).\n"
// + " -Dkey=value Other key-value fields");
// System.out.println(" Clone all snapshots into tables matching the regex. "
// + "Generated tables can have a table prefix change.");
System.out.println(" " + EXEC + "[hadoop|yarn|hbase|hdfs]");
System.out.println(" Execute a MR job on the hadoop cluster. Use \"exec yarn jar ....\"");
System.out.println(" " + DISABLE_TABLE + " <table-name-regex> [--dryRun]");
System.out.println(" Disable all tables matching the regex.");
System.out.println(" " + DROP_TABLE + " <table-name-regex> [--dryRun]");
@@ -199,6 +246,74 @@ public void run(String[] args) throws Exception {
 
     }
 
+    private void exec(String tool, List<String> args) throws Exception {
+        Path opencgaHome = Paths.get(System.getProperty("app.home"));
+        String storageConfigurationPath = opencgaHome.resolve("conf").resolve("storage-configuration.yml").toString();
+        StorageConfiguration storageConfiguration;
+        try (FileInputStream is = new FileInputStream(storageConfigurationPath)) {
+            storageConfiguration = StorageConfiguration.load(is);
+        }
+
+        HadoopVariantStorageEngine engine = new HadoopVariantStorageEngine();
+        engine.setConfiguration(storageConfiguration, HadoopVariantStorageEngine.STORAGE_ENGINE_ID, "");
+
+        MRExecutor mrExecutor = engine.getMRExecutor();
+        mrExecutor.run(tool, args.toArray(new String[0]));
+    }
+
+    private void exportSnapshot(String storageConfigurationPath, String snapshot, String copyTo, boolean copyToLocal,
+                                String copyFrom, String target,
+                                String mappers, boolean overwrite, boolean dryRun, ObjectMap options) throws Exception {
+        if (storageConfigurationPath == null) {
+            Path opencgaHome = Paths.get(System.getProperty("app.home"));
+            storageConfigurationPath = opencgaHome.resolve("conf").resolve("storage-configuration.yml").toString();
+        }
+        StorageConfiguration storageConfiguration;
+        try (FileInputStream is = new FileInputStream(storageConfigurationPath)) {
+            storageConfiguration = StorageConfiguration.load(is);
+        }
+
+        List<String> args = new LinkedList<>();
+        args.add(org.apache.hadoop.hbase.snapshot.ExportSnapshot.class.getName());
+        for (Map.Entry<String, Object> entry : options.get("D", ObjectMap.class, new ObjectMap()).entrySet()) {
+            args.add("-D" + entry.getKey() + "=" + entry.getValue().toString());
+        }
+        args.add("--snapshot");
+        args.add(snapshot);
+        if (StringUtils.isNotEmpty(copyTo)) {
+            args.add("--copy-to");
+            args.add(copyTo);
+        } else if (copyToLocal) {
+            args.add("--copy-to");
+            args.add(hBaseManager.getConf().get(HConstants.HBASE_DIR));
+        }
+        if (StringUtils.isNotEmpty(copyFrom)) {
+            args.add("--copy-from");
+            args.add(copyFrom);
+        }
+        if (StringUtils.isNotEmpty(target)) {
+            args.add("--target");
+            args.add(target);
+        }
+        if (overwrite) {
+            args.add("--overwrite");
+        }
+        if (StringUtils.isNotEmpty(mappers)) {
+            args.add("--mappers");
+            args.add(mappers);
+        }
+
+        if (dryRun) {
+            System.out.println("hbase " + Commandline.toString(args.toArray(new String[0])));
+        } else {
+            HadoopVariantStorageEngine engine = new HadoopVariantStorageEngine();
+            engine.setConfiguration(storageConfiguration, HadoopVariantStorageEngine.STORAGE_ENGINE_ID, "");
+
+            MRExecutor mrExecutor = engine.getMRExecutor();
+            mrExecutor.run("hbase", args.toArray(new String[0]));
+        }
+    }
+
     private void regionsPerTable(String tableNameStr) throws Exception {
 //        TableName tableName = getTable(tableNameStr);
 //        hBaseManager.act(tableName.getNameAsString(), (table, admin) -> {
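With --dryRun, exportSnapshot only prints the command it would hand to the MRExecutor instead of running it. For a hypothetical snapshot "v1" exported to a backup cluster with 16 mappers (illustrative values only), the printed line would look like:

    hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot --snapshot v1 --copy-to hdfs://backup-cluster/hbase --overwrite --mappers 16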
Changed file 3 of 4:
@@ -35,7 +35,6 @@
 import java.io.IOException;
 import java.net.URI;
 
-import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.MR_HADOOP_BIN;
 import static org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions.INTERMEDIATE_HDFS_DIRECTORY;
 
 /**
@@ -96,29 +95,11 @@ public URI preLoad(URI input, URI output) throws StorageEngineException {
 
     protected void load(URI input, URI outdir, int studyId, int fileId) throws StorageEngineException {
         URI vcfMeta = URI.create(VariantReaderUtils.getMetaFromTransformedFile(input.toString()));
 
-        String hadoopRoute = options.getString(MR_HADOOP_BIN.key(), MR_HADOOP_BIN.defaultValue());
-        String jar = MRExecutor.getJarWithDependencies(getOptions());
-
-        Class execClass = ArchiveDriver.class;
-        String executable = hadoopRoute + " jar " + jar + " " + execClass.getName();
         String args = ArchiveDriver.buildCommandLineArgs(input, vcfMeta,
                 dbAdaptor.getVariantTable(), getArchiveTable(), studyId,
                 fileId, options);
 
-        long startTime = System.currentTimeMillis();
-        logger.info("------------------------------------------------------");
-        logger.info("Loading file {} into archive table '{}'", fileId, getArchiveTable());
-        logger.debug(executable + " " + args);
-        logger.info("------------------------------------------------------");
-        int exitValue = mrExecutor.run(executable, Commandline.translateCommandline(args));
-        logger.info("------------------------------------------------------");
-        logger.info("Exit value: {}", exitValue);
-        logger.info("Total time: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
-        if (exitValue != 0) {
-            throw new StorageEngineException("Error loading file " + input + " into archive table \""
-                    + getArchiveTable() + "\"");
-        }
+        mrExecutor.run(ArchiveDriver.class, Commandline.translateCommandline(args),
+                "Loading file " + fileId + " into archive table '" + getArchiveTable() + "'");
     }
 
 }
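The logging and exit-code boilerplate removed above is presumably absorbed by the new MRExecutor.run(Class, String[], String) overload, whose body is not part of this diff. A minimal sketch consistent with the removed code, assuming the same MR_HADOOP_BIN option, a getOptions() accessor, and a logger field:

    // Hypothetical sketch of the overload used above; not shown in this commit.
    public int run(Class<?> execClass, String[] args, String taskDescription) throws StorageEngineException {
        String hadoopRoute = getOptions().getString(MR_HADOOP_BIN.key(), MR_HADOOP_BIN.defaultValue());
        String executable = hadoopRoute + " jar " + MRExecutor.getJarWithDependencies(getOptions())
                + " " + execClass.getName();
        long startTime = System.currentTimeMillis();
        logger.info(taskDescription);
        int exitValue = run(executable, args); // delegate to the existing (String, String[]) overload
        logger.info("Exit value: {}. Total time: {}s", exitValue, (System.currentTimeMillis() - startTime) / 1000.0);
        if (exitValue != 0) {
            throw new StorageEngineException("Error executing MapReduce job: " + taskDescription);
        }
        return exitValue;
    }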
Changed file 4 of 4:
@@ -10,7 +10,10 @@
  *
  * @author Jacobo Coll <[email protected]>
  */
-public class MRExecutorFactory {
+public final class MRExecutorFactory {
 
+    private MRExecutorFactory() {
+    }
+
     public static MRExecutor getMRExecutor(ObjectMap options) throws StorageEngineException {
         MRExecutor mrExecutor;
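Callers continue to obtain executors through the static factory method shown above, e.g. MRExecutor mrExecutor = MRExecutorFactory.getMRExecutor(options). Making the class final with a private constructor simply enforces the standard non-instantiable utility-class idiom.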
