Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Aug 27, 2024
1 parent af724fb commit 56bfec6
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3042,8 +3042,9 @@ public Write<T> withPropagateSuccessfulStorageApiWrites(
}

/**
* If set to true, then all successful writes will be propagated to {@link WriteResult} and
* accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method.
* If called, then all successful writes will be propagated to {@link WriteResult} and
* accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. Only columns
* whose name the predicate accepts (returns true for) appear in the resulting PCollection.
*/
public Write<T> withPropagateSuccessfulStorageApiWrites(Predicate<String> columnsToPropagate) {
return toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,16 +1095,27 @@ private static long toEpochMicros(Instant timestamp) {
/**
 * Converts a proto message into a {@link TableRow}, starting with an empty field-name prefix.
 *
 * <p>This is a convenience entry point that forwards to the four-argument overload.
 *
 * @param message the proto message to convert
 * @param includeCdcColumns forwarded unchanged to the four-argument overload
 * @param includeField forwarded unchanged to the four-argument overload
 * @return the {@link TableRow} produced by the four-argument overload
 */
@VisibleForTesting
public static TableRow tableRowFromMessage(
    Message message, boolean includeCdcColumns, Predicate<String> includeField) {
  // No enclosing field at this entry point, so the name prefix is empty.
  final String noPrefix = "";
  return tableRowFromMessage(message, includeCdcColumns, includeField, noPrefix);
}

/**
 * Builds a {@link TableRow} from the set fields of a proto {@link Message}.
 *
 * <p>Each entry returned by {@code message.getAllFields()} is copied into the row under the
 * field's own (unprefixed) name, with its value converted by {@code jsonValueFromMessageValue},
 * provided the field passes both the CDC-column check and the {@code includeField} predicate.
 *
 * <p>NOTE(review): this listing appears to contain both the removed and the added lines of a
 * diff hunk (two {@code if} openers and two {@code jsonValueFromMessageValue} calls). The
 * description here follows the variants that use {@code fullName} and the five-argument call;
 * confirm against the committed file.
 *
 * @param message proto message whose set fields are copied
 * @param includeCdcColumns when false, fields matched by {@code StorageApiCDC.COLUMNS} are
 *     skipped
 * @param includeField predicate tested against the field's unprefixed name; fields it rejects
 *     are skipped
 * @param namePrefix text prepended to each field name to form its qualified name; the
 *     three-argument overload passes {@code ""}
 * @return a new {@link TableRow} keyed by unprefixed field names
 */
public static TableRow tableRowFromMessage(
Message message,
boolean includeCdcColumns,
Predicate<String> includeField,
String namePrefix) {
// TODO: Would be more correct to generate TableRows using setF.
TableRow tableRow = new TableRow();
for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
StringBuilder fullName = new StringBuilder();
FieldDescriptor fieldDescriptor = field.getKey();
// Qualified name of this field: the caller-supplied prefix followed by the field's own name.
fullName = fullName.append(namePrefix).append(fieldDescriptor.getName());
Object fieldValue = field.getValue();
// NOTE(review): the next two lines look like the old and new forms of the same condition
// (CDC check on the bare name vs. on the qualified name); only one can be kept — confirm.
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fieldDescriptor.getName()))
if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
// The include predicate is tested against the unprefixed field name, not fullName.
&& includeField.test(fieldDescriptor.getName())) {
tableRow.put(
fieldDescriptor.getName(),
// NOTE(review): the four-argument call below looks like the pre-change line and the
// five-argument call after it like its replacement — confirm which one is current.
jsonValueFromMessageValue(fieldDescriptor, fieldValue, true, includeField));
jsonValueFromMessageValue(
// The prefix handed down is this field's qualified name followed by ".".
fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
}
}
return tableRow;
}
Expand All @@ -1114,18 +1125,19 @@ public static Object jsonValueFromMessageValue(
FieldDescriptor fieldDescriptor,
Object fieldValue,
boolean expandRepeated,
Predicate<String> includeField) {
Predicate<String> includeField,
String prefix) {
if (expandRepeated && fieldDescriptor.isRepeated()) {
List<Object> valueList = (List<Object>) fieldValue;
return valueList.stream()
.map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField))
.map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix))
.collect(toList());
}

switch (fieldDescriptor.getType()) {
case GROUP:
case MESSAGE:
return tableRowFromMessage((Message) fieldValue, false, includeField);
return tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
case BYTES:
return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
case ENUM:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

import com.google.api.core.ApiFuture;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1593,6 +1595,91 @@ public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() thro
storageWriteWithErrorHandling(true);
}

