Skip to content

Commit

Permalink
add test and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Aug 27, 2024
1 parent 56bfec6 commit 303c62b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3044,7 +3044,13 @@ public Write<T> withPropagateSuccessfulStorageApiWrites(
/**
* If called, then all successful writes will be propagated to {@link WriteResult} and
* accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. The predicate
* allows filtering out columns from appearing in the resulting PCollection.
* allows filtering out columns from appearing in the resulting PCollection. The argument to the
* predicate is the name of the field to potentially be included in the output. Nested fields
* will be presented using . notation - e.g. a.b.c.
*
* <p>The predicate will be invoked repeatedly for every field in every message, so it is
* recommended that it be as lightweight as possible. e.g. looking up fields in a hash table
* instead of searching a list of field names.
*/
public Write<T> withPropagateSuccessfulStorageApiWrites(Predicate<String> columnsToPropagate) {
return toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1603,14 +1603,23 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except
List<TableRow> elements =
IntStream.range(0, 30)
.mapToObj(Integer::toString)
.map(i -> new TableRow().set("number", i).set("string", i))
.map(
i ->
new TableRow()
.set("number", i)
.set("string", i)
.set("nested", new TableRow().set("number", i)))
.collect(Collectors.toList());

List<TableRow> expectedSuccessElements = elements;
if (columnSubset) {
expectedSuccessElements =
elements.stream()
.map(tr -> new TableRow().set("number", tr.get("number")))
.map(
tr ->
new TableRow()
.set("number", tr.get("number"))
.set("nested", new TableRow().set("number", tr.get("number"))))
.collect(Collectors.toList());
}

Expand All @@ -1619,7 +1628,13 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("number").setType("INTEGER"),
new TableFieldSchema().setName("string").setType("STRING")));
new TableFieldSchema().setName("string").setType("STRING"),
new TableFieldSchema()
.setName("nested")
.setType("RECORD")
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("number").setType("INTEGER")))));

TestStream<TableRow> testStream =
TestStream.create(TableRowJsonCoder.of())
Expand All @@ -1645,7 +1660,8 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except
if (columnSubset) {
write =
write.withPropagateSuccessfulStorageApiWrites(
(Serializable & Predicate<String>) s -> s.equals("number"));
(Serializable & Predicate<String>)
s -> s.equals("number") || s.equals("nested") || s.equals("nested.number"));
}
if (useStreaming) {
if (useStorageApiApproximate) {
Expand Down

0 comments on commit 303c62b

Please sign in to comment.