Compaction
- Author: Ziyang
- Reviewer: Henry
One of the canonical use cases of Gobblin is periodically pulling data from data stores: we pull full snapshots of a data store at low frequency (due to their sizes), and pull deltas of the snapshots, which contain all records inserted or updated since the last pull, at higher frequency. Each time we pull a delta, we merge it with the latest snapshot we have; this merge is called a compaction. Since delta tables carry no information about deleted records, deletions only take effect the next time we pull a full snapshot.
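For example (with hypothetical data): if the latest snapshot contains the records (1, 'a') and (2, 'b'), and the delta contains (2, 'b2') and (3, 'c'), then the compacted snapshot contains (1, 'a'), (2, 'b2'), and (3, 'c'): key 2 takes its updated value from the delta, and key 3 is newly inserted.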
One way to perform compaction is to use Hive. Let us look at a few examples.
<lqiao> There is a strong assumption on serial order of snapshot and delta, and snapshot and delta never overlap in time. We need to call it out in our deployment page to make sure that's the case. For the first release it's ok to live with that constraint. </lqiao>
<stakiar> Looks like the first part of the document talks about how to handle the case where the deltas only have (key, value). This would require snapshots and deltas to be taken sequentially. However, it looks like the end of the document talks about how to do compaction with deltas of the form (key, value, timestamp), which would allow deltas to be taken in parallel with snapshots. It would probably be a good idea to touch upon this point at the beginning of the document, so that it's clearer to the users what the tradeoffs are between having the delta schema as (key, value) vs. (key, value, timestamp). </stakiar>
Suppose we have one snapshot table 'snapshot (key INT, value STRING)' and one delta table 'delta (key INT, value STRING)'. Both tables have been pulled and stored in Avro format (e.g., snapshot.avro, delta.avro, schema.avsc).
We first create tables for snapshot and delta:
CREATE EXTERNAL TABLE snapshot
STORED AS AVRO
LOCATION '<path-to-snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');
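The delta table can be created with the same pattern, pointing at the delta files and reusing the shared schema file from the setup above:
CREATE EXTERNAL TABLE delta
STORED AS AVRO
LOCATION '<path-to-delta.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');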
Note that if your Hive version is before 0.14, the CREATE TABLE statement has to be written as follows:
CREATE EXTERNAL TABLE snapshot
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '<path-to-snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');
Next we retrieve the records in the snapshot that are not updated in the delta, using a left outer join:
CREATE TEMPORARY TABLE not_updated AS
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key
WHERE delta.key IS NULL;
Note that delta.key IS NULL should be in the WHERE clause rather than in the join condition (i.e., not ON snapshot.key=delta.key AND delta.key IS NULL), since the outer join has to be evaluated on the condition snapshot.key=delta.key first, before keys missing from delta become NULL.
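To see why, consider the incorrect variant, shown here only for contrast:
-- Incorrect: the IS NULL check is part of the join condition
CREATE TEMPORARY TABLE not_updated AS
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key AND delta.key IS NULL;
Here the join condition can never be satisfied (delta.key cannot both equal snapshot.key and be NULL), so every snapshot row is left unmatched and survives the outer join: the result is the entire snapshot, including the rows that were updated in delta.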
If you are using a Hive version before 0.14, you may change TEMPORARY TABLE to TABLE or VIEW.
An alternative way to retrieve not_updated is to use a NOT IN subquery:
CREATE TEMPORARY TABLE not_updated AS
SELECT * FROM snapshot
WHERE snapshot.key NOT IN
(SELECT delta.key FROM delta);
However, this only works if the primary key of snapshot and delta consists of a single attribute.
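For a composite key, a correlated NOT EXISTS subquery (supported since Hive 0.13) can play the same role. A minimal sketch, assuming a hypothetical composite key (key1, key2):
CREATE TEMPORARY TABLE not_updated AS
SELECT * FROM snapshot
WHERE NOT EXISTS
(SELECT 1 FROM delta
WHERE delta.key1 = snapshot.key1
AND delta.key2 = snapshot.key2);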
Finally, we do a UNION ALL of not_updated and delta and insert the result into the new_snapshot table:
CREATE EXTERNAL TABLE new_snapshot
STORED AS AVRO
LOCATION '<path-to-new_snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');
INSERT OVERWRITE TABLE new_snapshot
SELECT * FROM (
SELECT * FROM not_updated
UNION ALL
SELECT * FROM delta
) unioned;
Note that INSERT OVERWRITE TABLE takes a SELECT directly (no AS), and Hive versions before 1.2.0 only allow UNION ALL inside a subquery, hence the wrapping above.
<lqiao> Does this generate Avro files automatically or does the data format need to be specified? We need to make sure the compaction outcome is not just in Hive but also in Avro format (first release), so downstream can continue with Pig or M/R to process the data. </lqiao>
Merging Multiple Delta Tables
One extraction may produce multiple delta tables for a snapshot. Suppose we have a snapshot table and multiple delta tables delta_1, ..., delta_k, ordered by timestamp (thus if j > i, an update on a key in delta_j trumps an update on the same key in delta_i). We can first merge the delta tables into a single delta table with the following pseudocode:
CREATE TABLE tmp_delta (key INT, value STRING);
merged_delta = delta_1;
for (i from 2 to k) {
  -- records of merged_delta whose keys are not updated in delta_i
  CREATE TABLE diff AS
  SELECT merged_delta.key, merged_delta.value FROM
  merged_delta LEFT OUTER JOIN delta_i
  ON merged_delta.key=delta_i.key
  WHERE delta_i.key IS NULL;
  -- the surviving records plus delta_i form the new merged delta
  INSERT OVERWRITE TABLE tmp_delta
  SELECT * FROM (
  SELECT * FROM diff
  UNION ALL
  SELECT * FROM delta_i
  ) unioned;
  DROP TABLE diff;  -- so CREATE TABLE diff succeeds in the next iteration
  merged_delta = tmp_delta;
}
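Alternatively, if window functions are available (Hive 0.11+), the k deltas can be merged in a single pass by tagging each record with its table's position in the sequence and keeping, per key, the record with the highest tag. A minimal sketch for k=3, with the table and column names taken from the pseudocode above:
CREATE TABLE merged_delta AS
SELECT key, value FROM (
  -- rank the updates for each key, latest delta first
  SELECT key, value,
         ROW_NUMBER() OVER (PARTITION BY key ORDER BY priority DESC) AS rn
  FROM (
    SELECT key, value, 1 AS priority FROM delta_1
    UNION ALL
    SELECT key, value, 2 AS priority FROM delta_2
    UNION ALL
    SELECT key, value, 3 AS priority FROM delta_3
  ) tagged
) ranked
WHERE rn = 1;
This avoids the k-1 join-and-overwrite rounds of the loop above.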
Then we can do the same thing as in the previous example.
<stakiar> I know not all users may be using Avro, but does Avro provide any functionality to do schema evolution with Hive automatically? </stakiar>
Schema Evolution
Suppose we have a snapshot table and a delta table with the same key attributes but different value attributes. Changing key attributes is out of the scope of this document, since it requires solutions specific to how the key attributes are changed.
New Attribute in Delta
Suppose the schema of snapshot is (key INT, value1 STRING) and the schema of delta is (key INT, value1 STRING, value2 STRING), and the default value of value2 is 'default'. After we obtain the temporary table not_updated, we can add value2 to it as follows:
CREATE TABLE not_updated_with_new_attr
(key INT, value1 STRING, value2 STRING);
INSERT OVERWRITE TABLE not_updated_with_new_attr
SELECT key, value1, 'default' FROM not_updated;
Then we can union not_updated_with_new_attr with delta. If the default value of value2 is NULL, then we can simply do
ALTER TABLE not_updated
ADD COLUMNS (value2 STRING);
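In either case, the final UNION ALL is the same as before, except that new_snapshot must be created with the evolved schema (key INT, value1 STRING, value2 STRING). A minimal sketch:
INSERT OVERWRITE TABLE new_snapshot
SELECT * FROM (
SELECT * FROM not_updated_with_new_attr
UNION ALL
SELECT * FROM delta
) unioned;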
Removed Attribute in Delta
Suppose the schema of snapshot is (key INT, value1 STRING, value2 STRING) and the schema of delta is (key INT, value1 STRING). Then we can simply project value2 out in the UNION ALL statement:
INSERT OVERWRITE TABLE new_snapshot
SELECT * FROM (
SELECT key, value1 FROM not_updated
UNION ALL
SELECT * FROM delta
) unioned;
Partitioned Tables
Suppose the schema of snapshot and delta is (key INT, value STRING), partitioned by (year INT, month INT). To improve performance, we should include year and month in the join condition of the left outer join:
CREATE TEMPORARY TABLE not_updated AS
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key
AND snapshot.year=delta.year
AND snapshot.month=delta.month
WHERE delta.key IS NULL;
This tells Hive to join corresponding partitions in snapshot and delta, rather than joining the full tables.
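For reference, a minimal sketch of the partitioned snapshot DDL assumed above (the delta table is analogous; the partition path layout is hypothetical, and the Avro schema in schema.avsc should contain only key and value, since year and month are partition columns):
CREATE EXTERNAL TABLE snapshot (key INT, value STRING)
PARTITIONED BY (year INT, month INT)
STORED AS AVRO
LOCATION '<path-to-snapshot>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');
-- each partition has to be registered before it is visible to queries
ALTER TABLE snapshot ADD PARTITION (year=2015, month=6)
LOCATION '<path-to-snapshot>/2015/6';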
Multiple Updates on the Same Key
Suppose the schema of snapshot and delta is (key INT, value STRING, ts TIMESTAMP), and the delta table may contain multiple records with the same key if that key has been updated multiple times. In this case only the update with the latest timestamp should be merged into snapshot. To do so, we can deduplicate the records in delta, keeping only the record with the latest timestamp per key:
SELECT t.*
FROM delta t WHERE t.ts IN
(SELECT MAX(ts) FROM delta s
WHERE s.key=t.key);
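A hedged alternative, if window functions are available (Hive 0.11+), which also picks exactly one record per key when several updates share the maximum timestamp:
SELECT key, value, ts FROM (
  -- rank each key's updates, latest timestamp first
  SELECT key, value, ts,
         ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC) AS rn
  FROM delta
) ranked
WHERE rn = 1;
Note that the IN-subquery version above returns all records tied for the maximum timestamp, while this version returns one of them.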
Pulling Snapshot and Delta in Parallel
In some applications the job pulling the snapshot and the job pulling the delta may run in parallel rather than sequentially. This means that some records in delta may be older than the corresponding records in snapshot, and such snapshot records should survive the merge.
In this case each record should carry a timestamp, and whenever a key is present in both snapshot and delta, the record with the latest timestamp wins. Suppose the schema of snapshot and delta is (key INT, value STRING, ts TIMESTAMP). We can first union snapshot and delta, then perform a deduplication similar to the previous query.
CREATE TABLE merged AS
SELECT * FROM (
SELECT * FROM snapshot
UNION ALL
SELECT * FROM delta
) unioned;
INSERT OVERWRITE TABLE new_snapshot
SELECT t.*
FROM merged t WHERE t.ts IN
(SELECT MAX(ts) FROM merged s
WHERE s.key=t.key);