/**
 * Writes 30 rows through the Storage Write API with successful-write propagation enabled, then
 * checks both the propagated rows and the rows recorded by the fake dataset service.
 *
 * <p>The test is skipped unless the Storage API is in use, and also skipped for the
 * non-streaming approximate (at-least-once) configuration.
 *
 * @param columnSubset when true, propagation is restricted to the {@code number} column and the
 *     expected propagated rows contain only that column
 * @throws Exception if reading rows back from the fake dataset service fails
 */
private void storageWriteWithSuccessHandling(boolean columnSubset) throws Exception {
  assumeTrue(useStorageApi);
  if (!useStreaming) {
    assumeFalse(useStorageApiApproximate);
  }

  // Thirty rows whose "number" and "string" columns both hold the row index as text.
  List<TableRow> inputRows =
      IntStream.range(0, 30)
          .mapToObj(
              index -> {
                String text = Integer.toString(index);
                return new TableRow().set("number", text).set("string", text);
              })
          .collect(Collectors.toList());

  // With a column subset, only the "number" column is expected on the success output.
  List<TableRow> expectedPropagated =
      columnSubset
          ? inputRows.stream()
              .map(row -> new TableRow().set("number", row.get("number")))
              .collect(Collectors.toList())
          : inputRows;

  TableFieldSchema numberField = new TableFieldSchema().setName("number").setType("INTEGER");
  TableFieldSchema stringField = new TableFieldSchema().setName("string").setType("STRING");
  TableSchema schema = new TableSchema().setFields(ImmutableList.of(numberField, stringField));

  // Three batches of ten rows; processing time advances one minute after each of the first two.
  TestStream.Builder<TableRow> streamBuilder = TestStream.create(TableRowJsonCoder.of());
  for (int start = 0; start < 30; start += 10) {
    streamBuilder =
        streamBuilder.addElements(
            inputRows.get(start),
            Iterables.toArray(inputRows.subList(start + 1, start + 10), TableRow.class));
    if (start + 10 < 30) {
      streamBuilder = streamBuilder.advanceProcessingTime(Duration.standardMinutes(1));
    }
  }
  TestStream<TableRow> streamingInput = streamBuilder.advanceWatermarkToInfinity();

  BigQueryIO.Write<TableRow> writeTransform =
      BigQueryIO.writeTableRows()
          .to("project-id:dataset-id.table-id")
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
          .withSchema(schema)
          .withMethod(Method.STORAGE_WRITE_API)
          .withTestServices(fakeBqServices)
          .withPropagateSuccessfulStorageApiWrites(true)
          .withoutValidation();
  if (columnSubset) {
    writeTransform =
        writeTransform.withPropagateSuccessfulStorageApiWrites(
            (Serializable & Predicate<String>) column -> column.equals("number"));
  }
  if (useStreaming) {
    writeTransform =
        useStorageApiApproximate
            ? writeTransform.withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
            : writeTransform.withAutoSharding();
  }

  PTransform<PBegin, PCollection<TableRow>> input;
  if (useStreaming) {
    input = streamingInput;
  } else {
    input = Create.of(inputRows).withCoder(TableRowJsonCoder.of());
  }

  PCollection<TableRow> propagated =
      p.apply(input).apply("WriteToBQ", writeTransform).getSuccessfulStorageApiInserts();

  TableRow[] expectedPropagatedArray = Iterables.toArray(expectedPropagated, TableRow.class);
  PAssert.that(propagated).containsInAnyOrder(expectedPropagatedArray);

  p.run().waitUntilFinish();

  // Regardless of the propagated column subset, the table itself must hold the full rows.
  TableRow[] expectedStored = Iterables.toArray(inputRows, TableRow.class);
  assertThat(
      fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
      containsInAnyOrder(expectedStored));
}

/** Runs the successful-write propagation scenario with every column propagated. */
@Test
public void testStorageApiWriteWithSuccessfulRows() throws Exception {
  final boolean columnSubset = false;
  storageWriteWithSuccessHandling(columnSubset);
}

/** Runs the successful-write propagation scenario with propagation limited to a column subset. */
@Test
public void testStorageApiWriteWithSuccessfulRowsColumnSubset() throws Exception {
  final boolean columnSubset = true;
  storageWriteWithSuccessHandling(columnSubset);
}

@DefaultSchema(JavaFieldSchema.class)
static class SchemaPojo {
final String name;
Expand Down

0 comments on commit 56bfec6

Please sign in to comment.