Skip to content

Commit

Permalink
Added sample implmentation and cross functional UT for custom transfo…
Browse files Browse the repository at this point in the history
…rmation (#1732)

* Added sample implmentation and cross functional UT for custom transformation

* added UTs

* minor fix
  • Loading branch information
shreyakhajanchi authored Jul 18, 2024
1 parent 0701be1 commit d7db191
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public void processElement(ProcessContext c) {
c.output(DatastreamToSpannerConstants.FILTERED_EVENT_TAG, msg.getOriginalPayload());
return;
}
if (migrationTransformationResponse.getResponseRow() != null) {
if (migrationTransformationResponse != null
&& migrationTransformationResponse.getResponseRow() != null) {
changeEvent =
ChangeEventToMapConvertor.transformChangeEventViaCustomTransformation(
changeEvent, migrationTransformationResponse.getResponseRow());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.custom;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This is a sample class to be implemented by the customer. All the relevant dependencies have been
* added and users need to implement the toSpannerRow() and/or toSourceRow() method for forward and
* reverse replication flows respectively
*/
public class CustomTransformationFetcher implements ISpannerMigrationTransformer {

private static final Logger LOG = LoggerFactory.getLogger(CustomShardIdFetcher.class);

@Override
public void init(String customParameters) {
LOG.info("init called with {}", customParameters);
}

@Override
public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
throws InvalidTransformationException {
if (request.getTableName().equals("Customers")) {
Map<String, Object> requestRow = request.getRequestRow();
Map<String, Object> responseRow = new HashMap<>();

responseRow.put(
"full_name", requestRow.get("first_name") + " " + requestRow.get("last_name"));
responseRow.put("migration_shard_id", request.getShardId() + "_" + requestRow.get("id"));
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
}
return new MigrationTransformationResponse(null, false);
}

@Override
public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
throws InvalidTransformationException {
if (request.getTableName().equals("Customers")) {
Map<String, Object> requestRow = request.getRequestRow();
Map<String, Object> responseRow = new HashMap<>();
String fullName = (String) requestRow.get("full_name");
String[] nameParts = fullName.split(" ", 2);
responseRow.put("first_name", nameParts[0]);
responseRow.put("last_name", nameParts[1]);
String migrationShardId = (String) requestRow.get("migration_shard_id");
String[] idParts = migrationShardId.split("_", 2);
responseRow.put("id", idParts[1]);
MigrationTransformationResponse response =
new MigrationTransformationResponse(responseRow, false);
return response;
}
return new MigrationTransformationResponse(null, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.custom;

import static org.junit.Assert.assertEquals;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

/** Tests for CustomTransformationFetcher class. */
public class CustomTransformationFetcherTest {
@Test
public void endToEndTest() throws InvalidTransformationException {
CustomTransformationFetcher customTransformationFetcher = new CustomTransformationFetcher();
Map<String, Object> requestRow = new HashMap<>();
requestRow.put("first_name", "abc");
requestRow.put("last_name", "xyz");
requestRow.put("id", "123");
MigrationTransformationRequest request =
new MigrationTransformationRequest("Customers", requestRow, "ls1", "");
MigrationTransformationResponse response = customTransformationFetcher.toSpannerRow(request);
MigrationTransformationResponse response2 =
customTransformationFetcher.toSourceRow(
new MigrationTransformationRequest("Customers", response.getResponseRow(), "ls1", ""));
assertEquals(request.getRequestRow(), response2.getResponseRow());
}

@Test
public void testToSourceRowInvalidTableName() throws InvalidTransformationException {
CustomTransformationFetcher customTransformationFetcher = new CustomTransformationFetcher();
Map<String, Object> requestRow = new HashMap<>();
requestRow.put("first_name", "abc");
requestRow.put("last_name", "xyz");
requestRow.put("id", "123");
MigrationTransformationRequest request =
new MigrationTransformationRequest("xyz", requestRow, "ls1", "");
MigrationTransformationResponse response = customTransformationFetcher.toSourceRow(request);
assertEquals(response.getResponseRow(), null);
}

@Test
public void testToSpannerRowInvalidTableName() throws InvalidTransformationException {
CustomTransformationFetcher customTransformationFetcher = new CustomTransformationFetcher();
Map<String, Object> requestRow = new HashMap<>();
requestRow.put("first_name", "abc");
requestRow.put("last_name", "xyz");
requestRow.put("id", "123");
MigrationTransformationRequest request =
new MigrationTransformationRequest("xyz", requestRow, "ls1", "");
MigrationTransformationResponse response = customTransformationFetcher.toSpannerRow(request);
assertEquals(response.getResponseRow(), null);
}
}

0 comments on commit d7db191

Please sign in to comment.