Skip to content

Commit

Permalink
Dart Support for COS Service Provider
Browse files Browse the repository at this point in the history
  • Loading branch information
rajuGT committed Oct 22, 2024
1 parent 8a4ca54 commit 6b9ba03
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class Constants {
public static final String UDF_STORE_PROVIDER_KEY = "UDF_STORE_PROVIDER";
public static final String UDF_STORE_PROVIDER_GCS = "GCS";
public static final String UDF_STORE_PROVIDER_OSS = "OSS";
public static final String UDF_STORE_PROVIDER_COS = "COS";

public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DefaultDartDataStore;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos.CosDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.gcs.GcsDartClient;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.oss.OssDartClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -156,6 +157,10 @@ private DartDataStore getDartDataSource() {
// TODO Check if OSS SDK supports projectID selection
dartDataStoreClient = new OssDartClient();
break;
case Constants.UDF_STORE_PROVIDER_COS:
// TODO Check if COS SDK supports projectID selection
dartDataStoreClient = new CosDartClient();
break;
default:
throw new IllegalArgumentException("Unknown UDF Store Provider: " + udfStoreProvider);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.gotocompany.dagger.functions.exceptions.TagDoesNotExistException;
import com.gotocompany.dagger.functions.udfs.scalar.dart.DartAspects;
import com.gotocompany.dagger.functions.udfs.scalar.dart.store.DartDataStoreClient;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.IOUtils;

import java.io.IOException;

import static com.gotocompany.dagger.common.core.Constants.UDF_TELEMETRY_GROUP_KEY;

public class CosDartClient implements DartDataStoreClient {
private static final Double BYTES_TO_KB = 1024.0;
private static final String DART_PATH = "dartpath";

private static final String ENV_COS_SECRET_ID = "COS_SECRET_ID";
private static final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY";
private static final String ENV_COS_REGION = "COS_REGION";

private final COSClient libCosClient;

/**
* Instantiates a new Cos client.
*/
public CosDartClient() {
String secretID = System.getenv(ENV_COS_SECRET_ID);
String secretKey = System.getenv(ENV_COS_SECRET_KEY);
String region = System.getenv(ENV_COS_REGION); // ap-singapore

COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey);
ClientConfig clientConfig = new ClientConfig(new Region(region));
libCosClient = new COSClient(credentials, clientConfig);
}


public String fetchJsonData(String udfName, GaugeStatsManager gaugeStatsManager, String bucketName, String dartName) {
COSObject cosObject = libCosClient.getObject(bucketName, dartName);
String dartJson;
byte[] contentByteArray;
try (COSObjectInputStream inputStream = cosObject.getObjectContent()) {
contentByteArray = IOUtils.toByteArray(inputStream);
dartJson = new String(contentByteArray);
} catch (IOException e) {
throw new TagDoesNotExistException("Could not find the content in oss for + dartName", e);
}
gaugeStatsManager.registerString(UDF_TELEMETRY_GROUP_KEY, udfName, DartAspects.DART_GCS_PATH.getValue(), dartName);
gaugeStatsManager.registerDouble(DART_PATH, dartName, DartAspects.DART_GCS_FILE_SIZE.getValue(), contentByteArray.length / BYTES_TO_KB);
return dartJson;
}

/**
* Instantiates a new Cos client.
* This constructor used for unit test purposes.
*
* @param libCosClient the storage
*/
public CosDartClient(COSClient libCosClient) {
this.libCosClient = libCosClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.gotocompany.dagger.functions.udfs.scalar.dart.store.cos;

import com.gotocompany.dagger.common.metrics.managers.GaugeStatsManager;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.ByteArrayInputStream;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

public class CosDartClientTest {

@Mock
private COSClient libCosClient;

@Mock
private COSObject cosObject;

@Mock
private GaugeStatsManager gaugeStatsManager;

@Mock
private HttpRequestBase mockRequest;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldGetObjectFile() {
String bucketName = "bucket_name";
String udfName = "DartGet";
String dartName = "dart-get/path/to/data.json";
String jsonFileContent = "{\"name\":\"house-stark-dev\"}";

when(libCosClient.getObject(bucketName, dartName)).thenReturn(cosObject);
when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream(jsonFileContent.getBytes()), mockRequest));

CosDartClient cosDartClient = new CosDartClient(libCosClient);
String jsonData = cosDartClient.fetchJsonData(udfName, gaugeStatsManager, bucketName, dartName);

verify(libCosClient, times(1)).getObject(bucketName, dartName);
verify(cosObject, times(1)).getObjectContent();
Assert.assertEquals(jsonFileContent, jsonData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.mockito.Mock;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
Expand All @@ -30,7 +29,7 @@ public void setup() {
}

@Test
public void shouldGetObjectFile() throws IOException {
public void shouldGetObjectFile() {
String bucketName = "bucket_name";
String udfName = "DartGet";
String dartName = "dart-get/path/to/data.json";
Expand Down

0 comments on commit 6b9ba03

Please sign in to comment.