
Compaction



Compaction can be used to post-process files pulled by Gobblin with certain semantics. Deduplication is a common reason to run compaction; for example, you may want to

  • deduplicate on all fields of the records.
  • deduplicate on key fields of the records, keeping the record with the latest timestamp among those that share the same key.

Deduplication is needed because duplicates can be generated for multiple reasons, both intended and unintended:

  • For ingestion from data sources with mutable records (e.g., relational databases), instead of ingesting a full snapshot of a table every time, one may wish to ingest only the records that were changed since the previous run (i.e., delta records), and merge these delta records with previously generated snapshots in a compaction. In this case, for records with the same primary key, the one with the latest timestamp should be kept.
  • The data source you ingest from may have duplicate records, e.g., if you have a hierarchy of Kafka clusters where topics are replicated among the Kafka clusters, duplicate records may be generated during the replication. In some data sources duplicate records may also be produced by the data producer.
  • In rare circumstances, Gobblin may pull the same data twice, thus creating duplicate records. This may happen if Gobblin publishes the data successfully, but for some reason fails to persist the checkpoints (watermarks) into the state store.

Gobblin provides two compactors out-of-the-box, a MapReduce compactor and a Hive compactor.

MapReduce Compactor

The MapReduce compactor can be used to deduplicate on all or certain fields of the records. For duplicate records, only one will be preserved, and there is no guarantee as to which one.

A common use case of the MapReduce compactor is deduplicating records ingested from Kafka.

Basic Usage

MRCompactor.compact() is the entry point for MapReduce-based compaction. The input data to be compacted is specified by compaction.input.dir. Each subdir under compaction.input.dir is considered a topic. Each topic may contain multiple datasets, each of which is a unit for compaction. It is up to MRCompactorJobPropCreator to determine what is a dataset under each topic. If a topic has multiple levels of folders, subsequent levels can be specified using compaction.input.subdir.

Example: suppose the root folder for your input data is /data/kafka_topics. There are three topics: NewUserEvent, PasswordChangeEvent, PrivateMessageEvent. The data for each topic is partitioned by hours, e.g., /data/kafka_topics/NewUserEvent/hourly/2015/10/06/14. You want to run compaction on each day's 24 hourly folders, and publish a deduplicated daily folder, e.g., /data/kafka_topics/NewUserEvent/daily_deduped/2015/10/06.

In this case each dataset contains the 24 hourly folders of a day, e.g., /data/kafka_topics/NewUserEvent/hourly/2015/10/06. A MapReduce job will be launched for each such dataset to deduplicate the records and publish the output folder /data/kafka_topics/NewUserEvent/daily_deduped/2015/10/06. You may use the following config properties:

compaction.input.dir=/data/kafka_topics
compaction.dest.dir=/data/kafka_topics
compaction.input.subdir=hourly
compaction.dest.subdir=daily_deduped
compaction.folder.pattern=YYYY/MM/dd
compaction.timebased.max.time.ago=3d
compaction.timebased.min.time.ago=1d
compaction.jobprops.creator.class=gobblin.compaction.mapreduce.MRCompactorTimeBasedJobPropCreator
compaction.job.runner.class=gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner (if your data is Avro)

MRCompactorAvroKeyDedupJobRunner assumes your data is in Avro format; if it is not, you can implement a different job runner class for deduplicating your data format. compaction.timebased.max.time.ago and compaction.timebased.min.time.ago control the earliest and latest folders to be processed. For example, if their values are 3d and 1d, respectively, and the current time is 9am on 10/07, compaction will not process folders from 10/04 or earlier (since they are more than 3 days ago) or folders from 10/07 (since they are less than 1 day ago).

Non-deduping Compaction via Map-only Jobs

If deduplication is not needed, for example if you simply want to consolidate the files in 24 hourly folders into a single daily folder, you may set compaction.deduplicate=false.
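For example, to turn the earlier daily compaction example into a pure file-consolidation job, you could keep the same configuration and simply add:

compaction.deduplicate=false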

Handling Late Records

Late records are records that arrived at a folder after compaction on this folder has started. We explain how Gobblin handles late records using the following example.

Suppose we ingest data from a Kafka broker, and we would like to publish the data by hour and by day, both of which are deduplicated:

  • Data in the Kafka broker is first ingested into an hourly_staging folder, e.g., /data/kafka_topics/NewUserEvent/hourly_staging/2015/10/29/08...
  • A compaction with deduplication runs hourly; it consumes data in hourly_staging and publishes data into hourly, e.g., /data/kafka_topics/NewUserEvent/hourly/2015/10/29/08...
  • A non-deduping compaction runs daily; it consumes data in hourly and publishes data into daily, e.g., /data/kafka_topics/NewUserEvent/daily/2015/10/29... (a configuration sketch for both jobs follows this list).
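Using only the properties introduced earlier, the two compaction jobs in this pipeline might be configured roughly as follows. This is a minimal sketch: the folder names are those of this example, and time-based settings such as compaction.folder.pattern and the job runner class are omitted for brevity.

# Hourly compaction with deduplication: hourly_staging -> hourly
compaction.input.dir=/data/kafka_topics
compaction.dest.dir=/data/kafka_topics
compaction.input.subdir=hourly_staging
compaction.dest.subdir=hourly

# Daily compaction without deduplication: hourly -> daily
compaction.input.dir=/data/kafka_topics
compaction.dest.dir=/data/kafka_topics
compaction.input.subdir=hourly
compaction.dest.subdir=daily
compaction.deduplicate=false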

In this use case, both hourly compaction and daily compaction need a mechanism to handle late records. For hourly compaction, late records are records that arrived at an hourly_staging folder after the hourly compaction of that folder has started. It is similar for daily compaction.

For a compaction with deduplication (i.e., hourly compaction in the above use case), there are two options to deal with late data:

  • Option 1: if there are late data, re-do the compaction. For example, you may run the hourly compaction multiple times per hour. The first run will do the normal compaction, and in each subsequent run, if it detects late data in a folder, it will re-do compaction for that folder.

To do so, set compaction.job.overwrite.output.dir=true and compaction.recompact.from.input.for.late.data=true.

Please note the following when you use this option: (1) your already-published data will be re-published if late data are detected; (2) this is potentially dangerous if your input folders have short retention periods. For example, suppose hourly_staging folders have a 2-day retention period, i.e., folder /data/kafka_topics/NewUserEvent/hourly_staging/2015/10/29 will be deleted on 2015/10/31. If new data arrives at this folder after 2015/10/31 and you re-compact the folder and publish the data to hourly, all original data will be gone. To avoid this problem you may set compaction.timebased.max.time.ago=2d so that compaction will not be performed on folders more than 2 days old. However, this means that if a record is late by more than 2 days, it will never be published into hourly.
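As a rough sketch, an Option 1 configuration combines the two properties above with the retention safeguard just discussed; the 2d value is illustrative and should match your input folders' retention period:

compaction.job.overwrite.output.dir=true
compaction.recompact.from.input.for.late.data=true
# Do not re-compact folders whose input may already have been purged by retention
compaction.timebased.max.time.ago=2d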

  • Option 2 (this is the default option): if there are late data, copy them into an [output_subdir]_late folder, e.g., for hourly compaction, late data in hourly_staging will be copied into hourly_late folders such as /data/kafka_topics/NewUserEvent/hourly_late/2015/10/29....

If re-compaction is not necessary, this is all you need to do. If re-compaction is needed, you may schedule or manually invoke a re-compaction job which will re-compact by consuming data in both hourly and hourly_late. For this job, you need to set compaction.job.overwrite.output.dir=true and compaction.recompact.from.dest.paths=true.

Note that this re-compaction is different from the re-compaction in Option 1: this re-compaction consumes data in output folders (i.e., hourly) whereas the re-compaction in Option 1 consumes data in input folders (i.e., hourly_staging).
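For reference, the re-compaction job of Option 2 would carry at least these two settings, in addition to the usual input/output configuration of the hourly compaction:

# Re-compact by consuming data in the output folders (hourly and hourly_late)
compaction.job.overwrite.output.dir=true
compaction.recompact.from.dest.paths=true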

For a compaction without deduplication (i.e., the daily compaction in the above use case), late records likewise need to be handled after the compaction of a folder has started.

One way of reducing the chance of seeing late records is to verify data completeness before running compaction, which will be explained next.

Verifying Data Completeness Before Compaction

Besides aborting the compaction job for a dataset when new data is found in its input folder, another way to reduce the chance of seeing late events is to verify the completeness of the input data before running compaction. To do so, set compaction.completeness.verification.enabled=true, extend DataCompletenessVerifier.AbstractRunner with your verification logic, and pass your class via compaction.completeness.verification.class.

When data completeness verification is enabled, MRCompactor will verify data completeness for the input datasets, and meanwhile speculatively start the compaction MR jobs. When the compaction MR job for a dataset finishes, if the completeness of the dataset is verified, its compacted data will be published, otherwise it is discarded, and the compaction MR job for this dataset will be launched again with a reduced priority.

It is possible to control which topics should or should not be verified via compaction.completeness.verification.whitelist and compaction.completeness.verification.blacklist. It is also possible to set a timeout for data completeness verification via compaction.completeness.verification.timeout.minutes. A dataset whose completeness verification timed out can be configured to be either compacted anyway or not compacted.
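Putting these verification properties together, a configuration might look like the following. The verifier class name, the topics, and the comma-separated whitelist format shown here are assumptions for illustration only:

compaction.completeness.verification.enabled=true
# Your subclass of DataCompletenessVerifier.AbstractRunner (hypothetical class name)
compaction.completeness.verification.class=com.example.compaction.MyCompletenessVerifier
compaction.completeness.verification.whitelist=NewUserEvent,PasswordChangeEvent
compaction.completeness.verification.timeout.minutes=30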

Hive Compactor

The Hive compactor can be used to merge a snapshot with one or multiple deltas. It assumes the snapshot and the deltas meet the following requirements:

  1. Snapshot and all deltas are in Avro format.
  2. Snapshot and all deltas have the same primary key attributes (they do not need to have the same schema).
  3. The snapshot is pulled earlier than all deltas. Therefore, if a key appears in both the snapshot and a delta, the record in the snapshot should be discarded.
  4. The deltas are pulled one after another and are ordered in ascending order of pull time. If a key appears in both the ith delta and the jth delta (i < j), the one in the jth delta survives (see the example after this list).
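As a concrete illustration of rules 3 and 4, consider a snapshot and two deltas keyed on id (records shown schematically):

snapshot: (id=1, v=a)  (id=2, v=b)
delta.1:  (id=2, v=b') (id=3, v=c)
delta.2:  (id=3, v=c') (id=4, v=d)
output:   (id=1, v=a)  (id=2, v=b') (id=3, v=c') (id=4, v=d)

Record id=2 survives from delta.1 because deltas override the snapshot (rule 3), and id=3 survives from delta.2 because a later delta overrides an earlier one (rule 4).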

In the near future we also plan to support selecting records by timestamp (rather than by which file they appear in). This is useful if the snapshot and the deltas are pulled in parallel, in which case, if a key has multiple occurrences, we should keep the one with the latest timestamp.

Note that since delta tables do not contain information about deleted records, such information only becomes available the next time a full snapshot is pulled.

Usage

After building Gobblin (i.e., ./gradlew clean build), a zipped file build/gobblin-compaction/distributions/gobblin-compaction.tar.gz should be created. It contains a jar file (gobblin-compaction.jar), a folder of dependencies (gobblin-compaction_lib), and a log4j config file (log4j.xml).

To run compaction, extract the tarball into a folder, go to that folder, and run

java -jar gobblin-compaction.jar <global-config-file>

If for whatever reason (e.g., your Hadoop cluster is in secure mode) you need to run the jar using Hadoop or Yarn, then you first need to make sure the correct log4j config file is used, since there is another log4j config file in the Hadoop classpath. To do so, run the following two commands:

export HADOOP_CLASSPATH=.
export HADOOP_USER_CLASSPATH_FIRST=true

The first command adds the current directory to the Hadoop classpath, and the second command tells Hadoop/Yarn to prioritize the user's classpath. Then you can run the compaction jar:

hadoop jar gobblin-compaction.jar <global-config-file>

or

yarn jar gobblin-compaction.jar <global-config-file>

The merged data will be written to the HDFS directory specified in output.datalocation, as one or more Avro files. The schema of the output data will be the same as the schema of the last delta (which is the last pulled data and thus has the latest schema).

The provided log4j config file (log4j.xml) prints logs from Gobblin compaction classes to the console, and writes logs from other classes (e.g., Hive classes) to logs/gobblin-compaction.log. Note that for drop table queries (DROP TABLE IF EXISTS <tablename>), the Hive JDBC client will throw NoSuchObjectException if the table doesn't exist. This is normal and such exceptions should be ignored.

Global Config Properties (example: compaction.properties)

(1) Required:

  • compaction.config.dir

This is the compaction jobconfig directory. Each file in this directory should be a jobconfig file (described in the next section).

(2) Optional:

  • hadoop.configfile.*

Hadoop configuration files that should be loaded (e.g., hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml)

  • hdfs.uri

If fs.defaultFS (or fs.default.name) is specified in the Hadoop config file, this property is not needed; however, if hdfs.uri is specified, it overrides fs.defaultFS (or fs.default.name).

If fs.defaultFS or fs.default.name is not specified in the hadoop config file, and this property is also not specified, then the default value "hdfs://localhost:9000" will be used.

  • hiveserver.version (default: 2)

Either 1 or 2.

  • hiveserver.connection.string

  • hiveserver.url

  • hiveserver.user (default: "")

  • hiveserver.password (default: "")

If hiveserver.connection.string is specified, it will be used to connect to hiveserver.

If hiveserver.connection.string is not specified but hiveserver.url is specified, then it uses (hiveserver.url, hiveserver.user, hiveserver.password) to connect to hiveserver.

If neither hiveserver.connection.string nor hiveserver.url is specified, then embedded hiveserver will be used (i.e., jdbc:hive:// if hiveserver.version=1, jdbc:hive2:// if hiveserver.version=2)

  • hivesite.dir

Directory that contains hive-site.xml, if hive-site.xml should be loaded.

  • hive.*

Any Hive config property (e.g., hive.join.cache.size). If specified, it overrides the corresponding property in hive-site.xml.
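For reference, a hypothetical compaction.properties combining the properties above might look like this; all paths, hostnames, and credentials are placeholders:

compaction.config.dir=/path/to/compaction/jobconf
hadoop.configfile.coresite.xml=/export/apps/hadoop/latest/etc/hadoop/core-site.xml
hdfs.uri=hdfs://namenode.example.com:9000
hiveserver.version=2
hiveserver.url=jdbc:hive2://hiveserver.example.com:10000
hiveserver.user=gobblin
hiveserver.password=changeme
hivesite.dir=/export/apps/hive/latest/conf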

Job Config Properties (example: jobconf/task1.conf)

(1) Required:

  • snapshot.pkey

comma-separated primary key attributes of the snapshot table

  • snapshot.datalocation

snapshot data directory in HDFS

  • delta.i.pkey (i = 1, 2...)

the primary key of the ith delta table (the primary keys of the snapshot and all deltas should be the same)

  • delta.i.datalocation (i = 1, 2...)

ith delta table's data directory in HDFS

  • output.datalocation

the HDFS data directory for the output (make sure you have write permission on this directory)

(2) Optional:

  • snapshot.name (default: randomly generated name)

prefix name of the snapshot table. The table name will be snapshot.name + random suffix

  • snapshot.schemalocation

snapshot table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • delta.i.name (default: randomly generated name)

prefix name of the ith delta table. The table name will be delta.i.name + random suffix

  • delta.i.schemalocation

ith delta table's schema location in HDFS. If not specified, schema will be extracted from the data.

  • output.name (default: randomly generated name)

prefix name of the output table. The table name will be output.name + random suffix

  • hive.db.name (default: default)

the database name to be used. This database should already exist, and you should have write permission on it.

  • hive.queue.name (default: default)

queue name to be used.

  • hive.use.mapjoin (default: if not specified in the global config file, then false)

whether map-side join should be turned on. If specified both here and in the global config file (hive.*), this property takes precedence.

  • hive.mapjoin.smalltable.filesize (default: if not specified in the global config file, then use Hive's default value)

if hive.use.mapjoin = true, mapjoin will be used if the small table size is smaller than hive.mapjoin.smalltable.filesize (in bytes). If specified both here and in the global config file (hive.*), this property takes precedence.

  • hive.tmpschema.dir (default: the parent dir of the data location from which the schema is extracted)

If we need to extract schema from data, this dir is for the extracted schema. Note that if you do not have write permission on the default dir, you must specify this property as a dir where you do have write permission.

  • snapshot.copydata (default: false)

Set to true if you don't want to (or are unable to) create an external table on snapshot.datalocation. A copy of the snapshot data will be created in hive.tmpdata.dir and removed after the compaction.

This property should be set to true if either of the following two situations applies:

(i) You don't have write permission on snapshot.datalocation. In that case, once you create an external table on snapshot.datalocation, you may not be able to drop it. This is a Hive bug; for more information, see this page, which includes a Hive patch for the bug.

(ii) You want to use a certain subset of files in snapshot.datalocation (e.g., snapshot.datalocation contains both .csv and .avro files but you only want to use .avro files)

  • delta.i.copydata (i = 1, 2...) (default: false)

Similar to snapshot.copydata.

  • hive.tmpdata.dir (default: "/")

If snapshot.copydata = true or delta.i.copydata = true, the data will be copied to this dir. You should have write permission to this dir.

  • snapshot.dataformat.extension.name (default: "")

If snapshot.copydata = true, only those data files whose extension matches the value of this property will be copied to hive.tmpdata.dir.

  • delta.i.dataformat.extension.name (default: "")

Similar to snapshot.dataformat.extension.name.

  • mapreduce.job.num.reducers

Number of reducers for the job.

  • timing.file (default: time.txt)

A file where the running time of each compaction job is printed.
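Similarly, a hypothetical jobconf/task1.conf with the required properties plus a few optional ones might look like this; all paths, key names, and values are placeholders:

snapshot.pkey=memberId
snapshot.datalocation=/data/databases/member/snapshot
delta.1.pkey=memberId
delta.1.datalocation=/data/databases/member/delta/2015/10/01
delta.2.pkey=memberId
delta.2.datalocation=/data/databases/member/delta/2015/10/02
output.datalocation=/data/databases/member/compacted
hive.db.name=default
mapreduce.job.num.reducers=10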
