Compaction

Gobblin can be used to periodically pull full snapshots of a data set, as well as delta changes (i.e., inserted or updated records) since the last pull. The Gobblin compaction tool can be used to merge a snapshot with one or multiple deltas.

The compaction tool uses Hive to perform the compaction. It assumes the snapshot and the deltas meet the following requirements:

  1. Snapshot and all deltas are in Avro format.
  2. Snapshot and all deltas have the same primary key attributes (they do not need to have the same schema).
  3. The snapshot is pulled earlier than all deltas. Therefore, if a key appears in both the snapshot and a delta, the record in the snapshot is discarded.
  4. The deltas are pulled one after another and are ordered in ascending order of pull time. If a key appears in both the ith delta and the jth delta (i < j), the record in the jth delta survives (see the example below).
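
For illustration, suppose the primary key is memberId and the same key appears in the snapshot and in two deltas (the records below are hypothetical):

snapshot: memberId=1, email=a@example.com  (pulled first; discarded)
delta 1:  memberId=1, email=b@example.com  (discarded, since a later delta contains the key)
delta 2:  memberId=1, email=c@example.com  (pulled last; survives)

merged output: memberId=1, email=c@example.com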

In the near future we also plan to support selecting records by timestamp (rather than by which file they appear in). This is useful if the snapshot and the deltas are pulled in parallel: if a key has multiple occurrences, we should keep the one with the latest timestamp.

Note that since deltas do not contain information about deleted records, deletions are only reflected the next time a full snapshot is pulled.

Usage

After building Gobblin (i.e., ./gradlew clean build), a folder build/gobblin-compaction/jar should be created. The jar folder contains a jar file (gobblin-compaction.jar), a folder of dependencies (gobblin-compaction_lib), and a log4j config file (log4j.xml).
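
The layout of the jar folder is roughly the following (taken from a typical build; exact contents may differ slightly between Gobblin versions):

build/gobblin-compaction/jar/
  gobblin-compaction.jar       (the compaction tool)
  gobblin-compaction_lib/      (dependency jars)
  log4j.xml                    (log4j config file)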

To run compaction, go to the jar folder (or copy everything in the jar folder to another folder and go to that folder), and run

java -jar gobblin-compaction.jar <global-config-file>

If for whatever reason (e.g., your Hadoop cluster is in secure mode) you need to run the jar using Hadoop or Yarn, then you first need to make sure the correct log4j config file is used, since there is another log4j config file in the Hadoop classpath. To do so, run the following two commands:

export HADOOP_CLASSPATH=.
export HADOOP_USER_CLASSPATH_FIRST=true

The first command adds the current directory to the Hadoop classpath, and the second command tells Hadoop/Yarn to prioritize the user's classpath. Then you can run the compaction jar:

hadoop jar gobblin-compaction.jar <global-config-file>

or

yarn jar gobblin-compaction.jar <global-config-file>

The merged data will be written to the HDFS directory specified in output.datalocation, as one or more Avro files. The schema of the output data will be the same as the schema of the last delta (which is the last pulled data and thus has the latest schema).

Global Config Properties (example: compaction.properties)

(1) Required:

  • compaction.config.dir

This is the compaction jobconfig directory. Each file in this directory should be a jobconfig file (described in the next section).

(2) Optional:

  • hadoop.configfile.*

Hadoop configuration files that should be loaded (e.g., hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml)

  • hdfs.uri

If the property fs.defaultFS (or fs.default.name) is specified in the Hadoop config file, then this property is not needed. However, if hdfs.uri is specified, it overrides fs.defaultFS (or fs.default.name).

If fs.defaultFS or fs.default.name is not specified in the hadoop config file, and this property is also not specified, then the default value "hdfs://localhost:9000" will be used.

  • hiveserver.version (default: 2)

Either 1 or 2.

  • hiveserver.connection.string

  • hiveserver.url

  • hiveserver.user (default: "")

  • hiveserver.password (default: "")

If hiveserver.connection.string is specified, it will be used to connect to hiveserver.

If hiveserver.connection.string is not specified but hiveserver.url is specified, then it uses (hiveserver.url, hiveserver.user, hiveserver.password) to connect to hiveserver.

If neither hiveserver.connection.string nor hiveserver.url is specified, then the embedded hiveserver will be used (i.e., jdbc:hive:// if hiveserver.version=1, or jdbc:hive2:// if hiveserver.version=2).

  • hivesite.dir

Directory that contains hive-site.xml, if hive-site.xml should be loaded.

  • hive.*

Any Hive config property (e.g., hive.join.cache.size). If specified, it will override the corresponding property in hive-site.xml.
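
Putting these together, a small example compaction.properties might look like the following. All paths, the hiveserver host, and the property values are placeholders for illustration:

# Required: directory containing one jobconfig file per compaction job
compaction.config.dir=/home/gobblin/compaction/jobconf

# Optional: Hadoop config files and HDFS URI
hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml
hdfs.uri=hdfs://localhost:9000

# Optional: hiveserver connection. If neither hiveserver.connection.string
# nor hiveserver.url is given, the embedded hiveserver is used.
hiveserver.version=2
hiveserver.url=jdbc:hive2://hiveserver-host:10000
hiveserver.user=gobblin
hiveserver.password=

# Optional: hive-site.xml location and Hive property overrides
hivesite.dir=/export/apps/hive/latest/conf
hive.join.cache.size=25000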

Job Config Properties (example: jobconf/task1.conf)

(1) Required:

  • snapshot.pkey

comma-separated primary key attributes of the snapshot table

  • snapshot.datalocation

snapshot data directory in HDFS

  • delta.i.pkey (i = 1, 2...)

the primary key attributes of the ith delta table (the primary keys of the snapshot and all deltas should be the same)

  • delta.i.datalocation (i = 1, 2...)

ith delta table's data directory in HDFS

  • output.datalocation

the HDFS data directory for the output (make sure you have write permission on this directory)
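
A minimal jobconfig file (e.g., jobconf/task1.conf) containing only the required properties might look like this; the key name and all paths are placeholders:

snapshot.pkey=memberId
snapshot.datalocation=/data/snapshot/member

delta.1.pkey=memberId
delta.1.datalocation=/data/delta/member/2015-03-01

delta.2.pkey=memberId
delta.2.datalocation=/data/delta/member/2015-03-02

output.datalocation=/data/output/member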

(2) Optional:

  • snapshot.name (default: randomly generated name)

prefix name of the snapshot table. The table name will be snapshot.name + random suffix

  • snapshot.schemalocation

snapshot table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • delta.i.name (default: randomly generated name)

prefix name of the ith delta table. The table name will be delta.i.name + random suffix

  • delta.i.schemalocation

ith delta table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • output.name (default: randomly generated name)

prefix name of the output table. The table name will be output.name + random suffix

  • hive.db.name (default: default)

the database name to be used. This database should already exist, and you should have write permission on it.

  • hive.queue.name (default: default)

queue name to be used.

  • hive.use.mapjoin (default: if not specified in the global config file, then false)

whether map-side join should be turned on. If specified both in this property and in the global config file (hive.*), this property takes precedence.

  • hive.mapjoin.smalltable.filesize (default: if not specified in the global config file, then use Hive's default value)

if hive.use.mapjoin = true, mapjoin will be used if the small table size is smaller than hive.mapjoin.smalltable.filesize (in bytes). If specified both in this property and in the global config file (hive.*), this property takes precedence.
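
For example, the following jobconfig lines (the size value is illustrative) enable map-side join for small tables up to 50 MB:

hive.use.mapjoin=true
hive.mapjoin.smalltable.filesize=52428800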

  • hive.tmpschema.dir (default: the parent directory of the data location from which the schema is extracted)

If the schema needs to be extracted from the data, the extracted schema is written to this directory. Note that if you do not have write permission on the default directory, you must set this property to a directory where you do have write permission.

  • snapshot.copydata (default: false)

Set to true if you don't want to (or are unable to) create external table on snapshot.datalocation. A copy of the snapshot data will be created in hive.tmpdata.dir, and will be removed after the compaction.

This property should be set to true if either of the following two situations applies:

(i) You don't have write permission to snapshot.datalocation. If so, once you create an external table on snapshot.datalocation, you may not be able to drop it. This is a Hive bug; for more information, see this page, which includes a Hive patch for the bug.

(ii) You want to use a certain subset of files in snapshot.datalocation (e.g., snapshot.datalocation contains both .csv and .avro files but you only want to use .avro files)

  • delta.i.copydata (i = 1, 2...) (default: false)

Similar to snapshot.copydata.

  • hive.tmpdata.dir (default: "/")

If snapshot.copydata = true or delta.i.copydata = true, the data will be copied to this dir. You should have write permission to this dir.

  • snapshot.dataformat.extension.name (default: "")

If snapshot.copydata = true, then only those data files whose extension matches snapshot.dataformat.extension.name will be copied to hive.tmpdata.dir.

  • delta.i.dataformat.extension.name (default: "")

Similar to snapshot.dataformat.extension.name.
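
As an illustration, if the snapshot and delta directories contain a mix of file types and only the .avro files should be used, a jobconfig could combine these properties (paths are placeholders):

snapshot.copydata=true
snapshot.dataformat.extension.name=avro
delta.1.copydata=true
delta.1.dataformat.extension.name=avro
hive.tmpdata.dir=/tmp/gobblin-compaction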

  • mapreduce.job.num.reducers

Number of reducers for the job.

  • timing.file (default: time.txt)

A file where the running time of each compaction job is printed.
