Skip to content

Commit

Permalink
Add capability to dagger to read python udfs from Ali(oss) and Tencen…
Browse files Browse the repository at this point in the history
…t(cosn) storage services

Given the configuration provided correctly. Set the below environment
variables accordingly to access the files stored in the respective
bucket.

Ali(oss)
- OSS_ACCESS_KEY_ID
- OSS_ACCESS_KEY_SECRET

Tencent(cos)
- COS_SECRET_ID
- COS_SECRET_KEY
- COS_REGION
  • Loading branch information
rajuGT committed Oct 15, 2024
1 parent 715a65d commit d0bb427
Show file tree
Hide file tree
Showing 11 changed files with 425 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dagger-functions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ dependencies {
dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1'
dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1'
dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.23.0'
dependenciesFunctionsJar group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
dependenciesFunctionsJar group: 'com.qcloud', name: 'cos_api', version: '5.6.227'

testImplementation project(':dagger-common').sourceSets.test.output
testImplementation group: 'junit', name: 'junit', version: '4.12'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.gotocompany.dagger.functions.udfs.python.file.source;

import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource;

/**
* The type File source factory.
Expand All @@ -17,6 +19,10 @@ public class FileSourceFactory {
public static FileSource getFileSource(String pythonFile) {
if ("GS".equals(getFileSourcePrefix(pythonFile))) {
return new GcsFileSource(pythonFile);
} else if ("OSS".equals(getFileSourcePrefix(pythonFile))) {
return new OssFileSource(pythonFile);
} else if ("COSN".equals(getFileSourcePrefix(pythonFile))) {
return new CosFileSource(pythonFile);
} else {
return new LocalFileSource(pythonFile);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.IOUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CosClient {

// TODO find better way to initialize clients
private final String ENV_COS_SECRET_ID = "COS_SECRET_ID";
private final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY";
private final String ENV_COS_REGION = "COS_REGION";

private final COSClient libCosClient;

/**
* Instantiates a new Cos client.
*/
public CosClient() {
String secretID = System.getenv(ENV_COS_SECRET_ID);
String secretKey = System.getenv(ENV_COS_SECRET_KEY);
String region = System.getenv(ENV_COS_REGION); // ap-singapore

COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey);
ClientConfig clientConfig = new ClientConfig(new Region(region));
libCosClient = new COSClient(credentials, clientConfig);
}

/**
* Instantiates a new Cos client.
* This constructor used for unit test purposes.
*
* @param libCosClient the storage
*/
public CosClient(COSClient libCosClient) {
this.libCosClient = libCosClient;
}

