-
Notifications
You must be signed in to change notification settings - Fork 0
/
logical_replicator.go
112 lines (90 loc) · 2.84 KB
/
logical_replicator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// ==============================================================================
// Author : Mohamed Tanveer ([email protected])
// Description : script to replicate ddl changes from source to target in postgresl
// ==============================================================================
package main
import (
"database/sql"
"flag"
"fmt"
"log"
_ "github.com/lib/pq"
)
func main() {
// arguments passed in command line - source
sHost := flag.String("shost", "", "mention the host name/ip")
sPort := flag.Int("sport", 5432, "mention the port")
sDatabase := flag.String("sdatabase", "", "mention the database name")
sUser := flag.String("suser", "flyway_prod", "mention the user name")
// arguments passed in command line - target
tHost := flag.String("thost", "", "mention the host name/ip")
tPort := flag.Int("tport", 5432, "mention the port")
tDatabase := flag.String("tdatabase", "", "mention the database name")
tUser := flag.String("tuser", "flyway_prod", "mention the user name")
// parse the arguments passed in the command line
flag.Parse()
// Creating the connection string
sPsqlInfo := fmt.Sprintf("host=%s port=%d dbname=%s user=%s sslmode=require", *sHost, *sPort, *sDatabase, *sUser)
tPsqlInfo := fmt.Sprintf("host=%s port=%d dbname=%s user=%s sslmode=require", *tHost, *tPort, *tDatabase, *tUser)
// validates the connection to our database
sDb, err := sql.Open("postgres", sPsqlInfo)
// error out if the output is not null
if err != nil {
panic(err)
}
defer sDb.Close()
// test connection
err = sDb.Ping()
// error out if the output is not null
if err != nil {
panic(err)
}
// print message
// fmt.Println("Source Connection Successful!")
// validates the connection to our database
tDb, err := sql.Open("postgres", tPsqlInfo)
// error out if the output is not null
if err != nil {
panic(err)
}
defer tDb.Close()
// test connection
err = tDb.Ping()
// error out if the output is not null
if err != nil {
panic(err)
}
// print message
// fmt.Println("Target Connection Successful!")
// insert data into the table and return the serial id
sqlStatement := `select id,query from admin.log_ddl_publication_tables`
// exceute the insert query and store the id value in the serialId variable
rows, err := sDb.Query(sqlStatement)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var (
id, query string
)
// scan copies the value to the query var
if err := rows.Scan(&id, &query); err != nil {
log.Fatal(err)
}
// fmt.Println(query)
// execute the ddl query in the target
_, err := tDb.Query(query)
if err != nil {
log.Fatal(err)
}
// delete the row from the source
sqlStatement = `DELETE FROM admin.log_ddl_publication_tables WHERE id = $1`
_, err = sDb.Query(sqlStatement, id)
if err != nil {
log.Fatal(err)
}
}
sDb.Close()
tDb.Close()
}