go-etl is primarily an offline data synchronization framework, structured as follows:
readerPlugin(reader) -> Framework(Exchanger+Transformer) -> writerPlugin(writer)
It is built using a Framework + plugin architecture. Data source reading and writing are abstracted into Reader/Writer plugins and integrated into the overall synchronization framework.
- Reader: The Reader is the data acquisition module, responsible for collecting data from the data source and sending it to the Framework.
- Writer: The Writer is the data writing module, responsible for continuously fetching data from the Framework and writing it to the destination.
- Framework: The Framework connects the reader and writer, serving as a data transmission channel between them, and handles core technical issues such as buffering, flow control, concurrency, and data transformation.
A single data synchronization job completed by go-etl is called a Job. When go-etl receives a Job, it starts a process to complete the entire job synchronization process.
The go-etl Job module is the central management node for a single job, responsible for data cleanup, sub-task splitting (converting a single Job into multiple sub-Tasks), TaskGroup management, and other functions.
JOB--split--+-- task1 --+               +-- taskGroup1 --+
            |-- task2 --|               |-- taskGroup2 --|
            |-- task3 --|---schedule--->|-- taskGroup3 --|
            |   ......  |               |     ......     |
            +-- taskN --+               +-- taskGroupM --+
                                                 |
            +------------------------------------+----------+
            | Reader1 -> Exchanger1(Transformer) -> Writer1 |
            | Reader2 -> Exchanger2(Transformer) -> Writer2 |
            | Reader3 -> Exchanger3(Transformer) -> Writer3 |
            | ...                ......                 ... |
            | ReaderX -> ExchangerX(Transformer) -> WriterX |
            +-----------------------------------------------+
As shown above, after the go-etl Job starts, it splits itself into multiple smaller Tasks (sub-tasks) according to the source-side splitting strategy so that they can run concurrently. A Task is the smallest unit of a go-etl job, and each Task synchronizes a portion of the data. After splitting, the go-etl Job calls the Scheduler module, which, based on the configured concurrency, regroups the split Tasks into TaskGroups; the number of Tasks and TaskGroups may differ (N:M). Each TaskGroup runs all of its allocated Tasks with a certain concurrency (4 per TaskGroup by default). Every Task is started by a TaskGroup; once started, it runs a fixed Reader -> Channel -> Writer pipeline to complete the synchronization work.
When a go-etl job is running, the Job monitors and waits for multiple TaskGroup modules to complete their tasks. Once all TaskGroup tasks are completed, the Job exits successfully. Otherwise, it exits abnormally with a non-zero exit value.
For example, a user submits a go-etl job configured with a concurrency of 20, aiming to synchronize data from 100 MySQL sharded tables to ODPS. go-etl's scheduling decision works as follows: the go-etl Job splits into 100 Tasks, one per sharded table. Given the concurrency of 20, go-etl calculates that 4 TaskGroups are needed. The 4 TaskGroups evenly share the 100 Tasks, so each TaskGroup runs 25 Tasks with a concurrency of 5.
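The arithmetic behind this scheduling decision can be sketched in a few lines of Go. This is only an illustration of the calculation with made-up variable names, not the scheduler's actual code:
package main

import "fmt"

func main() {
    tasks := 100      // one Task per MySQL sharded table
    concurrency := 20 // configured total concurrency
    perGroup := 5     // channels handled by a single TaskGroup in this example

    taskGroups := concurrency / perGroup // 20 / 5 = 4 TaskGroups
    tasksPerGroup := tasks / taskGroups  // 100 / 4 = 25 Tasks per TaskGroup

    fmt.Println(taskGroups, tasksPerGroup) // prints: 4 25
}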
- Job: A Job is go-etl's description of a synchronization job from a source to a destination. It is the smallest business unit for go-etl data synchronization. For example, synchronizing from a MySQL table to a specific partition of an ODPS table.
- Task: A Task is the smallest execution unit obtained by maximizing the split of a Job. For example, reading a MySQL sharded table with 1024 sharded tables can be split into 1024 read Tasks and executed with several concurrencies.
- TaskGroup: Describes a set of Task collections. A collection of Tasks executed under the same TaskGroupContainer is called a TaskGroup.
- JobContainer: The Job executor, responsible for global Job splitting, scheduling, pre-statements, post-statements, and other work units. Similar to JobTracker in Yarn.
- TaskGroupContainer: The TaskGroup executor, responsible for executing a set of Tasks. Similar to TaskTracker in Yarn.
The Reader needs to implement the following interfaces:
The Job embeds *plugin.BaseJob and implements the following methods:
Init(ctx context.Context) (err error)
Destroy(ctx context.Context) (err error)
Split(ctx context.Context, number int) ([]*config.JSON, error)
Prepare(ctx context.Context) error // Default empty method
Post(ctx context.Context) error // Default empty method
- Init: Initializes the Job object. At this point, the configuration related to this plugin can be obtained through PluginJobConf(); the Reader plugin receives the reader part of the task configuration.
- Prepare: Global preparation work.
- Split: Splits the Job into Tasks. The parameter number is the suggested number of splits, generally the concurrency configured at runtime. The return value is a list of Task configurations.
- Post: Global post-processing work.
- Destroy: Destroys the Job object itself.
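As a concrete illustration, a minimal Reader Job might look like the sketch below. It is not the framework's reference implementation: the import paths, the *config.JSON return type of PluginJobConf(), and the deliberately trivial split strategy are assumptions made for this example.
// Sketch of a Reader Job; import paths are assumed.
import (
    "context"

    "github.com/Breeze0806/go-etl/config"
    "github.com/Breeze0806/go-etl/datax/common/plugin"
)

type Job struct {
    *plugin.BaseJob
    conf *config.JSON
}

func (j *Job) Init(ctx context.Context) (err error) {
    // PluginJobConf() holds the reader part of the task configuration.
    j.conf = j.PluginJobConf()
    return nil
}

func (j *Job) Split(ctx context.Context, number int) ([]*config.JSON, error) {
    // Trivial split: hand every suggested sub-task the same configuration.
    // A real plugin would build one configuration per table, file, or range.
    confs := make([]*config.JSON, 0, number)
    for i := 0; i < number; i++ {
        confs = append(confs, j.conf)
    }
    return confs, nil
}

func (j *Job) Destroy(ctx context.Context) (err error) {
    return nil
}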
The Task embeds *plugin.BaseTask and implements the following methods:
Init(ctx context.Context) (err error)
Destroy(ctx context.Context) (err error)
StartRead(ctx context.Context, sender plugin.RecordSender) error
Prepare(ctx context.Context) error // Default empty method
Post(ctx context.Context) error // Default empty method
- Init: Initializes the Task object. At this point, the configuration related to this Task can be obtained through PluginJobConf(). The configuration here is one item from the configuration list returned by the Job's Split method.
- Prepare: Local preparation work.
- StartRead: Reads data from the data source and writes it to the RecordSender, which puts the data into the cache queue connecting the Reader and Writer.
- Post: Local post-processing work.
- Destroy: Destroys the Task object itself.
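A matching Task sketch is shown below. The rows field and the SendWriter method name on the sender are assumptions; building Column values through the element package is only hinted at in a comment.
// Sketch of a Reader Task; t.rows and sender.SendWriter are assumptions.
type Task struct {
    *plugin.BaseTask
    rows [][]string // data prepared in Init/Prepare (hypothetical)
}

func (t *Task) Init(ctx context.Context) (err error)    { return nil }
func (t *Task) Destroy(ctx context.Context) (err error) { return nil }

func (t *Task) StartRead(ctx context.Context, sender plugin.RecordSender) error {
    for _, row := range t.rows {
        record, err := sender.CreateRecord()
        if err != nil {
            return err
        }
        // Convert each source value in row into a Column and add it to the
        // record; the Column constructors are omitted here.
        _ = row
        if err = sender.SendWriter(record); err != nil {
            return err
        }
    }
    return nil
}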
The Reader itself provides the following methods:
Job() reader.Job
Task() reader.Task
- Job: Gets an instance of the Job described above.
- Task: Gets an instance of the Task described above.
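The Reader then ties the two together, as in the sketch below; the NewBaseJob/NewBaseTask constructors are assumptions, and the generated template described next wires this up for you.
// Sketch of the Reader itself; the reader package import path is assumed.
type Reader struct {
    pluginConf *config.JSON
}

func (r *Reader) Job() reader.Job   { return &Job{BaseJob: plugin.NewBaseJob()} }    // NewBaseJob: assumed
func (r *Reader) Task() reader.Task { return &Task{BaseTask: plugin.NewBaseTask()} } // NewBaseTask: assumed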
cd tools/datax/plugin
# Adds a new Reader named Mysql. The -p flag specifies the Reader's name and is case-insensitive; adding -d deletes the original template.
go run main.go -t reader -p Mysql
This command automatically generates the following Mysql Reader template in datax/plugin/reader to assist in development:
reader---mysql--+-----resources--+--plugin.json
                |--job.go        |--plugin_job_template.json
                |--reader.go
                |--README.md
                |--task.go
As shown below, don't forget to add the developer's name and description in plugin.json:
{
"name" : "mysqlreader",
"developer":"Breeze0806",
"description":"use github.com/go-sql-driver/mysql."
}
Additionally, this helps developers avoid compilation errors after using the plugin registration command.
If you want to help implement a data source for a relational database, following these guidelines will make the implementation of your data source more convenient.
Refer to the Database Storage Developer Guide. This will not only assist you in implementing the Reader plugin interface more quickly but also aid in the implementation of the Writer plugin interface.
The dbms reader abstracts the DBWrapper structure of database storage into a Querier as follows and then utilizes the Querier to implement Job and Task functionalities.
// Querier Interface
type Querier interface {
// Obtains a specific table based on basic table information
Table(*database.BaseTable) database.Table
// Checks connectivity
PingContext(ctx context.Context) error
// Queries using a query statement
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
// Obtains a specific table based on parameters
FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)
// Retrieves records using parameters and a handler
FetchRecord(ctx context.Context, param database.Parameter, handler database.FetchHandler) error
// Retrieves records using parameters, a handler, and a transaction
FetchRecordWithTx(ctx context.Context, param database.Parameter, handler database.FetchHandler) error
// Closes resources
Close() error
}
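Plugin developers normally obtain a Querier from the dbms package rather than writing one themselves, but as a rough illustration (an assumption, not go-etl code) its generic methods map directly onto database/sql:
import (
    "context"
    "database/sql"
)

// querier delegates the generic Querier methods to *sql.DB; the table-related
// methods are omitted because they depend on go-etl's database package.
type querier struct {
    db *sql.DB
}

func (q *querier) PingContext(ctx context.Context) error {
    return q.db.PingContext(ctx)
}

func (q *querier) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
    return q.db.QueryContext(ctx, query, args...)
}

func (q *querier) Close() error {
    return q.db.Close()
}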
When implementing the Job and Reader for MySQL-like databases, the Task needs to use the dbms.StartRead function to implement its StartRead method.
Refer to the Two-dimensional Table File Stream Storage Developer Guide. This will assist you in implementing both the Reader and Writer plugin interfaces more quickly.
For CSV-like Tasks and Readers, the Job must be implemented independently, specifically the Split method for splitting and the Init method for initialization.
The Writer needs to implement the following interfaces:
The Job embeds *plugin.BaseJob and implements the following methods:
Init(ctx context.Context) (err error)
Destroy(ctx context.Context) (err error)
Split(ctx context.Context, number int) ([]*config.JSON, error)
Prepare(ctx context.Context) error // Default empty method
Post(ctx context.Context) error // Default empty method
- Init: Initializes the Job object. At this point, the configuration related to this plugin can be obtained through PluginJobConf(); the Writer plugin receives the writer part of the task configuration.
- Prepare: Global preparation work.
- Split: Splits the Job into Tasks. The parameter number is the suggested number of splits, generally the concurrency configured at runtime. The return value is a list of Task configurations.
- Post: Global post-processing work.
- Destroy: Destroys the Job object itself.
The Task embeds *plugin.BaseTask and implements the following methods:
Init(ctx context.Context) (err error)
Destroy(ctx context.Context) (err error)
StartWrite(ctx context.Context, receiver plugin.RecordReceiver) error
Prepare(ctx context.Context) error // Default empty method
Post(ctx context.Context) error // Default empty method
SupportFailOver() bool // Default empty method
- Init: Initializes the Task object. At this point, the configuration related to this Task can be obtained through PluginJobConf(). The configuration here is one item from the configuration list returned by the Job's Split method.
- Prepare: Local preparation work.
- StartWrite: Reads data from the RecordReceiver and writes it to the target data source. The data in the RecordReceiver comes from the cache queue between the Reader and Writer.
- Post: Local post-processing work.
- Destroy: Destroys the Task object itself.
- SupportFailOver: Indicates whether the Task supports failover.
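Under these signatures, a Writer Task sketch might look like the following. The ErrTerminate check mirrors the cache-queue behaviour described in the Records section below; the exchange import path, the element.Record type name, and the batchSize/flush helpers are assumptions made for this example.
// Sketch of a Writer Task's StartWrite; assumed imports: errors, plus the
// go-etl element and exchange packages.
func (t *Task) StartWrite(ctx context.Context, receiver plugin.RecordReceiver) error {
    records := make([]element.Record, 0, t.batchSize) // t.batchSize: hypothetical field
    for {
        record, err := receiver.GetFromReader()
        if err != nil {
            if errors.Is(err, exchange.ErrTerminate) {
                // The Reader side has finished: flush what is buffered and stop.
                return t.flush(ctx, records) // t.flush: hypothetical batch-write helper
            }
            return err
        }
        records = append(records, record)
        if len(records) >= t.batchSize {
            if err = t.flush(ctx, records); err != nil {
                return err
            }
            records = records[:0]
        }
    }
}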
The Writer itself provides the following methods:
Job() writer.Job
Task() writer.Task
- Job: Gets an instance of the Job described above.
- Task: Gets an instance of the Task described above.
cd tools/datax/plugin
# Adds a new Writer named Mysql. The -p flag specifies the Writer's name and is case-insensitive; adding -d deletes the original template.
go run main.go -t writer -p Mysql
This command automatically generates the following MySQL Writer template in datax/plugin/writer to assist in development:
writer--mysql---+-----resources--+--plugin.json
                |--job.go        |--plugin_job_template.json
                |--README.md
                |--task.go
                |--writer.go
Additionally, don't forget to add the developer's name and description to plugin.json as follows:
{
"name" : "mysqlwriter",
"developer":"Breeze0806",
"description":"Uses github.com/go-sql-driver/mysql. The database/sql DB executes select SQL and retrieves data from the ResultSet. Warning: The more you know about the database, the fewer problems you will encounter."
}
This helps developers avoid compilation errors after using the plugin registration command.
If you want to help implement a data source for a database, following these guidelines will make the implementation more convenient. However, the driver library you use must implement the database/sql interfaces of the Golang standard library.
Refer to the Database Storage Developer Guide. This will assist you in implementing both the Reader and Writer plugin interfaces more quickly.
The dbms writer abstracts the DBWrapper structure of database storage into an Execer as follows and then utilizes the Execer to implement Job and Task functionalities.
// Execer Interface
type Execer interface {
// Obtains a specific table based on basic table information
Table(*database.BaseTable) database.Table
// Checks connectivity
PingContext(ctx context.Context) error
// Queries using a query statement
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
// Executes an SQL statement
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
// Obtains a specific table based on parameters
FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)
// Performs batch execution
BatchExec(ctx context.Context, opts *database.ParameterOptions) error
// Performs batch execution using prepare/exec
BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) error
// Performs batch execution within a transaction
BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) error
// Performs batch execution using prepare/exec within a transaction
BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) error
// Closes the connection
Close() error
}
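The batch-execution methods above spare plugin developers this kind of transactional boilerplate. For reference, a plain database/sql sketch of a prepared, batched insert inside a transaction, which is roughly what BatchExecStmtWithTx wraps, looks like this (table and column names are made up):
import (
    "context"
    "database/sql"
)

// batchInsert writes all rows in a single transaction using a prepared statement.
func batchInsert(ctx context.Context, db *sql.DB, rows [][]any) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit succeeds

    stmt, err := tx.PrepareContext(ctx, "INSERT INTO type_table(a, b) VALUES (?, ?)")
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, row := range rows {
        if _, err = stmt.ExecContext(ctx, row...); err != nil {
            return err
        }
    }
    return tx.Commit()
}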
When implementing the Job and Writer for MySQL-like databases, the Task needs to use the dbms.StartWrite function to implement its StartWrite method.
Refer to the Two-dimensional Table File Stream Storage Developer Guide. This will assist you in implementing both the Reader and Writer plugin interfaces more quickly.
For CSV-like Tasks and Writers, the Job must be implemented independently, specifically the Split method for splitting and the Init method for initialization.
go-etl uses JSON as the format for its configuration files. A typical go-etl task configuration looks like this:
{
    "core" : {
        "container": {
            "job":{
                "id": 1,
                "sleepInterval":100
            },
            "taskGroup":{
                "id": 1,
                "failover":{
                    "retryIntervalInMsec":0
                }
            }
        },
        "transport":{
            "channel":{
                "speed":{
                    "byte": 100,
                    "record":100
                }
            }
        }
    },
    "job":{
        "content":[
            {
                "reader":{
                    "name": "csvreader",
                    "parameter": {
                        "path":["d:\\a.txt"],
                        "column":[
                            {
                                "index":"1",
                                "type":"time",
                                "format":"yyyy-MM-dd"
                            }
                        ],
                        "encoding":"utf-8",
                        "delimiter":","
                    }
                },
                "writer":{
                    "name": "postgreswriter",
                    "parameter": {
                        "username": "postgres",
                        "password": "123456",
                        "writeMode": "copyIn",
                        "column": ["*"],
                        "preSql": [],
                        "connection": {
                            "url": "postgres://192.168.15.130:5432/postgres?sslmode=disable&connect_timeout=2",
                            "table": {
                                "db":"postgres",
                                "schema":"public",
                                "name":"cvs"
                            }
                        },
                        "batchTimeout": "1s",
                        "batchSize":1000
                    }
                },
                "transformer":[]
            }
        ],
        "setting":{
            "speed":{
                "byte":3000,
                "record":400,
                "channel":4
            }
        }
    }
}
In the task configuration, the value of job.content.reader.parameter is passed to Reader.Job, and the value of job.content.writer.parameter is passed to Writer.Job. Both Reader.Job and Writer.Job can access these values through PluginJobConf().
Designing the configuration file is the first step in plugin development!
The parameter sections under reader and writer in the task configuration are the plugins' configuration parameters. These parameters should follow these principles:
- Camel Case Naming: All configuration items should use camelCase, with the first letter lowercase and the first letter of each subsequent word capitalized.
- Orthogonality: Configuration items must be orthogonal, with no overlapping functionality and no hidden rules.
- Rich Types: Use JSON types sensibly to reduce unnecessary processing logic and the potential for errors.
  - Use the correct data type. For example, use true/false for bool values, not "yes"/"true"/0, etc.
  - Use collection types sensibly. For example, use arrays instead of delimited strings.
- Similar and Universal: Follow the conventions of plugins of the same type. For example, the connection parameter for databases typically has the following structure:
{
    "connection": {
        "url": "tcp(192.168.0.1:3306)/mysql?parseTime=false",
        "table": {
            "db":"source",
            "name":"type_table"
        }
    }
}
Inside a plugin, nested configuration values are addressed by path. For example, given the following configuration:
{
    "a":{
        "b":[{
            "c":"x"
        }]
    }
}
To access the string "x" through GetConfig, the full path is a.b.0.c; the intermediate configurations are reachable via a, a.b, and a.b.0.
Note that because plugins only see a portion of the overall configuration, when using the config.JSON object, it is important to be aware of the current root path. For more operations on config.JSON, please refer to the documentation of the config package.
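A short sketch of that access pattern is shown below; only GetConfig is referenced above, so the NewJSONFromString constructor and GetString accessor are assumptions:
import (
    "fmt"
    "log"

    "github.com/Breeze0806/go-etl/config" // import path assumed
)

func example() {
    conf, err := config.NewJSONFromString(`{"a":{"b":[{"c":"x"}]}}`)
    if err != nil {
        log.Fatal(err)
    }
    sub, err := conf.GetConfig("a.b.0") // the sub-configuration {"c":"x"}
    if err != nil {
        log.Fatal(err)
    }
    s, err := sub.GetString("c") // yields "x"
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(s)
}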
After developing a feature and before submitting it, please run the following command to automatically add license headers and format the code with gofmt -s -w:
go run tools/license/main.go
Before compiling with Golang, plugins need to be registered within the codebase.
Due to Golang's static compilation, the go-etl framework cannot dynamically load plugins at runtime. Therefore, plugins developed by developers, specifically readers and writers, need to be registered via generated code. The following command facilitates this:
go generate ./...
The main principle is to generate plugin.go files from the plugin.json resources in the corresponding reader and writer directories under datax/plugin. In addition, a plugin.go file is generated in the datax directory to import these plugins. This process is implemented in tools/datax/build. Optionally, the -i flag can be used to skip compiling certain data sources, such as DB2, which uses ODBC for database access and requires additional Linux dependencies.
Similar to the typical producer-consumer pattern, data transfer between the Reader and Writer plugins happens through channels. These channels can be in-memory or persistent, and plugins do not need to concern themselves with the implementation details. Plugins write data to the channel through a RecordSender and read data from the channel through a RecordReceiver.
A single item in the channel is a Record object, which can hold multiple Column objects; this can be understood simply as a database record and its columns. For more details on the Record prototype, refer to the "Records" chapter in the documentation.
Since Record is an interface, the Reader plugin first calls RecordSender.CreateRecord() to create a Record instance and then adds Column objects to it.
The Writer plugin calls RecordReceiver.GetFromReader() to retrieve a Record and then iterates over its Column objects, writing them to the target storage. While the Reader is still active and transmission is ongoing, RecordReceiver.GetFromReader() blocks if no data is currently available, until data arrives. Once transmission has ended, it returns ErrTerminate, which lets the Writer plugin know when to end its StartWrite method.
To standardize data type conversion operations between the source and destination, and to ensure data fidelity, go-etl supports six internal data types. For more details, refer to the "Data Type Conversion" chapter in the documentation.
Include the following chapters in the plugin's README.md documentation:
- Quick Introduction: Describes the plugin's use cases and features.
- Implementation Principles: Explains the underlying principles of the plugin's implementation.
- Configuration Instructions:
- Provides a sample JSON configuration file for a typical synchronization task.
- Describes the meaning of each parameter, whether it is required, its default value, its value range, and any other constraints.
- Type Conversion:
- Explains how the plugin converts between the actual storage type and go-etl's internal type.
- Mentions any special handling.
- Performance Report:
- Details the hardware and software environment: system version, Go version, CPU, memory, etc.
- Describes the data characteristics, such as record size.
- Lists the test parameter sets, system parameters (e.g., concurrency), and plugin parameters (e.g., batchSize).
- Provides synchronization speeds (Rec/s, MB/s), machine load (load, cpu), and the impact on the data source (load, cpu, mem) for different parameters.
- Constraints and Limitations: Mentions any additional usage restrictions.
- FAQ: Addresses commonly asked questions by users.
- golang 1.20 or higher.
make dependencies
make release
Before compiling, set the IGNORE_PACKAGES environment variable to db2 to skip compiling the DB2 data source:
export IGNORE_PACKAGES=db2
make dependencies
make release
- A MinGW-w64 environment with GCC 7.2.0 or higher is required for compilation.
- golang 1.20 or higher.
- The minimum supported compilation environment is Windows 7.
release.bat
Before compiling, set the IGNORE_PACKAGES environment variable to db2 to skip compiling the DB2 data source:
set IGNORE_PACKAGES=db2
release.bat
+---datax------+---plugin---+---reader--mysql---+--README.md
|              |            |                       ......
|              |            |
|              |            +---writer--mysql---+--README.md
|              |                                    ......
|
+---bin--------datax
|
+---examples---+---csvpostgres----config.json
|              +---db2------------config.json
|              |      ......
|
+---README_USER.md
- The datax/plugin directory contains documentation for each plugin.
- The bin directory contains the data synchronization program datax.
- The examples directory contains configuration files for various data synchronization scenarios.
- README_USER.md is the user manual.
datax -http :8443 -c examples/limit/config.json
Use a web browser to access http://127.0.0.1:8443/debug/pprof
to retrieve debug information.
/debug/pprof/
Types of profiles available:
Count Profile
19 allocs
0 block
0 cmdline
18 goroutine
19 heap
0 mutex
0 profile
10 threadcreate
0 trace
full goroutine stack dump
Profile Descriptions:
allocs: A sampling of all past memory allocations
block: Stack traces that led to blocking on synchronization primitives
cmdline: The command line invocation of the current program
goroutine: Stack traces of all current goroutines
heap: A sampling of memory allocations of live objects. You can specify the gc GET parameter to run GC before taking the heap sample.
mutex: Stack traces of holders of contended mutexes
profile: CPU profile. You can specify the duration in the seconds GET parameter. After you get the profile file, use the go tool pprof command to investigate the profile.
threadcreate: Stack traces that led to the creation of new OS threads
trace: A trace of execution of the current program. You can specify the duration in the seconds GET parameter. After you get the trace file, use the go tool trace command to investigate the trace.
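For example, assuming the process was started with -http :8443 as above, a 30-second CPU profile can be collected and inspected with the standard pprof tooling:
go tool pprof http://127.0.0.1:8443/debug/pprof/profile?seconds=30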