Skip to content

Commit

Permalink
[Feature-DTStack#1933][s3] Support more data type conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
libailin authored and lihongwei committed Nov 29, 2024
1 parent 1d77727 commit 99ed97d
Showing 1 changed file with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,27 @@
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.converter.ISerializationConverter;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
import com.dtstack.chunjun.util.DateUtil;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class S3SqlConverter extends AbstractRowConverter<String[], RowData, String[], LogicalType> {
Expand Down Expand Up @@ -82,12 +93,41 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
return val -> Float.valueOf((String) val);
case DOUBLE:
return val -> Double.valueOf((String) val);
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal(
StringUtils.isNotEmpty(String.valueOf(val))
? new BigDecimal(String.valueOf(val))
: BigDecimal.ZERO,
precision,
scale);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> {
if (val instanceof String) {
return TimestampData.fromTimestamp(Timestamp.valueOf((String) val));
} else if (val instanceof LocalDateTime) {
return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val));
} else {
return TimestampData.fromTimestamp(((Timestamp) val));
}
};
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case DATE:
return val ->
(int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay());
return val -> {
if (StringUtils.isEmpty(String.valueOf(val))) {
return null;
}
Date date = new Date(DateUtil.stringToDate(String.valueOf(val)).getTime());
return (int) date.toLocalDate().toEpochDay();
};
case TIME_WITHOUT_TIME_ZONE:
return val ->
(int)
Expand Down Expand Up @@ -124,6 +164,22 @@ protected ISerializationConverter<String[]> createExternalConverter(LogicalType
output[index] =
Time.valueOf(LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L))
.toString();
case DECIMAL:
return (rowData, index, data) ->
data[index] =
String.valueOf(
rowData.getDecimal(
index,
((DecimalType) type).getPrecision(),
((DecimalType) type).getScale()));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (rowData, index, data) ->
data[index] =
String.valueOf(
rowData.getTimestamp(
index,
((TimestampType) type).getPrecision())
.toTimestamp());
default:
throw new UnsupportedTypeException(type.toString());
}
Expand Down

0 comments on commit 99ed97d

Please sign in to comment.