daydayup863

Debezium from PostgreSQL to Kafka

This article walks through using Debezium to decode the PostgreSQL WAL and stream the changes into Kafka.

Install decoderbufs in PostgreSQL

Build and install protobuf

Version 2.6 or later is required.

[root@db1 /export]# wget https://github.com/google/protobuf/releases/download/v2.6.1/protobuf-2.6.1.tar.gz
[root@db1 /export]# tar -zxvf protobuf-2.6.1.tar.gz
[root@db1 /export]# cd protobuf-2.6.1
[root@db1 /export/protobuf-2.6.1]# ./configure
checking whether to enable maintainer-specific portions of Makefiles... yes
checking build system type... x86_64-unknown-linux-gnu
checking host system type... x86_64-unknown-linux-gnu
checking target system type... x86_64-unknown-linux-gnu
checking for a BSD-compatible install... /usr/bin/install -c
checking whether build environment is sane... yes
...
...
configure: creating ./config.status
config.status: creating Makefile
config.status: creating scripts/gtest-config
config.status: creating build-aux/config.h
config.status: build-aux/config.h is unchanged
config.status: executing depfiles commands
config.status: executing libtool commands
[root@db1 /export/protobuf-2.6.1]# make && make install
make all-recursive
make[1]: Entering directory `/export/protobuf-2.6.1'
Making all in .
make[2]: Entering directory `/export/protobuf-2.6.1'
make[2]: Leaving directory `/export/protobuf-2.6.1'
Making all in src
...
...
/usr/bin/mkdir -p '/usr/local/include/google/protobuf/io'
/usr/bin/install -c -m 644 google/protobuf/io/coded_stream.h google/protobuf/io/gzip_stream.h google/protobuf/io/printer.h google/protobuf/io/strtod.h google/protobuf/io/tokenizer.h google/protobuf/io/zero_copy_stream.h google/protobuf/io/zero_copy_stream_impl.h google/protobuf/io/zero_copy_stream_impl_lite.h '/usr/local/include/google/protobuf/io'
make[3]: Leaving directory `/export/protobuf-2.6.1/src'
make[2]: Leaving directory `/export/protobuf-2.6.1/src'
make[1]: Leaving directory `/export/protobuf-2.6.1/src'

Build and install protobuf-c

Version 1.2 or later is required.

[root@db1 /export]# wget https://github.com/protobuf-c/protobuf-c/releases/download/v1.2.1/protobuf-c-1.2.1.tar.gz
[root@db1 /export]# tar -zxvf protobuf-c-1.2.1.tar.gz
protobuf-c-1.2.1/
...
protobuf-c-1.2.1/LICENSE
protobuf-c-1.2.1/Makefile.am
[root@db1 /export]# cd protobuf-c-1.2.1/
[root@db1 /export/protobuf-c-1.2.1]# export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig
[root@db1 /export/protobuf-c-1.2.1]# ./configure
checking for a BSD-compatible install... /usr/bin/install -c
checking whether build environment is sane... yes
checking for a thread-safe mkdir -p... /usr/bin/mkdir -p
checking for gawk... gawk
checking whether make sets $(MAKE)... yes
checking whether make supports nested variables... yes
...
configure: creating ./config.status
config.status: creating Makefile
config.status: creating protobuf-c/libprotobuf-c.pc
config.status: creating Doxyfile
config.status: creating config.h
config.status: executing depfiles commands
config.status: executing libtool commands

protobuf-c 1.2.1

CC: gcc -std=gnu99
CFLAGS: -g -O2
CXX: g++
CXXFLAGS: -g -O2
LDFLAGS:
LIBS:

prefix: /usr/local
sysconfdir: ${prefix}/etc
libdir: ${exec_prefix}/lib
includedir: ${prefix}/include
pkgconfigdir: ${libdir}/pkgconfig

bigendian: no
protobuf version:

[root@db1 /export/protobuf-c-1.2.1]# make
GEN t/test.pb-c.c
GEN t/test-full.pb-c.c
GEN t/test-optimized.pb-c.c
...
...
cd /usr/local/include/google/protobuf-c && rm -vf protobuf-c.h
cd /usr/local/include/google/protobuf-c && ln -s ../../protobuf-c/protobuf-c.h protobuf-c.h
make[3]: Leaving directory `/export/protobuf-c-1.2.1'
make[2]: Leaving directory `/export/protobuf-c-1.2.1'
make[1]: Leaving directory `/export/protobuf-c-1.2.1'

Clone postgres-decoderbufs

[root@db1 /export]# git clone https://github.com/debezium/postgres-decoderbufs.git
Cloning into 'postgres-decoderbufs'...
remote: Enumerating objects: 289, done.
remote: Counting objects: 100% (4/4), done.
remote: Compressing objects: 100% (4/4), done.
remote: Total 289 (delta 0), reused 3 (delta 0), pack-reused 285
Receiving objects: 100% (289/289), 91.64 KiB | 0 bytes/s, done.
Resolving deltas: 100% (132/132), done.

Edit the Makefile, then build and install

Change the pg_config path in the Makefile to /opt/pg11/bin/pg_config.
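The edit can be scripted. Below is a minimal sketch on a scratch copy, assuming the Makefile assigns pg_config through a `PG_CONFIG ?=` line, as PGXS-style Makefiles usually do (the real file is postgres-decoderbufs/Makefile):

```shell
# Scratch copy standing in for postgres-decoderbufs/Makefile.
printf 'PG_CONFIG ?= pg_config\n' > /tmp/Makefile.demo
# Point PG_CONFIG at the PostgreSQL 11 binaries.
sed -i 's|^PG_CONFIG.*|PG_CONFIG ?= /opt/pg11/bin/pg_config|' /tmp/Makefile.demo
grep '^PG_CONFIG' /tmp/Makefile.demo
```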

[root@db1 /export/postgres-decoderbufs]# make
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -march=core2 -O2 -fPIC -std=c11 -I/usr/local/include -I/usr/local/include -I. -I./ -I/opt/pg96/include/postgresql/server -I/opt/pg96/include/postgresql/internal -D_GNU_SOURCE -I/usr/include/libxml2 -c -o src/decoderbufs.o src/decoderbufs.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -march=core2 -O2 -fPIC -std=c11 -I/usr/local/include -I/usr/local/include -I. -I./ -I/opt/pg96/include/postgresql/server -I/opt/pg96/include/postgresql/internal -D_GNU_SOURCE -I/usr/include/libxml2 -c -o src/proto/pg_logicaldec.pb-c.o src/proto/pg_logicaldec.pb-c.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -march=core2 -O2 -fPIC -shared -o decoderbufs.so src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o -L/opt/pg96/lib -Wl,--as-needed -Wl,-rpath,'/opt/pg96/lib',--enable-new-dtags -L/usr/local/lib -lprotobuf-c
[root@db1 /export/postgres-decoderbufs]# make install
/bin/mkdir -p '/opt/pg11/lib/postgresql'
/bin/mkdir -p '/opt/pg11/share/postgresql/extension'
/bin/install -c -m 755 decoderbufs.so '/opt/pg11/lib/postgresql/decoderbufs.so'
/bin/install -c -m 644 .//decoderbufs.control '/opt/pg11/share/postgresql/extension/'
[root@db1 /export/postgres-decoderbufs]# ldconfig

Modify PostgreSQL parameters

A restart is required for these settings to take effect.

wal_level = logical
shared_preload_libraries = 'decoderbufs, pg_stat_statements'
max_wal_senders = 5  # leave enough headroom

A quick test that decoderbufs works

postgres=# select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
INFO: Exiting startup callback
slot_name | lsn
------------------+------------
decoderbufs_demo | 0/53000108
(1 row)

postgres=# create table test(id int);
CREATE TABLE
postgres=# select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
NOTICE: Decoderbufs DEBUG MODE is ON.
INFO: Exiting startup callback
data
-------------------------------------------------
txid[588], commit_time[1650942354911888], op[3]
txid[588], commit_time[1650942354911888], op[4]
(2 rows)

postgres=# insert into test select 1;
INSERT 0 1
postgres=#
postgres=# select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
NOTICE: Decoderbufs DEBUG MODE is ON.
INFO: Exiting startup callback
data
---------------------------------------------------------------------
txid[589], commit_time[1650942394358075], op[3]
txid[589], commit_time[1650942394358075], table[public.test], op[0]+
NEW TUPLE: +
column_name[id], column_type[23], datum[1] +
+

txid[589], commit_time[1650942394358075], op[4]
(3 rows)

postgres=# select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
NOTICE: Decoderbufs DEBUG MODE is ON.
INFO: Exiting startup callback
data
------
(0 rows)

postgres=# select * from pg_replication_slots where slot_type = 'logical';
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
------------------+-------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
decoderbufs_demo | decoderbufs | logical | 13220 | postgres | f | f | | | 590 | 0/53001670 | 0/530016A8
(1 row)

Install a Java environment

JDK 11 or later is required; download and install it (installation steps are not covered here).

[root@dmsrv.com /opt/kafka_2.13-3.1.0]# java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

Create a publication in PostgreSQL

Create a publication for the tables you need to capture.

postgres=# create table test1 (
ID int primary key,
AGE int
);
CREATE TABLE
postgres=#
postgres=#
postgres=# CREATE PUBLICATION dbz_planning FOR TABLE test1;
CREATE PUBLICATION
postgres=#
postgres=# \dRp+
Publication dbz_planning
Owner | All tables | Inserts | Updates | Deletes | Truncates
----------+------------+---------+---------+---------+-----------
postgres | f | t | t | t | t
Tables:
"public.test1"

Install Kafka

[root@dmsrv.com /opt]# wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
[root@dmsrv.com /opt]# tar -zxvf kafka_2.13-3.1.0.tgz
[root@dmsrv.com /opt]# cd kafka_2.13-3.1.0

Configure and start Kafka

Edit zookeeper.connect in config/server.properties to point at your ZooKeeper ensemble; installing ZooKeeper itself is not covered here.
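The same kind of one-line edit works here. A sketch on a scratch copy (zksrv.com:2181 is the ZooKeeper address that appears in the startup log; the real file is config/server.properties):

```shell
# Scratch copy standing in for config/server.properties.
printf 'zookeeper.connect=localhost:2181\n' > /tmp/server.properties.demo
# Point the broker at the actual ZooKeeper ensemble.
sed -i 's|^zookeeper.connect=.*|zookeeper.connect=zksrv.com:2181|' /tmp/server.properties.demo
grep '^zookeeper.connect' /tmp/server.properties.demo
```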

[root@dmsrv.com /opt/kafka_2.13-3.1.0]# bin/kafka-server-start.sh config/server.properties
[2022-04-26 17:36:18,627] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-26 17:36:18,929] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-04-26 17:36:19,053] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-04-26 17:36:19,056] INFO starting (kafka.server.KafkaServer)
[2022-04-26 17:36:19,057] INFO Connecting to zookeeper on zksrv.com:2181 (kafka.server.KafkaServer)
[2022-04-26 17:36:19,073] INFO [ZooKeeperClient Kafka server] Initializing a new session to zksrv.com:2181. (kafka.zookeeper.ZooKeeperClient)
[2022-04-26 17:36:19,078] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2022-04-26 17:36:19,078] INFO Client environment:host.name=dmsrv.com (org.apache.zookeeper.ZooKeeper)
[2022-04-26 17:36:19,078] INFO Client environment:java.version=11.0.2 (org.apache.zookeeper.ZooKeeper)
[2022-04-26 17:36:19,078] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2022-04-26 17:36:19,078] INFO Client environment:java.home=/home/q/java/jdk-11.0.2 (org.apache.zookeeper.ZooKeeper)
...
...
[2022-04-26 17:36:20,807] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-04-26 17:36:20,825] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-04-26 17:36:20,849] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2022-04-26 17:36:20,855] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2022-04-26 17:36:20,855] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2022-04-26 17:36:20,861] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-26 17:36:20,861] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-26 17:36:20,861] INFO Kafka startTimeMs: 1650965780855 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-26 17:36:20,863] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Install debezium-connector-postgres

[root@dmsrv.com /export]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.1.Final/debezium-connector-postgres-1.9.1.Final-plugin.tar.gz
[root@dmsrv.com /export]# tar -zxvf debezium-connector-postgres-1.9.1.Final-plugin.tar.gz
debezium-connector-postgres/CHANGELOG.md
debezium-connector-postgres/CONTRIBUTE.md
debezium-connector-postgres/COPYRIGHT.txt
debezium-connector-postgres/LICENSE-3rd-PARTIES.txt
debezium-connector-postgres/LICENSE.txt
debezium-connector-postgres/README.md
debezium-connector-postgres/README_JA.md
debezium-connector-postgres/README_ZH.md
debezium-connector-postgres/postgres.json
debezium-connector-postgres/debezium-core-1.9.1.Final.jar
debezium-connector-postgres/debezium-api-1.9.1.Final.jar
debezium-connector-postgres/guava-30.1.1-jre.jar
debezium-connector-postgres/failureaccess-1.0.1.jar
debezium-connector-postgres/postgresql-42.3.3.jar
debezium-connector-postgres/protobuf-java-3.19.2.jar
debezium-connector-postgres/debezium-connector-postgres-1.9.1.Final.jar

[root@dmsrv.com /export]# cd debezium-connector-postgres/
[root@dmsrv.com /export/debezium-connector-postgres]# cp *.jar /opt/kafka_2.13-3.1.0/libs/

Configure postgres.properties

[root@dmsrv.com /opt/kafka_2.13-3.1.0]# cat config/postgres.properties
name=dbz-test-connector

include.schema.changes=false

slot.name=debezium_planning
publication.name=dbz_planning

connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=5
plugin.name=decoderbufs

database.hostname=192.168.0.1
database.port=5432
database.user=pgdba
database.password=db2c1a5d
database.dbname=postgres
database.history.kafka.bootstrap.servers=kafka.server.com:9092
database.server.name=DBTestServer
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Start Kafka Connect with bin/connect-standalone.sh, passing the worker config and config/postgres.properties

[2022-04-26 17:38:47,013] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:68)
[2022-04-26 17:38:47,027] INFO WorkerInfo values:
jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/kafka_2.13-3.1.0/bin/../logs, -Dlog4j.configuration=file:bin/../config/connect-log4j.properties
jvm.spec = Oracle Corporation, OpenJDK 64-Bit Server VM, 11.0.2, 11.0.2+9
...
...
[2022-04-26 17:39:14,360] INFO [dbz-test-connector|task-0] Retrieved latest position from stored offset 'LSN{0/77000108}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:127)
[2022-04-26 17:39:14,361] INFO [dbz-test-connector|task-0] Looking for WAL restart position for last commit LSN 'LSN{0/77000108}' and last change LSN 'LSN{0/77000108}' (io.debezium.connector.postgresql.connection.WalPositionLocator:40)
[2022-04-26 17:39:14,371] INFO [dbz-test-connector|task-0] Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/76000340}, catalogXmin=685] (io.debezium.connector.postgresql.connection.PostgresConnection:251)
[2022-04-26 17:39:14,373] INFO [dbz-test-connector|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:956)
[2022-04-26 17:39:14,415] INFO [dbz-test-connector|task-0] Requested thread factory for connector PostgresConnector, id = DBTestServer named = keep-alive (io.debezium.util.Threads:270)
[2022-04-26 17:39:14,416] INFO [dbz-test-connector|task-0] Creating thread debezium-postgresconnector-DBTestServer-keep-alive (io.debezium.util.Threads:287)
[2022-04-26 17:39:14,455] INFO [dbz-test-connector|task-0] REPLICA IDENTITY for 'public.test1' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns (io.debezium.connector.postgresql.PostgresSchema:103)
[2022-04-26 17:39:14,456] INFO [dbz-test-connector|task-0] REPLICA IDENTITY for 'public.decoderbufs' is 'FULL'; UPDATE AND DELETE events will contain the previous values of all the columns (io.debezium.connector.postgresql.PostgresSchema:103)
[2022-04-26 17:39:14,457] INFO [dbz-test-connector|task-0] REPLICA IDENTITY for 'public.db_cluster' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns (io.debezium.connector.postgresql.PostgresSchema:103)
[2022-04-26 17:39:14,460] INFO [dbz-test-connector|task-0] Searching for WAL resume position (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:314)
[2022-04-26 17:39:14,489] INFO [dbz-test-connector|task-0] First LSN 'LSN{0/77000108}' received (io.debezium.connector.postgresql.connection.WalPositionLocator:60)
[2022-04-26 17:39:14,500] INFO [dbz-test-connector|task-0] LSN after last stored change LSN 'LSN{0/770001B8}' received (io.debezium.connector.postgresql.connection.WalPositionLocator:71)
[2022-04-26 17:39:14,500] INFO [dbz-test-connector|task-0] WAL resume position 'LSN{0/770001B8}' discovered (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:335)
[2022-04-26 17:39:14,501] INFO [dbz-test-connector|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:956)

Consume the topic to verify

[root@dmsrv.com /opt/kafka_2.13-3.1.0]# bin/kafka-console-consumer.sh --bootstrap-server dmsrv.com:9092 --topic DBTestServer.public.test1 --from-beginning --partition 0
{"before":null,"after":{"id":1,"age":1},"source":{"version":"1.9.1.Final","connector":"postgresql","name":"DBTestServer","ts_ms":1650956299891,"snapshot":"true","db":"postgres","sequence":"[null,\"1728053888\"]","schema":"public","table":"test1","txId":643,"lsn":1728053888,"xmin":null},"op":"r","ts_ms":1650956299891,"transaction":null}
{"before":null,"after":{"id":2,"age":1},"source":{"version":"1.9.1.Final","connector":"postgresql","name":"DBTestServer","ts_ms":1650956299892,"snapshot":"true","db":"postgres","sequence":"[null,\"1728053888\"]","schema":"public","table":"test1","txId":643,"lsn":1728053888,"xmin":null},"op":"r","ts_ms":1650956299892,"transaction":null}
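Each message is a Debezium change-event envelope: `before`/`after` carry the row images, `source` describes the origin, and `op` is the operation code (`r` snapshot read, `c` insert, `u` update, `d` delete). A minimal sketch extracting `op` with plain shell tools, using a trimmed copy of the first message above:

```shell
# Trimmed change event (the full envelope also carries a "source" block).
event='{"before":null,"after":{"id":1,"age":1},"op":"r","ts_ms":1650956299891}'
# Pull out the operation code; "r" marks a snapshot read.
op=$(printf '%s' "$event" | sed -n 's/.*"op":"\([a-z]*\)".*/\1/p')
echo "$op"
```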

Miscellaneous

List existing topics

[root@dmsrv.com /opt/kafka_2.13-3.1.0]# bin/kafka-topics.sh --list  --bootstrap-server dmsrv.com:9092

Find the topic and partition for a table

As the log line below shows, DBTestServer.public.test1-0 identifies the topic DBTestServer.public.test1 (database.server.name from postgres.properties + schema + table name) and partition 0, which is the partition used in the consumption test above.

[2022-04-26 16:48:11,005] INFO [Partition DBTestServer.public.test1-0 broker=0] Log loaded for partition DBTestServer.public.test1-0 with initial high watermark 46 (kafka.cluster.Partition)
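The topic name is just the three parts joined with dots; a quick sketch:

```shell
# topic = <database.server.name>.<schema>.<table>
server=DBTestServer
schema=public
table=test1
topic="${server}.${schema}.${table}"
echo "$topic"   # DBTestServer.public.test1
```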

Check the connector status

curl -s localhost:8083/connectors/dbz-test-connector/status
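The response is JSON. Here is a sketch pulling the connector state out of a sample payload; the payload shape follows the Kafka Connect REST API, but the values are illustrative, not captured from this setup:

```shell
# Illustrative status payload; a live one comes from the curl call above.
status='{"name":"dbz-test-connector","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"}]}'
# Extract the connector-level state field.
state=$(printf '%s' "$status" | sed -n 's/.*"connector":{"state":"\([A-Z]*\)".*/\1/p')
echo "$state"   # RUNNING
```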