/**
* Get file byte [ ].
*
* @param pythonFile the python file
* @return the byte [ ]
*/
public byte[] getFile(String pythonFile) throws IOException {
List<String> file = Arrays.asList(pythonFile.replace("cosn://", "").split("/"));

String bucketName = file.get(0);
String objectName = file.stream().skip(1).collect(Collectors.joining("/"));

COSObject cosObject = libCosClient.getObject(bucketName, objectName);
try (COSObjectInputStream inputStream = cosObject.getObjectContent()) {
return IOUtils.toByteArray(inputStream);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource;

import java.io.IOException;

public class CosFileSource implements FileSource {

private CosClient cosClient;
private final String pythonFile;

/**
* Instantiates a new Cos file source.
*
* @param pythonFile the python file
*/
public CosFileSource(String pythonFile) {
this.pythonFile = pythonFile;
}

/**
* Instantiates a new Cos file source.
*
* @param pythonFile the python file
*/
public CosFileSource(String pythonFile, CosClient cosClient) {
this.pythonFile = pythonFile;
this.cosClient = cosClient;
}

@Override
public byte[] getObjectFile() throws IOException {
return getCosClient().getFile(pythonFile);
}

/**
* Gets cos client.
*
* @return the cos client
*/
private CosClient getCosClient() {
if (this.cosClient == null) {
this.cosClient = new CosClient();
}
return this.cosClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.oss;

import com.aliyun.core.utils.IOUtils;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.model.OSSObject;
import com.aliyuncs.exceptions.ClientException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OssClient {

// TODO refactor to take this value from the configuration
private static final String endpoint = "oss-cn-hangzhou.aliyuncs.com";

private final OSS libOssClient;

/**
* Instantiates a new Oss client.
*/
public OssClient() {
try {
libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider());
} catch (ClientException e) {
throw new RuntimeException("failed to initialise oss client", e);
}
}

/**
* Instantiates a new OSS client.
* This constructor used for unit test purposes.
*
* @param libOssClient the storage
*/
public OssClient(OSS libOssClient) {
this.libOssClient = libOssClient;
}

/**
* Get file byte [ ].
*
* @param pythonFile the python file
* @return the byte [ ]
*/
public byte[] getFile(String pythonFile) throws IOException {
List<String> file = Arrays.asList(pythonFile.replace("oss://", "").split("/"));

String bucketName = file.get(0);
String objectName = file.stream().skip(1).collect(Collectors.joining("/"));

OSSObject ossObject = libOssClient.getObject(bucketName, objectName);
try (InputStream inputStream = ossObject.getObjectContent()) {
return IOUtils.toByteArray(inputStream);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.oss;

import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource;

import java.io.IOException;

public class OssFileSource implements FileSource {

private OssClient ossClient;
private final String pythonFile;

/**
* Instantiates a new Oss file source.
*
* @param pythonFile the python file
*/
public OssFileSource(String pythonFile) {
this.pythonFile = pythonFile;
}

/**
* Instantiates a new Oss file source.
*
* @param pythonFile the python file
*/
public OssFileSource(String pythonFile, OssClient ossClient) {
this.pythonFile = pythonFile;
this.ossClient = ossClient;
}

@Override
public byte[] getObjectFile() throws IOException {
return getOssClient().getFile(pythonFile);
}

/**
* Gets oss client.
*
* @return the oss client
*/
private OssClient getOssClient() {
if (this.ossClient == null) {
this.ossClient = new OssClient();
}
return this.ossClient;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.gotocompany.dagger.functions.udfs.python.file.source;

import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -24,4 +26,22 @@ public void shouldGetGcsFileSource() {

Assert.assertTrue(fileSource instanceof GcsFileSource);
}

@Test
public void shouldGetOssFileSource() {
String pythonFile = "oss://bucket-name/path/to/file/test_function.py";

FileSource fileSource = FileSourceFactory.getFileSource(pythonFile);

Assert.assertTrue(fileSource instanceof OssFileSource);
}

@Test
public void shouldGetCosnFileSource() {
String pythonFile = "cosn://bucket-name/path/to/file/test_function.py";

FileSource fileSource = FileSourceFactory.getFileSource(pythonFile);

Assert.assertTrue(fileSource instanceof CosFileSource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

public class CosClientTest {

@Mock
private COSClient libCosClient;

@Mock
private COSObject cosObject;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldGetObjectFile() throws IOException {
HttpRequestBase mockRequest = Mockito.mock(HttpRequestBase.class);

String pythonFile = "cosn://bucket_name/path/to/file/python_udf.zip";
String bucketName = "bucket_name";
String objectName = "path/to/file/python_udf.zip";
String expectedValue = Arrays.toString("objectFile".getBytes());

when(libCosClient.getObject(bucketName, objectName)).thenReturn(cosObject);
when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream("objectFile".getBytes()), mockRequest));

CosClient cosClient = new CosClient(libCosClient);
byte[] actualValue = cosClient.getFile(pythonFile);

verify(libCosClient, times(1)).getObject(bucketName, objectName);
verify(cosObject, times(1)).getObjectContent();
Assert.assertEquals(expectedValue, Arrays.toString(actualValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class CosFileSourceTest {

@Mock
private CosClient cosClient;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldGetObjectFile() throws IOException {
ClassLoader classLoader = getClass().getClassLoader();
String pythonFile = classLoader.getResource("python_udf.zip").getFile();
byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile));

when(cosClient.getFile(pythonFile)).thenReturn(expectedObject);
CosFileSource cosFileSource = new CosFileSource(pythonFile, cosClient);

byte[] actualObject = cosFileSource.getObjectFile();

Assert.assertEquals(expectedObject, actualObject);
}
}
Loading

0 comments on commit d0bb427

Please sign in to comment.