Skip to content

Commit

Permalink
[feature](paimon)support paimon with dlf for 2.1 (apache#41247) (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi authored Oct 13, 2024
1 parent 629f500 commit ec0c008
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.datasource.property.constants.PaimonProperties;

import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
private static final Logger LOG = LogManager.getLogger(PaimonDLFExternalCatalog.class);

public PaimonDLFExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}

@Override
protected void initLocalObjectsImpl() {
super.initLocalObjectsImpl();
catalogType = PAIMON_DLF;
catalog = createCatalog();
}

@Override
protected void setPaimonCatalogOptions(Map<String, String> properties, Map<String, String> options) {
options.put(PaimonProperties.PAIMON_CATALOG_TYPE, PaimonProperties.PAIMON_HMS_CATALOG);
options.put(PaimonProperties.PAIMON_METASTORE_CLIENT, ProxyMetaStoreClient.class.getName());
options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_FILESYSTEM = "filesystem";
public static final String PAIMON_HMS = "hms";
public static final String PAIMON_DLF = "dlf";
protected String catalogType;
protected Catalog catalog;
protected AuthenticationConfig authConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -38,6 +39,9 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String
return new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
return new PaimonFileExternalCatalog(catalogId, name, resource, props, comment);
case PaimonExternalCatalog.PAIMON_DLF:
props.put(HMSProperties.HIVE_METASTORE_TYPE, HMSProperties.DLF_TYPE);
return new PaimonDLFExternalCatalog(catalogId, name, resource, props, comment);
default:
throw new DdlException("Unknown " + PaimonExternalCatalog.PAIMON_CATALOG_TYPE
+ " value: " + metastoreType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType()) || PaimonExternalCatalog.PAIMON_FILESYSTEM
.equals(getPaimonCatalogType())) {
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())
|| PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(getPaimonCatalogType())
|| PaimonExternalCatalog.PAIMON_DLF.equals(getPaimonCatalogType())) {
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static Map<String, String> convertToMetaProperties(Map<String, String> pr
}
metaProperties = convertToGlueProperties(props, credential);
} else if (props.containsKey(DLFProperties.ENDPOINT)
|| props.containsKey(DLFProperties.REGION)
|| props.containsKey(DataLakeConfig.CATALOG_ENDPOINT)) {
metaProperties = convertToDLFProperties(props, DLFProperties.getCredential(props));
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
Expand Down Expand Up @@ -436,10 +437,18 @@ private static void getPropertiesFromDLFProps(Map<String, String> props,
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.UID);
}
String endpoint = props.get(DLFProperties.ENDPOINT);
props.put(DataLakeConfig.CATALOG_ENDPOINT, endpoint);
props.put(DataLakeConfig.CATALOG_REGION_ID, props.getOrDefault(DLFProperties.REGION,
S3Properties.getRegionOfEndpoint(endpoint)));

// region
String region = props.get(DLFProperties.REGION);
if (Strings.isNullOrEmpty(region)) {
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.REGION);
}
props.put(DataLakeConfig.CATALOG_REGION_ID, region);

// endpoint
props.put(DataLakeConfig.CATALOG_ENDPOINT,
props.getOrDefault(DLFProperties.ENDPOINT, getDlfEndpointByRegion(region)));

props.put(DataLakeConfig.CATALOG_PROXY_MODE, props.getOrDefault(DLFProperties.PROXY_MODE, "DLF_ONLY"));
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_ID, credential.getAccessKey());
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, credential.getSecretKey());
Expand Down Expand Up @@ -500,6 +509,10 @@ private static String getOssEndpoint(String region, boolean publicAccess) {
return prefix + region + suffix;
}

private static String getDlfEndpointByRegion(String region) {
return "dlf-vpc." + region + ".aliyuncs.com";
}

private static Map<String, String> convertToGlueProperties(Map<String, String> props, CloudCredential credential) {
// convert doris glue property to glue properties, s3 client property and BE property
String metastoreType = props.get(HMSProperties.HIVE_METASTORE_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class PaimonProperties {
public static final String PAIMON_OSS_SECRET_KEY = org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
public static final String PAIMON_HMS_CATALOG = "hive";
public static final String PAIMON_FILESYSTEM_CATALOG = "filesystem";
public static final String PAIMON_METASTORE_CLIENT = "metastore.client.class";


public static Map<String, String> convertToS3Properties(Map<String, String> properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.paimon.PaimonDLFExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
Expand Down Expand Up @@ -241,7 +242,8 @@ public class GsonUtils {
.registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName())
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName());
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonDLFExternalCatalog.class, PaimonDLFExternalCatalog.class.getSimpleName());

// routine load data source
private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !c1 --
1 a
2 b

-- !c2 --
1 a
2 b

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_paimon_dlf_catalog", "p2,external,paimon,external_remote,external_remote_paimon") {
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
return
}

try {
String catalog = "test_paimon_dlf_catalog"
String uid = context.config.otherConfigs.get("dlf_uid")
String region = context.config.otherConfigs.get("dlf_region")
String catalog_id = context.config.otherConfigs.get("dlf_catalog_id")
String access_key = context.config.otherConfigs.get("dlf_access_key")
String secret_key = context.config.otherConfigs.get("dlf_secret_key")


sql """drop catalog if exists ${catalog};"""
sql """
create catalog if not exists ${catalog} properties (
"type" = "paimon",
"paimon.catalog.type" = "dlf",
"warehouse" = "oss://selectdb-qa-datalake-test/p2_regression_case",
"dlf.proxy.mode" = "DLF_ONLY",
"dlf.uid" = "${uid}",
"dlf.region" = "${region}",
"dlf.catalog.id" = "${catalog_id}",
"dlf.access_key" = "${access_key}",
"dlf.secret_key" = "${secret_key}"
);
"""

sql """ use ${catalog}.regression_paimon """

sql """set force_jni_scanner=false"""
qt_c1 """ select * from tb_simple order by id """
sql """set force_jni_scanner=true"""
qt_c2 """ select * from tb_simple order by id """

} finally {
sql """set force_jni_scanner=false"""
}
}

0 comments on commit ec0c008

Please sign in to comment.