Fork me on GitHub

daydayup863

人生就像一杯茶,不会苦一辈子,但总会苦一阵子。

0%

PostgreSQL双主逻辑复制

利用PostgreSQL逻辑复制和触发器实现PostgreSQL双主复制

准备

实例信息

主实例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pg13=#  \conninfo
You are connected to database "pg13" as user "postgres" via socket in "/tmp" at port "5557".
pg13=# show wal_level ;
wal_level
-----------
logical
(1 row)

pg13=# \du postgres
List of roles
Role name | Attributes | Member of
-----------+------------+-----------
postgres | Superuser | {}

pg13=#

主实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
postgres=# \conninfo
You are connected to database "postgres" as user "postgres" via socket in "/tmp" at port "5556".
postgres=# show wal_level ;
wal_level
-----------
logical
(1 row)

postgres=# \du postgres
List of roles
Role name | Attributes | Member of
-----------+------------------------------------------------------------+-----------
postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}

postgres=#

创建测试表

主实例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
pg13=# create table test_logical_repl(id serial primary key, islocal bool, create_time timestamptz default clock_timestamp(), name text, update_time timestamptz);
CREATE TABLE
pg13=# \d+ test_logical_repl
Table "public.test_logical_repl"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-------------+--------------------------+-----------+----------+-----------------------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('test_logical_repl_id_seq'::regclass) | plain | |
islocal | boolean | | | | plain | |
create_time | timestamp with time zone | | | clock_timestamp() | plain | |
name | text | | | | extended | |
update_time | timestamp with time zone | | | | plain | |
Indexes:
"test_logical_repl_pkey" PRIMARY KEY, btree (id)
Access method: heap

pg13=# alter sequence test_logical_repl_id_seq increment 4;
ALTER SEQUENCE
pg13=# select setval('test_logical_repl_id_seq', 1)
pg13-# ;
setval
--------
1
(1 row)

pg13=# \d+ test_logical_repl_id_seq
Sequence "public.test_logical_repl_id_seq"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------+-------+---------+------------+-----------+---------+-------
integer | 1 | 1 | 2147483647 | 4 | no | 1
Owned by: public.test_logical_repl.id

pg13=# select nextval('test_logical_repl_id_seq');
nextval
---------
5
(1 row)

pg13=# select nextval('test_logical_repl_id_seq');
nextval
---------
9
(1 row)

pg13=#

主实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
postgres=# create table test_logical_repl(id serial primary key, islocal bool, create_time timestamptz default clock_timestamp(), name text, update_time timestamptz);
CREATE TABLE
postgres=#
postgres=# \d+ test_logical_repl
Table "public.test_logical_repl"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-------------+--------------------------+-----------+----------+-----------------------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('test_logical_repl_id_seq'::regclass) | plain | |
islocal | boolean | | | | plain | |
create_time | timestamp with time zone | | | clock_timestamp() | plain | |
name | text | | | | extended | |
update_time | timestamp with time zone | | | | plain | |
Indexes:
"test_logical_repl_pkey" PRIMARY KEY, btree (id)
Access method: heap

postgres=# alter sequence test_logical_repl_id_seq increment 4;
ALTER SEQUENCE
postgres=# select setval('test_logical_repl_id_seq', 2);
setval
--------
2
(1 row)

postgres=# \d+ test_logical_repl_id_seq
Sequence "public.test_logical_repl_id_seq"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------+-------+---------+------------+-----------+---------+-------
integer | 1 | 1 | 2147483647 | 4 | no | 1
Owned by: public.test_logical_repl.id

postgres=# select nextval('test_logical_repl_id_seq');
nextval
---------
6
(1 row)

postgres=# select nextval('test_logical_repl_id_seq');
nextval
---------
10
(1 row)

postgres=#

两实例创建发布

主实例1

1
2
3
4
5
6
7
8
9
10
11
12
pg13=# create publication pub_pg13 for table test_logical_repl with (publish = 'insert, delete, update');
CREATE PUBLICATION
pg13=#
pg13=# \dRp+
Publication pub_pg13
Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
----------+------------+---------+---------+---------+-----------+----------
postgres | f | t | t | t | f | f
Tables:
"public.test_logical_repl"

pg13=#

主实例2

1
2
3
4
5
6
7
8
9
10
11
postgres=# create publication pub_pg12 for table test_logical_repl with (publish = 'insert, delete, update');
CREATE PUBLICATION
postgres=# \dRp+
Publication pub_pg12
Owner | All tables | Inserts | Updates | Deletes | Truncates
----------+------------+---------+---------+---------+-----------
postgres | f | t | t | t | f
Tables:
"public.test_logical_repl"

postgres=#

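创建触发器,防止死循环

思路:本地会话写入的行由触发器标记为 islocal = true;由逻辑复制 apply 进程写入的行被改写为 islocal = false;如果复制过来的行本身已经是 islocal = false(说明它是本端发出、又被对端回传的数据),触发器返回 NULL 直接丢弃该行,从而打断循环。

注意:逻辑复制的 apply 进程始终以 session_replication_role = replica 运行,按默认方式创建的触发器在该进程中不会被触发。要让触发器对复制过来的数据生效,两个实例在创建触发器之后都需要再执行一次:alter table test_logical_repl enable always trigger tg;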

主实例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
pg13=# CREATE OR REPLACE FUNCTION public.prevent_round_robin_func()
RETURNS trigger
LANGUAGE plpgsql
STRICT
AS $function$
declare replica_pids int[];
begin
perform from pg_stat_activity where backend_type ~ 'logical replication worker' and pid = pg_backend_pid();
if not found then
NEW.islocal = true;
elsif NEW.islocal = false then
return null;
else
NEW.islocal = false;
end if;
return NEW;
end;
$function$;
CREATE FUNCTION
pg13=#
pg13=# create trigger tg before insert or update on test_logical_repl for each row execute FUNCTION prevent_round_robin_func ();
CREATE TRIGGER
pg13=#

主实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
postgres=# CREATE OR REPLACE FUNCTION public.prevent_round_robin_func()
postgres-# RETURNS trigger
postgres-# LANGUAGE plpgsql
postgres-# STRICT
postgres-# AS $function$
postgres$# declare replica_pids int[];
postgres$# begin
postgres$# perform from pg_stat_activity where backend_type ~ 'logical replication worker' and pid = pg_backend_pid();
postgres$# if not found then
postgres$# NEW.islocal = true;
postgres$# elsif NEW.islocal = false then
postgres$# return null;
postgres$# else
postgres$# NEW.islocal = false;
postgres$# end if;
postgres$# return NEW;
postgres$# end;
postgres$# $function$;
CREATE FUNCTION
postgres=#
postgres=# create trigger tg before insert or update on test_logical_repl for each row execute FUNCTION prevent_round_robin_func ();
CREATE TRIGGER
postgres=#

订阅

主实例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pg13=# create subscription sub_pg13 CONNECTION 'host=127.0.0.1 port=5556 user=postgres dbname=postgres password=postgres' PUBLICATION pub_pg12 with (COPY_DATA=false);
NOTICE: created replication slot "sub_pg13" on publisher
CREATE SUBSCRIPTION
pg13=#
pg13=# table pg_stat_replication ;
pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | backend_xmin | state | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state | r
eply_time
---------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+--------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+------------+---------------+------------+-----------
--------------------
1847284 | 16385 | postgres | sub_pg12 | 127.0.0.1 | | 40564 | 2021-05-26 14:52:31.174826+08 | | streaming | 0/B9416A0 | 0/B9416A0 | 0/B9416A0 | 0/B9416A0 | | | | 0 | async | 2021-05-26
14:53:21.238743+08
(1 row)

pg13=#

主实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
postgres=# create subscription sub_pg12 CONNECTION 'host=127.0.0.1 port=5557 user=postgres dbname=pg13 password=postgres' PUBLICATION pub_pg13 with (COPY_DATA=false);
NOTICE: created replication slot "sub_pg12" on publisher
CREATE SUBSCRIPTION
postgres=#
postgres=# table pg_stat_replication ;
pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | backend_xmin | state | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state |
reply_time
---------+----------+----------+------------------+-------------+-----------------+-------------+-------------------------------+--------------+-----------+-------------+-------------+-------------+-------------+-----------+-----------+------------+---------------+------------+----
---------------------------
1847272 | 10 | postgres | sub_pg13 | 127.0.0.1 | | 40110 | 2021-05-26 14:52:10.578243+08 | | streaming | 10/7F7646E0 | 10/7F7646E0 | 10/7F7646E0 | 10/7F7646E0 | | | | 0 | async | 202
1-05-26 14:53:28.851805+08
(1 row)

postgres=#

冲突的解决

  1. 停掉冲突端订阅,手动删除冲突的数据,这样就不会与复制过来的数据冲突了
  2. 使用 pg_replication_origin_advance跳过冲突的事务

发布端获取冲突的lsn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pg13=# SELECT pg_copy_logical_replication_slot('sub_pg12', 'sub_pg12_copy_1', true, 'test_decoding');
pg_copy_logical_replication_slot
----------------------------------
(sub_pg12_copy_1,0/109FDF28)
(1 row)

pg13=#
pg13=# SELECT * from pg_logical_slot_peek_changes('sub_pg12_copy_1', NULL, NULL);
lsn | xid | data
------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0/109FE1E0 | 55884 | BEGIN 55884
0/109FE1E0 | 55884 | table public.test_logical_repl: INSERT: id[integer]:108 islocal[boolean]:true create_time[timestamp with time zone]:'2021-05-26 16:19:39.554447+08' name[text]:'pg13' update_time[timestamp with time zone]:'2021-05-26 16:19:39.554234+08'
0/109FE380 | 55884 | COMMIT 55884
(3 rows)

pg13=#

需要跳过的位置一般取输出中第一个事务的 COMMIT 记录对应的 lsn,在这个例子里就是 0/109FE380。

订阅端执行pg_replication_origin_advance

1
2
3
4
5
6
7
8
9
10
11
12
13
14

postgres=# select * from pg_replication_origin_status;
local_id | external_id | remote_lsn | local_lsn
----------+-------------+------------+-------------
1 | pg_308115 | 0/109FE380 | 10/8477F0B0
(1 row)

postgres=#
postgres=# SELECT pg_replication_origin_advance ('pg_308115', '0/109FE380'::pg_lsn);
pg_replication_origin_advance
-------------------------------

(1 row)

-------------本文结束感谢您的阅读-------------
听说,打赏我的人都找到了真爱

欢迎关注我的其它发布渠道