Skip to content

Commit

Permalink
[Feature][Connector-V2][Milvus] Support Milvus source & sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-HuWei committed Jul 10, 2024
1 parent 57e5627 commit 839437e
Show file tree
Hide file tree
Showing 35 changed files with 2,967 additions and 1 deletion.
39 changes: 39 additions & 0 deletions docs/en/connector-v2/sink/Mivlus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Milvus

> Milvus sink connector
## Description

Write data to Milvus or Zilliz Cloud

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)

## Sink Options

| Name | Type | Required | Default | Description |
|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | User:password |
| database | String | No | - | Write data to which database, default is source database. |
| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. |
| enable_auto_id | boolean | No | false | Primary key column enable autoId. |
| enable_upsert | boolean | No | false | Upsert data not insert. |
| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. |
| batch_size | int | No | 1000 | Write batch size. |

## Task Example

```bash
sink {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
batch_size = 1000
}
}
```

35 changes: 35 additions & 0 deletions docs/en/connector-v2/source/Mivlus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Milvus

> Milvus source connector
## Description

Read data from Milvus or Zilliz Cloud

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)

## Source Options

| Name | Type | Required | Default | Description |
|------------|--------|----------|---------|--------------------------------------------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | User:password |
| database | String | Yes | default | Read data from which database. |
| collection | String | No | - | If set, will only read one collection, otherwise will read all collections under database. |

## Task Example

```bash
source {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
database = "default"
}
}
```

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ public class PrimaryKey implements Serializable {

private final List<String> columnNames;

private Boolean enableAutoId;

public PrimaryKey(String primaryKey, List<String> columnNames) {
this.primaryKey = primaryKey;
this.columnNames = columnNames;
this.enableAutoId = null;
}

public static boolean isPrimaryKeyField(PrimaryKey primaryKey, String fieldName) {
if (primaryKey == null || primaryKey.getColumnNames() == null) {
return false;
}
return primaryKey.getColumnNames().contains(fieldName);
}

public static PrimaryKey of(String primaryKey, List<String> columnNames, Boolean autoId) {
return new PrimaryKey(primaryKey, columnNames, autoId);
}

public static PrimaryKey of(String primaryKey, List<String> columnNames) {
return new PrimaryKey(primaryKey, columnNames);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class TableSchema implements Serializable {

private final List<ConstraintKey> constraintKeys;

private List<VectorIndex> vectorIndexes;

private Boolean enableDynamicField;

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -68,6 +72,10 @@ public static final class Builder {

private final List<ConstraintKey> constraintKeys = new ArrayList<>();

private final List<VectorIndex> vectorIndexes = new ArrayList<>();

private Boolean enableDynamicField;

public Builder columns(List<Column> columns) {
this.columns.addAll(columns);
return this;
Expand All @@ -83,6 +91,16 @@ public Builder primaryKey(PrimaryKey primaryKey) {
return this;
}

public Builder enableDynamicField(Boolean enableDynamicField) {
this.enableDynamicField = enableDynamicField;
return this;
}

public Builder vectorIndexes(List<VectorIndex> vectorIndexes) {
this.vectorIndexes.addAll(vectorIndexes);
return this;
}

public Builder constraintKey(ConstraintKey constraintKey) {
this.constraintKeys.add(constraintKey);
return this;
Expand All @@ -94,7 +112,8 @@ public Builder constraintKey(List<ConstraintKey> constraintKeys) {
}

public TableSchema build() {
return new TableSchema(columns, primaryKey, constraintKeys);
return new TableSchema(
columns, primaryKey, constraintKeys, vectorIndexes, enableDynamicField);
}
}

Expand All @@ -106,6 +125,8 @@ public TableSchema copy() {
.constraintKey(copyConstraintKeys)
.columns(copyColumns)
.primaryKey(primaryKey == null ? null : primaryKey.copy())
.enableDynamicField(enableDynamicField)
.vectorIndexes(vectorIndexes)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.catalog;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@AllArgsConstructor
@Builder
@NoArgsConstructor
@Data
public class VectorIndex implements Serializable {

private String indexName;

private String fieldName;

private String indexType;

private String metricType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public enum SqlType {
DATE,
TIME,
TIMESTAMP,
BINARY_VECTOR,
FLOAT_VECTOR,
FLOAT16_VECTOR,
BFLOAT16_VECTOR,
SPARSE_FLOAT_VECTOR,
ROW,
MULTIPLE_ROW;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

public class VectorType<T> implements SeaTunnelDataType<T> {
private static final long serialVersionUID = 2L;

public static final VectorType<Float> VECTOR_FLOAT_TYPE =
new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);

public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);

public static final VectorType<Byte> VECTOR_BINARY_TYPE =
new VectorType<>(Byte.class, SqlType.BINARY_VECTOR);

public static final VectorType<ByteBuffer> VECTOR_FLOAT16_TYPE =
new VectorType<>(ByteBuffer.class, SqlType.FLOAT16_VECTOR);

public static final VectorType<ByteBuffer> VECTOR_BFLOAT16_TYPE =
new VectorType<>(ByteBuffer.class, SqlType.BFLOAT16_VECTOR);

// --------------------------------------------------------------------------------------------

/** The physical type class. */
private final Class<T> typeClass;

private final SqlType sqlType;

protected VectorType(Class<T> typeClass, SqlType sqlType) {
this.typeClass = typeClass;
this.sqlType = sqlType;
}

@Override
public Class<T> getTypeClass() {
return this.typeClass;
}

@Override
public SqlType getSqlType() {
return this.sqlType;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof VectorType)) {
return false;
}
VectorType<?> that = (VectorType<?>) obj;
return Objects.equals(typeClass, that.typeClass) && Objects.equals(sqlType, that.sqlType);
}

@Override
public int hashCode() {
return Objects.hash(typeClass, sqlType);
}

@Override
public String toString() {
return sqlType.toString();
}
}
60 changes: 60 additions & 0 deletions seatunnel-connectors-v2/connector-milvus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-milvus</artifactId>
<name>SeaTunnel : Connectors V2 : Milvus</name>

<dependencies>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Loading

0 comments on commit 839437e

Please sign in to comment.