Skip to content

Commit

Permalink
Merge branch 'apache:master' into INLONG-11199
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo authored Oct 11, 2024
2 parents 31c1fea + 0e555f9 commit fd959ca
Show file tree
Hide file tree
Showing 37 changed files with 2,180 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ body:
multiple: false
options:
- 'master'
- '2.0.0'
- '1.13.0'
- '1.12.0'
- '1.11.0'
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ci_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4

# Release space size
- name: Remove unnecessary packages
run: |
echo "=== Before pruning ==="
Expand All @@ -68,8 +69,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ci_ut_flink13.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ci_ut_flink15.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ci_ut_flink18.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/codeql_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ jobs:
dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
df -h
echo "Removing large packages"
sudo apt-get remove -y '^dotnet-.*'
echo "Removing mongodb-.* packages..."
sudo apt-get remove -y '^mongodb-.*'
echo "Removed mongodb-.* packages..."
df -h
sudo apt-get remove -y azure-cli google-chrome-stable google-cloud-cli microsoft-edge-stable firefox powershell mono-devel libgl1-mesa-dri
df -h
echo "Removing large directories"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;

import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;

import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PbSinkEncoder implements SinkEncoder<byte[]> {

protected PbSinkInfo sinkInfo;

private Descriptors.Descriptor dynamicDescriptor;

private final Map<String, Descriptors.FieldDescriptor.Type> fieldTypes;

public PbSinkEncoder(PbSinkInfo pbSinkInfo) {
this.sinkInfo = pbSinkInfo;
this.fieldTypes = new HashMap<>();
for (FieldInfo field : pbSinkInfo.getFields()) {
fieldTypes.put(field.getName(), Descriptors.FieldDescriptor.Type.STRING);
}
// decode protoDescription
this.dynamicDescriptor = decodeProtoDescription(pbSinkInfo.getProtoDescription());
}

@Override
public byte[] encode(SinkData sinkData, Context context) {
try {
DynamicMessage.Builder dynamicBuilder = DynamicMessage.newBuilder(dynamicDescriptor);

// Dynamically fill message fields
for (String key : sinkData.keyList()) {
Descriptors.FieldDescriptor fieldDescriptor = dynamicDescriptor.findFieldByName(key);
if (fieldDescriptor != null) {
String fieldValue = sinkData.getField(key);
if (fieldValue != null) {
Object value = convertValue(fieldDescriptor, fieldValue);
dynamicBuilder.setField(fieldDescriptor, value);
}
}
}
// Serialize to byte[]
return dynamicBuilder.build().toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Decode the base64-encoded proto description into a Descriptor
*
* @param base64ProtoDescription The base64-encoded proto description
* @return The dynamic Descriptor
*/
private Descriptors.Descriptor decodeProtoDescription(String base64ProtoDescription) {
try {
byte[] protoBytes = Base64.getDecoder().decode(base64ProtoDescription);
DescriptorProtos.FileDescriptorSet fileDescriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(
fileDescriptorSet.getFile(0), new Descriptors.FileDescriptor[]{});
return fileDescriptor.getMessageTypes().get(0);
} catch (Exception e) {
throw new RuntimeException("Failed to decode protoDescription", e);
}
}

private Object convertValue(Descriptors.FieldDescriptor fieldDescriptor, Object value) {
switch (fieldDescriptor.getType()) {
case STRING:
return value.toString();
case INT32:
return Integer.parseInt(value.toString());
case INT64:
return Long.parseLong(value.toString());
case BOOL:
return Boolean.parseBoolean(value.toString());
case BYTES:
return ByteString.copyFromUtf8(value.toString());
default:
throw new IllegalArgumentException("Unsupported field type: " + fieldDescriptor.getType());
}
}

@Override
public List<FieldInfo> getFields() {
return sinkInfo.getFields();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.pojo.PbSinkInfo;

public class SinkEncoderFactory {

Expand All @@ -39,4 +40,9 @@ public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
public static ParquetSinkEncoder createParquetEncoder(ParquetSinkInfo parquetSinkInfo) {
return new ParquetSinkEncoder(parquetSinkInfo);
}

public static PbSinkEncoder createPbEncoder(PbSinkInfo pbSinkInfo) {
return new PbSinkEncoder(pbSinkInfo);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.pojo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.experimental.SuperBuilder;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
@SuperBuilder
@Data
public class PbSinkInfo extends SinkInfo {

private List<FieldInfo> fields;
private String protoDescription;

public PbSinkInfo(
@JsonProperty("charset") String charset,
@JsonProperty("protoDescription") String protoDescription,
@JsonProperty("fields") List<FieldInfo> fields) {
super(SinkInfo.PB, charset);
if (CollectionUtils.isEmpty(fields)) {
throw new IllegalArgumentException("failed to init pb sink info, fieldInfos is empty");
}
this.protoDescription = protoDescription;
this.fields = fields;
}

/**
* get fields
* @return the fields
*/
@JsonProperty("fields")
public List<FieldInfo> getFields() {
return fields;
}

/**
* set fields
* @param fields the fields to set
*/
public void setFields(List<FieldInfo> fields) {
this.fields = fields;
}

/**
* get protoDescription
* @return the protoDescription
*/
@JsonProperty("protoDescription")
public String getProtoDescription() {
return protoDescription;
}

/**
* set protoDescription
* @param protoDescription the protoDescription to set
*/
public void setProtoDescription(String protoDescription) {
this.protoDescription = protoDescription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class SinkInfo {
public static final String KV = "kv";
public static final String ES_MAP = "es_map";
public static final String PARQUET = "parquet";
public static final String PB = "pb";

@JsonIgnore
private String type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.List;

/**
* LengthFunction
* description: char_length(string)
* - return the character length of the string
* - return NULL if the string is NULL
*/
@TransformFunction(names = {"char_length"})
public class CharLengthFunction implements ValueParser {

private final ValueParser stringParser;

public CharLengthFunction(Function expr) {
List<Expression> expressions = expr.getParameters().getExpressions();
stringParser = OperatorTools.buildParser(expressions.get(0));
}

@Override
public Object parse(SourceData sourceData, int rowIndex, Context context) {
Object stringObject = stringParser.parse(sourceData, rowIndex, context);
if (stringObject == null) {
return null;
}
String str = OperatorTools.parseString(stringObject);
return str.length();
}
}
Loading

0 comments on commit fd959ca

Please sign in to comment.