Skip to content

Commit

Permalink
Handle Date type in HCatToRow (#32695)
Browse files Browse the repository at this point in the history
* Handle Date type in HCatToRow

Some initial notes:
- The issue (#20685) deals with java.sql.Date, which I wasn't able to
  reproduce fully (at present I can write a hadoop.hive Date to hcatalog)
- Related: commit 267f76f changed the code involved so that
  RowUtils.java now casts directly to AbstractInstant.
  This doesn't change much, but FYI.

* Run: ./gradlew :sdks:java:io:hcatalog:spotlessApply

* review cr: castTypes util

- s/castHDate/maybeCastHDate/ to be more concise
- move values manipulation to a separate util (hopefully, I understood
  the cr in the right way)
  • Loading branch information
deadb0d4 authored Oct 9, 2024
1 parent c243491 commit 2ee6100
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.hcatalog;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -25,6 +27,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.joda.time.Instant;

/** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */
@SuppressWarnings({
Expand Down Expand Up @@ -74,14 +77,26 @@ public PCollection<Row> expand(PBegin input) {
/**
 * {@link DoFn} that converts each {@link HCatRecord} into a {@link Row} with the configured
 * schema.
 *
 * <p>Values of type {@code org.apache.hadoop.hive.common.type.Date} are replaced by a Joda {@link
 * Instant} before the row is built; all other values are passed through unchanged.
 */
private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
  private final Schema schema;

  HCatToRowFn(Schema schema) {
    this.schema = schema;
  }

  /**
   * Returns an {@link Instant} at the same epoch millisecond when {@code obj} is a Hive date;
   * any other value (including {@code null}) is returned as is.
   */
  private static Object maybeCastHDate(Object obj) {
    if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
      return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
    }
    return obj;
  }

  /** Cast objects of the types that aren't supported by {@link Row}. */
  private static List<Object> castTypes(List<Object> values) {
    return values.stream().map(HCatToRowFn::maybeCastHDate).collect(Collectors.toList());
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    HCatRecord hCatRecord = c.element();
    // Emit exactly one row per record, built from the converted values only. Passing the raw
    // values of hCatRecord.getAll() straight to the Row builder would bypass the date conversion.
    c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_TABLE;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecordsWithDate;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getConfigPropertiesAsMap;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext;
Expand Down Expand Up @@ -54,12 +55,14 @@
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
Expand Down Expand Up @@ -230,6 +233,44 @@ public void processElement(ProcessContext c) {
readAfterWritePipeline.run();
}

/**
 * Writes records that carry a Hive date into a table with a {@code date} column, reads them back
 * through {@link HCatToRow}, and checks the formatted value of the date column.
 */
@Test
public void testReadHCatalogDateType() throws Exception {
  // Recreate the test table with a string column and a date column.
  final String dropTableQuery = "drop table if exists " + TEST_TABLE;
  final String createTableQuery = "create table " + TEST_TABLE + "(mycol1 string, mycol2 date)";
  service.executeQuery(dropTableQuery);
  service.executeQuery(createTableQuery);

  // Write phase: populate the table and wait for the write pipeline to finish.
  defaultPipeline
      .apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
      .apply(
          HCatalogIO.write()
              .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
              .withDatabase(TEST_DATABASE)
              .withTable(TEST_TABLE)
              .withPartition(new java.util.HashMap<>()));
  defaultPipeline.run().waitUntilFinish();

  // Read phase: convert records to rows, format the date column, and de-duplicate.
  final PCollection<String> formattedDates =
      readAfterWritePipeline
          .apply(
              HCatToRow.fromSpec(
                  HCatalogIO.read()
                      .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
                      .withDatabase(TEST_DATABASE)
                      .withTable(TEST_TABLE)
                      .withFilter(TEST_FILTER)))
          .apply(
              ParDo.of(
                  new DoFn<Row, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      c.output(c.element().getDateTime("mycol2").toString("yyyy-MM-dd HH:mm:ss"));
                    }
                  }))
          .apply(Distinct.create());

  // Every written record holds the same date, so one distinct value is expected.
  PAssert.that(formattedDates).containsInAnyOrder(ImmutableList.of("2014-01-20 00:00:00"));
  readAfterWritePipeline.run();
}

/** Test of Write to a non-existent table. */
@Test
public void testWriteFailureTableDoesNotExist() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
Expand Down Expand Up @@ -120,4 +121,13 @@ public static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
/** Builds a two-field record: the text {@code "record <value>"} followed by {@code value}. */
private static DefaultHCatRecord toHCatRecord(int value) {
  final String label = "record " + value;
  return new DefaultHCatRecord(Arrays.asList(label, value));
}

/**
 * Builds {@code size} records, each holding the text {@code "record <index>"} and the fixed
 * dummy date {@code 2014-01-20}.
 */
public static List<HCatRecord> buildHCatRecordsWithDate(int size) {
  final List<HCatRecord> records = new ArrayList<>();
  int index = 0;
  while (index < size) {
    // A fresh Date is created for every record so that no instance is shared between records.
    final Date dummyDate = Date.valueOf("2014-01-20");
    records.add(new DefaultHCatRecord(Arrays.asList("record " + index, dummyDate)));
    index++;
  }
  return records;
}
}

0 comments on commit 2ee6100

Please sign in to comment.