利用PostgreSQL逻辑复制和触发器实现PostgreSQL双主复制
准备 实例信息 主实例1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 pg13 =# \conninfo You are connected to database "pg13" as user "postgres" via socket in "/tmp" at port "5557" .pg13 =# show wal_level ; wal_level ----------- logical (1 row)pg13 =# \du postgres List of roles Role name | Attributes | Member of -----------+------------+----------- postgres | Superuser | {}pg13 =#
主实例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 postgres =# \conninfo You are connected to database "postgres" as user "postgres" via socket in "/tmp" at port "5556" .postgres =# show wal_level ; wal_level ----------- logical (1 row)postgres =# \du postgres List of roles Role name | Attributes | Member of -----------+------------------------------------------------------------+----------- postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}postgres =#
创建测试表 主实例1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 pg13=# create table test_logical_repl(id serial primary key, islocal bool, create_time timestamptz default clock_timestamp(), name text, update_time timestamptz); CREATE TABLE pg13=# \d+ test_logical_repl Table "public.test_logical_repl" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -------------+--------------------------+-----------+----------+-----------------------------------------------+----------+--------------+------------- id | integer | | not null | nextval('test_logical_repl_id_seq'::regclass) | plain | | islocal | boolean | | | | plain | | create_time | timestamp with time zone | | | clock_timestamp() | plain | | name | text | | | | extended | | update_time | timestamp with time zone | | | | plain | | Indexes: "test_logical_repl_pkey" PRIMARY KEY, btree (id) Access method: heap pg13=# alter sequence test_logical_repl_id_seq increment 4; ALTER SEQUENCE pg13=# select setval('test_logical_repl_id_seq', 1) pg13-# ; setval -------- 1 (1 row) pg13=# \d+ test_logical_repl_id_seq Sequence "public.test_logical_repl_id_seq" Type | Start | Minimum | Maximum | Increment | Cycles? | Cache ---------+-------+---------+------------+-----------+---------+------- integer | 1 | 1 | 2147483647 | 4 | no | 1 Owned by: public.test_logical_repl.id pg13=# select nextval('test_logical_repl_id_seq'); nextval --------- 5 (1 row) pg13=# select nextval('test_logical_repl_id_seq'); nextval --------- 9 (1 row) pg13=#
主实例2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 postgres=# create table test_logical_repl(id serial primary key, islocal bool, create_time timestamptz default clock_timestamp(), name text, update_time timestamptz); CREATE TABLE postgres=# postgres=# \d+ test_logical_repl Table "public.test_logical_repl" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description -------------+--------------------------+-----------+----------+-----------------------------------------------+----------+--------------+------------- id | integer | | not null | nextval('test_logical_repl_id_seq'::regclass) | plain | | islocal | boolean | | | | plain | | create_time | timestamp with time zone | | | clock_timestamp() | plain | | name | text | | | | extended | | update_time | timestamp with time zone | | | | plain | | Indexes: "test_logical_repl_pkey" PRIMARY KEY, btree (id) Access method: heap postgres=# alter sequence test_logical_repl_id_seq increment 4; ALTER SEQUENCE postgres=# select setval('test_logical_repl_id_seq', 2); setval -------- 2 (1 row) postgres=# \d+ test_logical_repl_id_seq Sequence "public.test_logical_repl_id_seq" Type | Start | Minimum | Maximum | Increment | Cycles? | Cache ---------+-------+---------+------------+-----------+---------+------- integer | 1 | 1 | 2147483647 | 4 | no | 1 Owned by: public.test_logical_repl.id postgres=# select nextval('test_logical_repl_id_seq'); nextval --------- 6 (1 row) postgres=# select nextval('test_logical_repl_id_seq'); nextval --------- 10 (1 row) postgres=#
两实例创建发布 主实例1 1 2 3 4 5 6 7 8 9 10 11 12 pg13=# create publication pub_pg13 for table test_logical_repl with (publish = 'insert, delete, update'); CREATE PUBLICATION pg13=# pg13=# \dRp+ Publication pub_pg13 Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ----------+------------+---------+---------+---------+-----------+---------- postgres | f | t | t | t | f | f Tables : "public.test_logical_repl" pg13=#
主实例2 1 2 3 4 5 6 7 8 9 10 11 postgres=# create publication pub_pg12 for table test_logical_repl with (publish = 'insert, delete, update'); CREATE PUBLICATION postgres=# \dRp+ Publication pub_pg12 Owner | All tables | Inserts | Updates | Deletes | Truncates ----------+------------+---------+---------+---------+----------- postgres | f | t | t | t | f Tables : "public.test_logical_repl" postgres=#
创建触发器,防止死循环 主实例1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 pg13=# CREATE OR REPLACE FUNCTION public.prevent_round_robin_func() RETURNS trigger LANGUAGE plpgsql STRICT AS $function$ declare replica_pids int[]; begin perform from pg_stat_activity where backend_type ~ 'logical replication worker' and pid = pg_backend_pid(); if not found then NEW.islocal = true; elsif NEW.islocal = false then return null; else NEW.islocal = false; end if; return NEW; end; $function$; CREATE FUNCTION pg13=# pg13=# create trigger tg before insert or update on test_logical_repl for each row execute FUNCTION prevent_round_robin_func(); CREATE TRIGGER pg13=# 注意:逻辑复制的 apply worker 以 session_replication_role = replica 运行,默认方式启用的触发器在应用复制数据时不会触发。要让该触发器对复制过来的行生效,两个实例上都需要执行 alter table test_logical_repl enable always trigger tg;
主实例2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 postgres=# CREATE OR REPLACE FUNCTION public.prevent_round_robin_func() postgres-# RETURNS trigger postgres-# LANGUAGE plpgsql postgres-# STRICT postgres-# AS $function$ postgres$# declare replica_pids int[]; postgres$# begin postgres$# perform from pg_stat_activity where backend_type ~ 'logical replication worker' and pid = pg_backend_pid(); postgres$# if not found then postgres$# NEW.islocal = true ; postgres$# elsif NEW.islocal = false then postgres$# return null; postgres$# else postgres$# NEW.islocal = false ; postgres$# end if ; postgres$# return NEW; postgres$# end ; postgres$# $function$; CREATE FUNCTION postgres=# postgres=# create trigger tg before insert or update on test_logical_repl for each row execute function prevent_round_robin_func(); CREATE TRIGGER postgres=#
订阅 主实例1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 pg13=# create subscription sub_pg13 connection 'host=127.0.0.1 port=5556 dbname=postgres user=postgres' publication pub_pg12; NOTICE: created replication slot "sub_pg13" on publisher CREATE SUBSCRIPTION pg13=# pg13=# select * from pg_stat_replication; pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | backend_xmin | state | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state | reply_time ---------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+--------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+------------+---------------+------------+------------------------------- 1847284 | 16385 | postgres | sub_pg12 | 127.0.0.1 | | 40564 | 2021-05-26 14:52:31.174826+08 | | streaming | 0/B9416A0 | 0/B9416A0 | 0/B9416A0 | 0/B9416A0 | | | | 0 | async | 2021-05-26 14:53:21.238743+08 (1 row) pg13=#
主实例2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 postgres=# create subscription sub_pg12 connection 'host=127.0.0.1 port=5557 dbname=pg13 user=postgres' publication pub_pg13; NOTICE: created replication slot "sub_pg12" on publisher CREATE SUBSCRIPTION postgres=# postgres=# select * from pg_stat_replication; pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | backend_xmin | state | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state | reply_time ---------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+--------------+-----------+-------------+-------------+-------------+-------------+-----------+-----------+------------+---------------+------------+------------------------------- 1847272 | 10 | postgres | sub_pg13 | 127.0.0.1 | | 40110 | 2021-05-26 14:52:10.578243+08 | | streaming | 10/7F7646E0 | 10/7F7646E0 | 10/7F7646E0 | 10/7F7646E0 | | | | 0 | async | 2021-05-26 14:53:28.851805+08 (1 row) postgres=#
冲突的解决 方法一:停掉冲突端的订阅,手动删除冲突的数据,这样就不会与复制过来的数据冲突了。 方法二:使用 pg_replication_origin_advance 跳过冲突的事务。 发布端获取冲突的 lsn:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 pg13=# SELECT pg_copy_logical_replication_slot('sub_pg12', 'sub_pg12_copy_1', true, 'test_decoding'); pg_copy_logical_replication_slot (sub_pg12_copy_1,0/109FDF28) (1 row) pg13=# pg13=# SELECT * from pg_logical_slot_peek_changes('sub_pg12_copy_1', NULL, NULL); lsn | xid | data 0/109FE1E0 | 55884 | BEGIN 55884 0/109FE1E0 | 55884 | table public.test_logical_repl: INSERT: id[integer]:108 islocal[boolean]:true create_time[timestamp with time zone]:'2021-05-26 16:19:39.554447+08' name[text]:'pg13' update_time[timestamp with time zone]:'2021-05-26 16:19:39.554234+08' 0/109FE380 | 55884 | COMMIT 55884 (3 rows) pg13=#
要跳过的位置一般取第一个(冲突)事务的 COMMIT 记录所对应的 lsn,在上面的输出中也就是 0/109FE380。
订阅端执行pg_replication_origin_advance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 postgres=# select * from pg_replication_origin_status; local_id | external_id | remote_lsn | local_lsn 1 | pg_308115 | 0/109FE380 | 10/8477F0B0 (1 row) postgres=# postgres=# SELECT pg_replication_origin_advance('pg_308115', '0/109FE380'::pg_lsn); pg_replication_origin_advance (1 row)