feat: support config file to init analyser and bulkloader #35

Draft · wants to merge 6 commits into base: master
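For context, a before/after sketch of what this PR enables, condensed from the diff below (identifiers as they appear in the diff; the core-site.xml templates are shown further down):

// before: credentials and table info are hard-coded at every call site
val oldStyle = new ColdBackupConfig(
  new FDSConfig("accessKey", "accessSecret", "bucketName", "endPoint"),
  "clusterName",
  "tableName"
)

// after: values are read from a core-site.xml found on the classpath
val newStyle = ColdBackupConfig.loadConfig(RemoteFSType.FDS)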
@@ -1,8 +1,15 @@
package com.xiaomi.infra.pegasus.spark.analyser;

import static com.xiaomi.infra.pegasus.spark.FDSConfig.loadFDSConfig;
import static com.xiaomi.infra.pegasus.spark.HDFSConfig.loadHDFSConfig;

import com.xiaomi.infra.pegasus.spark.CommonConfig;
import com.xiaomi.infra.pegasus.spark.FDSConfig;
import com.xiaomi.infra.pegasus.spark.HDFSConfig;
import com.xiaomi.infra.pegasus.spark.PegasusSparkException;
import java.util.Objects;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;

/**
* ColdBackupConfig is used when you manipulate the cold-backup data. <br>
@@ -17,6 +24,8 @@ public class ColdBackupConfig extends CommonConfig implements Config {

private static final int DEFAULT_FILE_OPEN_COUNT = 50;
private static final long DEFAULT_READ_AHEAD_SIZE_MB = 1;
private static final String DEFAULT_POLICY_NAME = "one_time";
private static final int DEFAULT_DATA_VERSION = 1;

private long readAheadSize;
private int fileOpenCount;
@@ -34,6 +43,46 @@ public ColdBackupConfig(FDSConfig fdsConfig, String clusterName, String tableName
setReadOptions(DEFAULT_FILE_OPEN_COUNT, DEFAULT_READ_AHEAD_SIZE_MB);
}

public static ColdBackupConfig loadConfig() throws PegasusSparkException, ConfigurationException {
return loadConfig(RemoteFSType.FDS);
}

public static ColdBackupConfig loadConfig(CommonConfig.RemoteFSType remoteFSType)
throws ConfigurationException, PegasusSparkException {
XMLConfiguration configuration =
new XMLConfiguration(
Objects.requireNonNull(
ColdBackupConfig.class.getClassLoader().getResource("core-site.xml")));
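// Note: getResource looks core-site.xml up on the classpath, so the file must be
// packaged under src/main/resources (or otherwise be on the runtime classpath);
// requireNonNull turns a missing file into an immediate NullPointerException.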

long readAheadSize =
configuration.getLong("pegasus.analyser.readAheadSize", DEFAULT_READ_AHEAD_SIZE_MB);
int fileOpenCount =
configuration.getInt("pegasus.analyser.fileMaxOpenCount", DEFAULT_FILE_OPEN_COUNT);
DataVersion version =
configuration.getInt("pegasus.analyser.version", DEFAULT_DATA_VERSION) == 1
? new DataVersion1()
: new DataVersion2();
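// pegasus.analyser.version selects the checkpoint data format:
// 1 (the default) maps to DataVersion1, any other value to DataVersion2.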
String policyName = configuration.getString("pegasus.analyser.policy", DEFAULT_POLICY_NAME);
String coldBackupTime = configuration.getString("pegasus.analyser.timestamp");
String clusterName = configuration.getString("pegasus.analyser.cluster");
String tableName = configuration.getString("pegasus.analyser.table");

ColdBackupConfig coldBackupConfig;
if (remoteFSType == RemoteFSType.FDS) {
coldBackupConfig = new ColdBackupConfig(loadFDSConfig(configuration), clusterName, tableName);
} else if (remoteFSType == RemoteFSType.HDFS) {
coldBackupConfig =
new ColdBackupConfig(loadHDFSConfig(configuration), clusterName, tableName);
} else {
throw new PegasusSparkException("Only FDS and HDFS are supported");
}
return coldBackupConfig
.setReadOptions(fileOpenCount, readAheadSize)
.setColdBackupTime(coldBackupTime)
.setPolicyName(policyName)
.setDataVersion(version);
}

@Override
public DataType getDataType() {
return dataType;
}
35 changes: 35 additions & 0 deletions pegasus-spark-analyser/src/main/resources/core-site.xml
@@ -0,0 +1,35 @@
<configuration>
<property>
<name>fs.fds.impl</name>
<value>com.xiaomi.infra.pegasus.spark.thirdparty.galaxy.hadoop.fs.FDSFileSystem</value>
</property>

<fs>
<fds>
<key>key</key>
<secret>secret</secret>
<bucket>bucket</bucket>
<endpoint>endpoint</endpoint>
<port>80</port>
</fds>

<hdfs>
<url>hdfs://</url>
<port>9001</port>
</hdfs>
</fs>

<pegasus>
<analyser>
<cluster>cluster</cluster>
<table>table</table>
<!-- the following properties are optional -->
<readAheadSize>1</readAheadSize>
<fileMaxOpenCount>50</fileMaxOpenCount>
<version>1</version>
<policy>one_time</policy>
<timestamp>2019-01-09</timestamp>
</analyser>
</pegasus>

</configuration>
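For reference, commons-configuration resolves dotted keys against nested XML elements, so a key like pegasus.analyser.table reads the <pegasus><analyser><table> element above. A minimal sketch of the lookup (assuming commons-configuration 1.x, which matches the imports in the diff):

import org.apache.commons.configuration.XMLConfiguration

val conf = new XMLConfiguration("core-site.xml")
val table = conf.getString("pegasus.analyser.table") // -> "table"
val readAheadSize = conf.getLong("pegasus.analyser.readAheadSize", 1L) // default applies when the key is absent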
19 changes: 0 additions & 19 deletions pegasus-spark-analyser/src/main/resources/log4j.properties

This file was deleted.

@@ -4,7 +4,6 @@ import com.xiaomi.infra.pegasus.spark.JNILibraryLoader
import org.apache.commons.logging.LogFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.rocksdb.RocksDB

/**
* PegasusContext is a serializable container for analysing Pegasus's checkpoint on HDFS.
@@ -15,7 +14,9 @@ import org.rocksdb.RocksDB
// TODO(jiashuo1) refactor rdd/iterator for adding pegasus online data
class PegasusContext(private val sc: SparkContext) extends Serializable {

def pegasusSnapshotRDD(config: Config): PegasusSnapshotRDD = {
def pegasusSnapshotRDD(
config: Config = ColdBackupConfig.loadConfig()
): PegasusSnapshotRDD = {

new PegasusSnapshotRDD(
this,
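With the new default argument, a caller that ships a core-site.xml no longer has to build a config at all; a sketch (assuming the implicit conversion from CustomImplicits is in scope, as in the examples below):

val rdd = sc.pegasusSnapshotRDD() // falls back to ColdBackupConfig.loadConfig(), i.e. FDS + core-site.xml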
@@ -1,6 +1,6 @@
package com.xiaomi.infra.pegasus.spark.analyser.examples.basic

import com.xiaomi.infra.pegasus.spark.FDSConfig
import com.xiaomi.infra.pegasus.spark.CommonConfig.RemoteFSType
import com.xiaomi.infra.pegasus.spark.analyser.ColdBackupConfig
import org.apache.spark.{SparkConf, SparkContext}
import com.xiaomi.infra.pegasus.spark.analyser.CustomImplicits._
@@ -12,21 +12,11 @@ object CountData {
.setAppName("count pegasus data stored in XiaoMi's FDS")
.setIfMissing("spark.master", "local[1]")

val coldBackupConfig =
new ColdBackupConfig(
new FDSConfig( // if data is in HDFS, pass HDFSConfig()
"accessKey",
"accessSecret",
"bucketName",
"endPoint"
),
"clusterName",
"tableName"
)

var count = 0
val sc = new SparkContext(conf)
.pegasusSnapshotRDD(coldBackupConfig)
.pegasusSnapshotRDD(
ColdBackupConfig.loadConfig(RemoteFSType.FDS)
)
.map(_ => {
count = count + 1
if (count % 10000 == 0) {
@@ -1,6 +1,6 @@
package com.xiaomi.infra.pegasus.spark.analyser.examples.parquet

import com.xiaomi.infra.pegasus.spark.FDSConfig
import com.xiaomi.infra.pegasus.spark.CommonConfig.RemoteFSType
import com.xiaomi.infra.pegasus.spark.analyser.ColdBackupConfig
import com.xiaomi.infra.pegasus.spark.analyser.CustomImplicits._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
@@ -16,11 +16,9 @@ object ConvertParquet {
) // this config is only for local testing; remove it before deploying to a cluster
.getOrCreate()

// if data in HDFS, pass HDFSConfig()
val coldBackupConfig =
new ColdBackupConfig(new FDSConfig("", "", "", "", ""), "onebox", "temp")

val rdd = spark.sparkContext.pegasusSnapshotRDD(coldBackupConfig)
val rdd = spark.sparkContext.pegasusSnapshotRDD(
ColdBackupConfig.loadConfig(RemoteFSType.FDS)
)

// please make sure the data can be converted to valid string values
val dataFrame = spark.createDataFrame(
@@ -1,9 +1,16 @@
package com.xiaomi.infra.pegasus.spark.bulkloader;

import static com.xiaomi.infra.pegasus.spark.FDSConfig.loadFDSConfig;
import static com.xiaomi.infra.pegasus.spark.HDFSConfig.loadHDFSConfig;

import com.xiaomi.infra.pegasus.spark.CommonConfig;
import com.xiaomi.infra.pegasus.spark.FDSConfig;
import com.xiaomi.infra.pegasus.spark.HDFSConfig;
import com.xiaomi.infra.pegasus.spark.PegasusSparkException;
import java.io.Serializable;
import java.util.Objects;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;

/**
* The config used for generating the pegasus data, which will be placed as follows:
@@ -14,18 +21,68 @@
* <DataPathRoot>/<ClusterName>/<TableName>/<PartitionIndex>/<FileIndex>.sst => RocksDB SST File
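* e.g. /pegasus-bulkloader/onebox/temp/0/1.sst (illustrative values)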
*/
public class BulkLoaderConfig extends CommonConfig {
private AdvancedConfig advancedConfig = new AdvancedConfig();
public static final String DEFAULT_DATA_PATH_ROOT = "/pegasus-bulkloader";

private AdvancedConfig advancedConfig;

private String dataPathRoot = "/pegasus-bulkloader";
private String dataPathRoot;
private int tableId;
private int tablePartitionCount;

public BulkLoaderConfig(HDFSConfig hdfsConfig, String clusterName, String tableName) {
super(hdfsConfig, clusterName, tableName);
initConfig();
}

public BulkLoaderConfig(FDSConfig fdsConfig, String clusterName, String tableName) {
super(fdsConfig, clusterName, tableName);
initConfig();
}

private void initConfig() {
this.dataPathRoot = DEFAULT_DATA_PATH_ROOT;
this.advancedConfig = new AdvancedConfig();
}

public static BulkLoaderConfig loadConfig() throws PegasusSparkException, ConfigurationException {
return loadConfig(RemoteFSType.FDS);
}

public static BulkLoaderConfig loadConfig(CommonConfig.RemoteFSType remoteFSType)
throws ConfigurationException, PegasusSparkException {
XMLConfiguration configuration =
new XMLConfiguration(
Objects.requireNonNull(
BulkLoaderConfig.class.getClassLoader().getResource("core-site.xml"))
.getPath());
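// Note: unlike ColdBackupConfig, this passes URL.getPath() rather than the URL
// itself; for a resource packaged inside a jar, getPath() yields a
// "file:...jar!/..." string that XMLConfiguration(String) may fail to open, so
// passing the URL directly (as ColdBackupConfig does) is likely the safer variant.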
String clusterName = configuration.getString("pegasus.bulkloader.cluster");
String tableName = configuration.getString("pegasus.bulkloader.table");
int tableId = configuration.getInt("pegasus.bulkloader.id");
int tablePartitionCount = configuration.getInt("pegasus.bulkloader.count");
String dataPathRoot =
configuration.getString("pegasus.bulkloader.root", DEFAULT_DATA_PATH_ROOT);
boolean enableSort =
configuration.getBoolean("pegasus.bulkloader.sort", AdvancedConfig.DEFAULT_ENABLE_SORT);
boolean enableDistinct =
configuration.getBoolean(
"pegasus.bulkloader.distinct", AdvancedConfig.DEFAULT_ENABLE_DISTINCT);

BulkLoaderConfig bulkLoaderConfig;
if (remoteFSType == RemoteFSType.FDS) {
bulkLoaderConfig = new BulkLoaderConfig(loadFDSConfig(configuration), clusterName, tableName);
} else if (remoteFSType == RemoteFSType.HDFS) {
bulkLoaderConfig =
new BulkLoaderConfig(loadHDFSConfig(configuration), clusterName, tableName);
} else {
throw new PegasusSparkException("Only FDS and HDFS are supported");
}

return bulkLoaderConfig
.setAdvancedConfig(
new AdvancedConfig().enableDistinct(enableDistinct).enableSort(enableSort))
.setDataPathRoot(dataPathRoot)
.setTableId(tableId)
.setTablePartitionCount(tablePartitionCount);
}

@@ -102,9 +159,16 @@ public AdvancedConfig getAdvancedConfig() {
* the class generally.
*/
public static class AdvancedConfig implements Serializable {
public static final boolean DEFAULT_ENABLE_SORT = true;
public static final boolean DEFAULT_ENABLE_DISTINCT = true;

private boolean isDistinct;
private boolean isSort;

private boolean isDistinct = true;
private boolean isSort = true;
public AdvancedConfig() {
this.isDistinct = DEFAULT_ENABLE_DISTINCT;
this.isSort = DEFAULT_ENABLE_SORT;
}

/**
* set whether to distinct the [hashKeyLength][hashKey][sortKey] of pegasus records generated by
35 changes: 35 additions & 0 deletions pegasus-spark-bulkloader/src/main/resources/core-site.xml
@@ -0,0 +1,35 @@
<configuration>
<property>
<name>fs.fds.impl</name>
<value>com.xiaomi.infra.pegasus.spark.thirdparty.galaxy.hadoop.fs.FDSFileSystem</value>
</property>

<fs>
<fds>
<key>key</key>
<secret>secret</secret>
<bucket>bucket</bucket>
<endpoint>endpoint</endpoint>
<port>80</port>
</fds>

<hdfs>
<url>hdfs://</url>
<port>9001</port>
</hdfs>
</fs>

<pegasus>
<bulkloader>
<cluster>cluster</cluster>
<table>table</table>
<id>id</id>
<count>count</count>
<!-- the following properties are optional -->
<root>/pegasus-bulkloader</root>
<sort>true</sort>
<distinct>true</distinct>
</bulkloader>
</pegasus>

</configuration>
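Note that pegasus.bulkloader.id and pegasus.bulkloader.count are read via getInt, so the placeholder values above must be replaced with real integers before use; a sketch of the failure mode (assuming commons-configuration 1.x):

val conf = new XMLConfiguration("core-site.xml")
val tableId = conf.getInt("pegasus.bulkloader.id") // throws ConversionException on the literal "id"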
@@ -8,7 +8,9 @@ import scala.collection.JavaConverters._

class PegasusRecordRDD(data: RDD[(PegasusKey, PegasusValue)]) {

def saveAsPegasusFile(config: BulkLoaderConfig): Unit = {
def saveAsPegasusFile(
config: BulkLoaderConfig = BulkLoaderConfig.loadConfig()
): Unit = {
var rdd = data
if (config.getAdvancedConfig.enableDistinct) {
rdd = rdd.distinct()
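Symmetrically to pegasusSnapshotRDD, writers can now rely on the config file as well; a sketch (records is a hypothetical RDD[(PegasusKey, PegasusValue)]):

records.saveAsPegasusFile() // falls back to BulkLoaderConfig.loadConfig()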
@@ -1,6 +1,6 @@
package com.xiaomi.infra.pegasus.spark.bulkloader.examples

import com.xiaomi.infra.pegasus.spark.FDSConfig
import com.xiaomi.infra.pegasus.spark.CommonConfig.RemoteFSType
import com.xiaomi.infra.pegasus.spark.bulkloader.{
BulkLoaderConfig,
PegasusRecord
@@ -17,18 +17,6 @@ object CSVBulkLoader {

val sc = new SparkContext(conf)

val config = new BulkLoaderConfig(
new FDSConfig(
"accessKey",
"accessSecret",
"bucketName",
"endPoint"
),
"clusterName",
"tableName"
).setTableId(20)
.setTablePartitionCount(32)

// Note: if any partition is larger than 2 GB before "saveAsPegasusFile", you need to
// repartition first, e.g. sc.textFile("data.csv").repartition(n), so that each partition stays under 2 GB
sc.textFile("data.csv")
@@ -40,7 +28,9 @@ object CSVBulkLoader {
lines(2).getBytes()
)
})
.saveAsPegasusFile(config)
.saveAsPegasusFile(
BulkLoaderConfig.loadConfig(RemoteFSType.FDS)
)
}

}