Gobblin Compaction Tool

Gobblin can be used to periodically pull full snapshots of a data set, as well as delta changes (i.e., inserted or updated records) since the last pull. The Gobblin compaction tool can be used to merge a snapshot with one or more deltas.

The compaction tool uses Hive to perform the compaction. It assumes the snapshot and the deltas meet the following requirements:

  1. The snapshot and all deltas are in Avro format.
  2. The snapshot and all deltas have the same primary key attributes (they do not need to have the same schema).
  3. The snapshot is pulled earlier than all deltas. Therefore, if a key appears in both the snapshot and a delta, the record in the snapshot is discarded.
  4. The deltas are pulled one after another and are ordered in ascending order of pull time. If a key appears in both the ith delta and the jth delta (i < j), the record in the jth delta survives (see the example below).
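For example, suppose the primary key is a single attribute id, and the tables contain the following records (all values here are hypothetical):

snapshot: (id=1, name=a), (id=2, name=b)
delta 1:  (id=2, name=c), (id=3, name=d)
delta 2:  (id=3, name=e)

The compacted output contains (id=1, name=a) from the snapshot, (id=2, name=c) from delta 1 (overriding the snapshot), and (id=3, name=e) from delta 2 (overriding delta 1).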

In the near future we also plan to support selecting records by timestamp (rather than by which file they appear in). This is useful if the snapshot and the deltas are pulled in parallel: if a key occurs multiple times, the record with the latest timestamp should be kept.

Note that since delta tables carry no information about deleted records, such information only becomes available the next time a full snapshot is pulled.

Usage

Depending on your environment, use one of the following:

java -jar compaction.jar <global-config-file>

or

hadoop jar compaction.jar <global-config-file>

or

yarn jar compaction.jar <global-config-file>

The merged data will be written to the HDFS directory specified in output.datalocation, as one or more Avro files. The schema of the output data will be the same as the schema of the last delta (which is the last pulled data and thus has the latest schema).

Global Config Properties (example: compaction.properties)

(1) Required:

  • compaction.config.dir

This is the compaction jobconfig directory. Each file in this directory should be a jobconfig file (described in the next section).

(2) Optional:

  • hadoop.configfile.*

Hadoop configuration files that should be loaded (e.g., hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml)

  • hdfs.uri

If fs.defaultFS is specified in a Hadoop configfile, this property is not needed; if both are specified, this property overrides fs.defaultFS.

If fs.defaultFS is not specified in a Hadoop configfile and this property is also not specified, the default value "hdfs://localhost:9000" is used.

  • hiveserver.version (default: 2)

Either 1 or 2.

  • hiveserver.connection.string

  • hiveserver.url

  • hiveserver.user (default: "")

  • hiveserver.password (default: "")

If hiveserver.connection.string is specified, it is used to connect to the Hive server.

If hiveserver.connection.string is not specified but hiveserver.url is, then (hiveserver.url, hiveserver.user, hiveserver.password) are used to connect to the Hive server.

If neither hiveserver.connection.string nor hiveserver.url is specified, an embedded Hive server is used (i.e., jdbc:hive:// if hiveserver.version=1, jdbc:hive2:// if hiveserver.version=2).

  • hivesite.dir

Directory that contains hive-site.xml, if hive-site.xml should be loaded.

  • hive.*

Any Hive config property (e.g., hive.join.cache.size). If specified, it overrides the corresponding property in hive-site.xml.
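Putting these together, a global config file might look like the following sketch. All paths, the HDFS URI, and the Hive server URL are hypothetical placeholders; adjust them for your environment.

# compaction.properties -- illustrative values only
compaction.config.dir=/path/to/jobconf
hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml
hdfs.uri=hdfs://localhost:9000
hiveserver.version=2
hiveserver.url=jdbc:hive2://localhost:10000
hiveserver.user=
hiveserver.password=
hivesite.dir=/path/to/hive/conf
hive.join.cache.size=25000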

Job Config Properties (example: jobconf/task1.conf)

(1) Required:

  • snapshot.pkey

comma-separated primary key attributes of the snapshot table

  • snapshot.datalocation

snapshot data directory in HDFS

  • delta.i.pkey (i = 1, 2...)

the primary key of the ith delta table (the primary keys of the snapshot and all deltas should be the same)

  • delta.i.datalocation (i = 1, 2...)

ith delta table's data directory in HDFS

  • output.datalocation

the HDFS data directory for the output (make sure you have write permission on this directory)
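For instance, a minimal jobconfig file with two deltas might look like this sketch (the key name and HDFS paths are hypothetical):

# jobconf/task1.conf -- illustrative values only
snapshot.pkey=memberId
snapshot.datalocation=/data/member/snapshot
delta.1.pkey=memberId
delta.1.datalocation=/data/member/delta1
delta.2.pkey=memberId
delta.2.datalocation=/data/member/delta2
output.datalocation=/data/member/output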

(2) Optional:

  • snapshot.name (default: randomly generated name)

prefix of the snapshot table name. The table name will be snapshot.name plus a random suffix.

  • snapshot.schemalocation

snapshot table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • delta.i.name (default: randomly generated name)

prefix of the ith delta table name. The table name will be delta.i.name plus a random suffix.

  • delta.i.schemalocation

ith delta table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • output.name (default: randomly generated name)

prefix of the output table name. The table name will be output.name plus a random suffix.

  • hive.db.name (default: default)

the database name to be used. This database should already exist, and you should have write permission on it.

  • hive.queue.name (default: default)

queue name to be used.

  • hive.use.mapjoin (default: if not specified in the global config file, then false)

whether map-side join should be turned on. If specified both here and in the global config file (via hive.*), this property takes precedence.

  • hive.mapjoin.smalltable.filesize (default: if not specified in the global config file, then use Hive's default value)

if hive.use.mapjoin = true, map join will be used when the small table is smaller than hive.mapjoin.smalltable.filesize (in bytes). If specified both here and in the global config file (via hive.*), this property takes precedence.

  • hive.tmpschema.dir (default: the parent dir of the data location from which the schema is extracted)

If the schema needs to be extracted from the data, the extracted schema is written to this dir. Note that if you do not have write permission on the default dir, you must set this property to a dir where you do have write permission.

  • snapshot.copydata (default: false)

Set to true if you don't want to (or are unable to) create an external table on snapshot.datalocation. A copy of the snapshot data will be created in hive.tmpdata.dir.

This property should be set to true if either of the following two situations applies:

(i) You don't have write permission to snapshot.datalocation. If so, once you create an external table on snapshot.datalocation, you may not be able to drop it.

(ii) You want to use a certain subset of files in snapshot.datalocation (e.g., snapshot.datalocation contains both .csv and .avro files but you only want to use .avro files)

  • delta.i.copydata (i = 1, 2...) (default: false)

Similar to snapshot.copydata.

  • hive.tmpdata.dir (default: "/")

If snapshot.copydata = true or delta.i.copydata = true, the data will be copied to this dir. You should have write permission to this dir.

  • snapshot.dataformat.extension.name (default: "")

If snapshot.copydata = true, only those data files whose extension matches snapshot.dataformat.extension.name will be copied to hive.tmpdata.dir.

  • delta.i.dataformat.extension.name (default: "")

Similar to snapshot.dataformat.extension.name.

  • mapreduce.job.num.reducers

Number of reducers for the job.

  • timing.file (default: time.txt)

A file where the running time of each compaction job is printed.
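As a fuller sketch, the following hypothetical jobconfig extends the minimal example above with some of the optional properties, for a snapshot directory that mixes file formats and therefore uses snapshot.copydata (all names, paths, and values are placeholders):

# jobconf/task1.conf -- illustrative values only
snapshot.pkey=memberId
snapshot.datalocation=/data/member/snapshot
snapshot.name=member_snapshot
snapshot.copydata=true
snapshot.dataformat.extension.name=avro
delta.1.pkey=memberId
delta.1.datalocation=/data/member/delta1
output.datalocation=/data/member/output
output.name=member_output
hive.db.name=member_db
hive.queue.name=compaction_queue
hive.use.mapjoin=true
hive.tmpdata.dir=/tmp/compaction/
mapreduce.job.num.reducers=10
timing.file=timing.txt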
