- PostgreSQL Logical Replication client for node.js
- Supported plugins
- pgoutput (Native to PostgreSQL, Recommended)
- Use the pgoutput plugin to process huge transactions.
- wal2json
- decoderbufs
- test_decoding (Not recommended)
- pgoutput (Native to PostgreSQL, Recommended)
- Document for old version(1.x)
- pg-logical-replication depends on pq(node-postgres) >= 6.2.2 and eventemitter2
$ npm install pg-logical-replication
- This is an example using
wal2json
. A replication slot(test_slot_wal2json
) must be created on the PostgreSQL server.SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json')
const slotName = 'test_slot_wal2json';
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
{
database: 'playground',
// ...
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
{
acknowledge: {
auto: true,
timeoutSeconds: 10
}
}
)
// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonPlugin({
/**
* Plugin options for wal2json
* https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
*/
//...
});
/**
* Wal2Json.Output
* https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
*/
service.on('data', (lsn: string, log: Wal2Json.Output) => {
// Do something what you want.
// log.change.filter((change) => change.kind === 'insert').length;
});
// Start subscribing to data change events.
(function proc() {
service.subscribe(plugin, slotName)
.catch((e) => {
console.error(e);
})
.then(() => {
setTimeout(proc, 100);
});
})();
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
clientConfig: {
user?: string | undefined;
database?: string | undefined;
password?: string | (() => string | Promise<string>) | undefined;
port?: number | undefined;
host?: string | undefined;
connectionString?: string | undefined;
keepAlive?: boolean | undefined;
stream?: stream.Duplex | undefined;
statement_timeout?: false | number | undefined;
parseInputDatesAsUTC?: boolean | undefined;
ssl?: boolean | ConnectionOptions | undefined;
query_timeout?: number | undefined;
keepAliveInitialDelayMillis?: number | undefined;
idle_in_transaction_session_timeout?: number | undefined;
application_name?: string | undefined;
connectionTimeoutMillis?: number | undefined;
types?: CustomTypesConfig | undefined;
options?: string | undefined;
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
config?: Partial<{
acknowledge?: {
/**
* If the value is false, acknowledge must be done manually.
* Default: true
*/
auto: boolean;
/**
* Acknowledge is performed every set time (sec). If 0, do not do it.
* Default: 10
*/
timeoutSeconds: 0 | 10 | number;
};
}>
)
- Receive changes from the server.
plugin
output plugins.slotName
Logical replication slot name. You can create slot via pg_create_logical_replication_slot function.uptoLsn
(optional) The starting point of the data to be streamed.
- After processing the data, it signals the PostgreSQL server that it is OK to clear the WAL log.
- Usually this is done automatically.
- Manually use only when
new LogicalReplicationService({}, {acknowledge: {auto: false}})
.
on(event: 'start', listener: () => Promise<void> | void)
- Emitted when start replication.
on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
on(event: 'error', listener: (err: Error) => void)
on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
- Emitted when acknowledging automatically.
on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
- A heartbeat check signal has been received from the server. You may need to run
service.acknowledge()
.
- A heartbeat check signal has been received from the server. You may need to run
stop(): Promise<this>
- Terminate the server's connection and stop replication.
isStop(): boolean
- Returns false when replication starts from the server.
lastLsn(): string
- Returns the last LSN(Log Sequence Number) received from the server.
4-1. PgoutputPlugin
for pgoutput (Native to PostgreSQL)
- Use the pgoutput plugin to process large-scale transactions.