diff --git a/highflip-vendors/highflip-adaptor-fate/pom.xml b/highflip-vendors/highflip-adaptor-fate/pom.xml
index 3fbcc3d..c6622a8 100644
--- a/highflip-vendors/highflip-adaptor-fate/pom.xml
+++ b/highflip-vendors/highflip-adaptor-fate/pom.xml
@@ -17,6 +17,16 @@
+        <dependency>
+            <groupId>com.baidu.highflip</groupId>
+            <artifactId>highflip-proto</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.baidu.highflip</groupId>
+            <artifactId>highflip-core</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter</artifactId>
@@ -30,21 +40,42 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.baidu.highflip</groupId>
-            <artifactId>highflip-core</artifactId>
-            <version>1.0.0-SNAPSHOT</version>
-            <scope>compile</scope>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.21.1</version>
         </dependency>
         <dependency>
             <groupId>io.github.openfeign</groupId>
             <artifactId>feign-core</artifactId>
             <version>11.10</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.36</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.36</version>
+        </dependency>
         <dependency>
             <groupId>io.github.openfeign</groupId>
             <artifactId>feign-jackson</artifactId>
             <version>11.10</version>
         </dependency>
+        <dependency>
+            <groupId>io.github.openfeign</groupId>
+            <artifactId>feign-httpclient</artifactId>
+            <version>11.10</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.21</version>
+        </dependency>
@@ -54,6 +85,25 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.22.2</version>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
diff --git a/highflip-vendors/highflip-adaptor-fate/src/main/java/com/webank/ai/fate/adaptor/DataAdaptor.java b/highflip-vendors/highflip-adaptor-fate/src/main/java/com/webank/ai/fate/adaptor/DataAdaptor.java
index 52b367f..45952eb 100644
--- a/highflip-vendors/highflip-adaptor-fate/src/main/java/com/webank/ai/fate/adaptor/DataAdaptor.java
+++ b/highflip-vendors/highflip-adaptor-fate/src/main/java/com/webank/ai/fate/adaptor/DataAdaptor.java
@@ -1,10 +1,167 @@
package com.webank.ai.fate.adaptor;
+import static com.webank.ai.fate.common.FateConstants.DATA_ID_SEPARATOR;
+
+import com.baidu.highflip.core.entity.runtime.Data;
+import com.baidu.highflip.core.entity.runtime.basic.DataCategory;
+import com.baidu.highflip.core.entity.runtime.basic.KeyPair;
+import com.baidu.highflip.core.utils.Foreach;
+import com.webank.ai.fate.client.form.ResultForm;
+import com.webank.ai.fate.common.DataMultipartFile;
+import com.webank.ai.fate.common.DecompressUtils;
import com.webank.ai.fate.context.FateContext;
+import feign.Response;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
-public class DataAdaptor {
+@Slf4j
+@Getter
+public class DataAdaptor implements com.baidu.highflip.core.adaptor.DataAdaptor {
+ private final FateContext context;
+
+ private final String DEFAULT_NAMESPACES = "HIGH-FLIP";
+
+ private final String DEFAULT_DELIMITER = ",";
public DataAdaptor(FateContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Data updateData(Data data) {
+ return null;
+ }
+
+ @Override
+ public int getDataCount() {
+ return 0;
+ }
+
+ @Override
+ public Data getDataByIndex(int index, Data data) {
+ return null;
+ }
+
+ @Override
+ public void deleteData(Data data) {
+ getContext().getClient().deleteData(data.getBingingId(), DEFAULT_NAMESPACES);
+ }
+
+ @Override
+ public InputStream readDataRaw(Data data) {
+ return null;
+ }
+
+ @Override
+ public Iterator<List<Object>> readDataDense(Data data) {
+ if (data == null) {
+ throw new RuntimeException("data is null.");
+ }
+ log.info("data:{}", data);
+
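+ // Result data produced by a FATE component is downloaded from FATE Flow as a
+ // tar.gz archive and read from its data.csv entry; other tables are fetched
+ // by name from the HIGH-FLIP namespace and read from table.csv.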
+ if (DataCategory.RESULT_DATA.equals(data.getCategory())) {
+ String jobId = (String) data.getBinding().get("jobId");
+ String componentName = (String) data.getBinding().get("componentName");
+ String role = (String) data.getBinding().get("role");
+ String partyId = (String) data.getBinding().get("partyId");
+ log.info("jobId: {}, componentName: {}, role: {}, partyId: {}",
+ jobId, componentName, role, partyId);
+
+ try (Response response = getContext().getClient()
+ .downloadComponentResultData(jobId, componentName,
+ role, partyId)) {
+ String content = DecompressUtils.decompressTarGzToStringMap(
+ response.body().asInputStream(),
+ s -> s.contains("csv")).get("data.csv");
+ return Arrays.stream(content.split("\n"))
+ .map(s -> Arrays.stream(s.split(DEFAULT_DELIMITER))
+ .map(d -> (Object) d)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList()).iterator();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ try (Response response = getContext().getClient()
+ .downloadData(data.getName(),
+ DEFAULT_NAMESPACES)) {
+ String content = DecompressUtils.decompressTarGzToStringMap(
+ response.body().asInputStream(),
+ s -> s.contains("csv")).get("table.csv");
+ return Arrays.stream(content.split("\n"))
+ .map(s -> Arrays.stream(s.split(DEFAULT_DELIMITER))
+ .map(d -> (Object) d)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList()).iterator();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Iterator<List<KeyPair>> readDataSparse(Data data) {
+ return null;
+ }
+
+ @Override
+ public Data createData(Data data) {
+ return null;
+ }
+
+ @Override
+ public void writeDataRaw(Data data, InputStream body) {
+
+ }
+
+ @Override
+ public void writeDataDense(Data data, Iterator<List<Object>> body) {
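+ // Build a CSV payload in memory: a header row from the column names,
+ // followed by one comma-separated line per dense row.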
+ StringBuilder stringBuilder = new StringBuilder();
+ try {
+ String headers =
+ data.getColumns().stream().map(c -> c.getName())
+ .collect(Collectors.joining(DEFAULT_DELIMITER));
+ stringBuilder.append(headers).append("\n");
+ final Foreach<List<Object>> foreach = Foreach.from(body);
+ for (List