半个PostgreSQL DBA,热衷于数据库相关的技术。我的ppt分享https://pan.baidu.com/s/1eRQsdAa https://github.com/chenhuajun https://chenhuajun.github.io
分类: Mysql/postgreSQL
2018-10-04 01:33:42
对一个分布式数据库来说,动态扩缩容是不可回避的需求。但是citus的动态扩缩容功能只在企业版中才有。好消息是,citus的分片信息是存储在元数据表里的,通过修改元数据表,我们完全可以在citus社区版上实现动态的平滑扩缩容。
实验环境可参考《citus实战系列之二实验环境搭建》搭建。
citus提供了现成的管理函数可以添加新的worker节点,但现有的分片表和参考表却不会自动分布到新加的worker上。 我们需要手动移动这些分片,并且要保证分片移动过程中不中断业务。主要过程可以分为以下几个步骤
表复制采用PostgreSQL的逻辑复制实现,因此所有worker节点必须预先打开逻辑复制开关。
wal_level = logical
注1:citus在添加新worker节点时已经在新worker上拷贝了参考表,不需要再人工处理。
注2:扩容时,如果把worker数翻倍,也可以用物理复制实现。使用物理复制时,如果有参考表不能调用master_add_node添加节点,必须手动修改元数据表。逻辑复制不支持复制DDL,物理复制没有这个限制,但物理复制没有逻辑复制灵活,只支持worker粒度的扩容,而且不能实现缩容。
创建以下测试分片表
create table tb1(id int primary key, c1 int); set citus.shard_count=8; select create_distributed_table('tb1','id'); insert into tb1 select id,random()*1000 from generate_series(1,100)id;
检查分片位置
postgres=# select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass); placementid | shardid | shardstate | shardlength | groupid -------------+---------+------------+-------------+--------- 33 | 102040 | 1 | 0 | 1 34 | 102041 | 1 | 0 | 2 35 | 102042 | 1 | 0 | 1 36 | 102043 | 1 | 0 | 2 37 | 102044 | 1 | 0 | 1 38 | 102045 | 1 | 0 | 2 39 | 102046 | 1 | 0 | 1 40 | 102047 | 1 | 0 | 2 (8 rows)
上面的groupid代表了对应哪个worker
postgres=# select * from pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster --------+---------+----------+----------+----------+-------------+----------+----------+------------- 1 | 1 | cituswk1 | 5432 | default | f | t | primary | default 2 | 2 | cituswk2 | 5432 | default | f | t | primary | default (2 rows)
在CN节点上执行以下SQL,将新的worker节点cituswk3加入到集群中
SELECT * from master_add_node('cituswk3', 5432);
检查pg_dist_node元数据表。新的worker节点的groupid为4
postgres=# select * from pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster --------+---------+----------+----------+----------+-------------+----------+----------+------------- 1 | 1 | cituswk1 | 5432 | default | f | t | primary | default 2 | 2 | cituswk2 | 5432 | default | f | t | primary | default 4 | 4 | cituswk3 | 5432 | default | f | t | primary | default (3 rows)
目前cituswk1和cituswk2上各有4个分片,cituswk3上没有分片,为了保持数据分布均匀可以移动部分分片到cituswk3上。
下面移动cituswk1上的分片102046到cituswk3上。
在cituswk1上创建PUBLICATION
CREATE PUBLICATION pub_shard FOR TABLE tb1_102046;
在cituswk3上创建分片表和SUBSCRIPTION
create table tb1_102046(id int primary key, c1 int); CREATE SUBSCRIPTION sub_shard CONNECTION 'host=cituswk1' PUBLICATION pub_shard;
锁表,阻止应用修改表
lock table tb1 IN EXCLUSIVE MODE;
等待数据完全同步后,修改元数据
update pg_dist_placement set groupid=4 where shardid=102046 and groupid=1;
在cituswk1上删除分片表和PUBLICATION
DROP PUBLICATION pub_shard; drop table tb1_102046;
在cituswk3上删除SUBSCRIPTION
DROP SUBSCRIPTION sub_shard;
参考分片表扩容的步骤,将要删除的worker(cituswk3)上的分片(102046)移到其它worker(cituswk1)上,然后删除worker(cituswk3)。
select master_remove_node('cituswk3',5432);
citus的分片表之间存在亲和性关系,具有亲和性(即colocationid相同)的所有分片表的同一范围的分片其所在位置必须相同。 移动某个分片时,必须将这些亲和分片捆绑移动。可以通过以下SQL查出某个分片的所有亲和分片。
postgres=# select * from pg_dist_shard where logicalrelid in(select logicalrelid from pg_dist_partition where colocationid=(select colocationid from pg_dist_partition where partmethod='h' and logicalrelid='tb1'::regclass)) and (shardminvalue,shardmaxvalue)=(select shardminvalue,shardmaxvalue from pg_dist_shard where shardid=102046); logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- tb1 | 102046 | t | 1073741824 | 1610612735 tb2 | 102055 | t | 1073741824 | 1610612735 (2 rows)
对应的分片表元数据如下:
postgres=# select logicalrelid,partmethod,colocationid from pg_dist_partition; logicalrelid | partmethod | colocationid --------------+------------+-------------- tb1 | h | 2 tb2 | h | 2 tb3 | h | 4 (3 rows)
在实际生产环境中,citus集群中可能会存储了非常多的表,每个表又拆成了非常多的分片。如果按照上面的步骤手工对citus扩缩容,将是一件非常痛苦的事情,也很容易出错。所以需要将这些步骤打包成自动化程序。
citus企业版在扩缩容时利用了一个叫master_move_shard_placement()的函数迁移分片,我们可以实现一个接口类似的函数citus_move_shard_placement()。
CREATE TYPE citus.old_shard_placement_drop_method AS ENUM ( 'none', -- do not drop or rename old shards, only record it into citus.citus_move_shard_placement_remained_old_shard 'rename', -- move old shards to schema "citus_move_shard_placement_recyclebin" 'drop' -- drop old shards in source node ); CREATE TABLE citus.citus_move_shard_placement_remained_old_shard( id serial primary key, optime timestamptz NOT NULL default now(), nodename text NOT NULL, nodeport text NOT NULL, tablename text NOT NULL, drop_method citus.old_shard_placement_drop_method NOT NULL ); -- move this shard and it's all colocated shards from source node to target node. -- drop_method define how to process old shards in the source node, default is 'none' which does not block SELECT. -- old shards should be drop in the future will be recorded into table citus.citus_move_shard_placement_remained_old_shard CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement(shard_id bigint, source_node_name text, source_node_port integer, target_node_name text, target_node_port integer, drop_method citus.old_shard_placement_drop_method DEFAULT 'none') RETURNS void AS $citus_move_shard_placement$ ... 这部分太长了,略过 ... $citus_move_shard_placement$ LANGUAGE plpgsql SET search_path = 'pg_catalog','public'; -- drop old shards in source node CREATE OR REPLACE FUNCTION pg_catalog.citus_move_shard_placement_cleanup() RETURNS void AS $$ BEGIN delete from citus.citus_move_shard_placement_remained_old_shard where id in (select id from (select id,dblink_exec('host='||nodename || ' port='||nodeport,'DROP TABLE IF EXISTS ' || tablename) drop_result from citus.citus_move_shard_placement_remained_old_shard)a where drop_result='DROP TABLE'); PERFORM run_command_on_workers('DROP SCHEMA IF EXISTS citus_move_shard_placement_recyclebin CASCADE'); END; $$ LANGUAGE plpgsql SET search_path = 'pg_catalog','public';
注:上面的工具函数未经过严格的测试,并且不支持后面的多CN架构。
下面是一个使用的例子
把102928分片从cituswk1迁移到cituswk2,drop_method使用rename旧的分片不删除而是移到名为citus_move_shard_placement_recyclebin的schema下。
postgres=# select citus_move_shard_placement(102928,'cituswk1',5432,'cituswk2',5432,'rename'); NOTICE: BEGIN move shards(102928,102944) from cituswk1:5432 to cituswk2:5432 NOTICE: [1/2] LOCK TABLE scale_test.tb_dist2 IN SHARE UPDATE EXCLUSIVE MODE ... NOTICE: [2/2] LOCK TABLE scale_test.tb_dist IN SHARE UPDATE EXCLUSIVE MODE ... NOTICE: CREATE PUBLICATION in source node cituswk1:5432 NOTICE: create shard table in the target node cituswk2:5432 NOTICE: CREATE SUBSCRIPTION on target node cituswk2:5432 NOTICE: wait for init data sync... NOTICE: init data sync in 00:00:01.010502 NOTICE: [1/2] LOCK TABLE scale_test.tb_dist2 IN EXCLUSIVE MODE ... NOTICE: [2/2] LOCK TABLE scale_test.tb_dist IN EXCLUSIVE MODE ... NOTICE: wait for data sync... NOTICE: data sync in 00:00:00.273212 NOTICE: UPDATE pg_dist_placement NOTICE: DROP SUBSCRIPTION and PUBLICATION NOTICE: END citus_move_shard_placement ---------------------------- (1 row)
从上面的输出可以看出,有一个步骤是锁表,这段时间内所有SQL都会被阻塞。对分析型业务来说,几十秒甚至更长SQL执行时间是很常见的,这意味着有可能出现先拿到一个表的锁,再拿下一个锁时,等了几十秒。更糟糕的情况下还可能发生死锁。 回避这种风险的办法是将drop_method设置为none,这也是默认值。drop_method为none时将会改为获取一个EXCLUSIVE锁,EXCLUSIVE锁和SELECT不会冲突。这大大降低了分片迁移对业务的影响,死锁发生的概率也同样大大降低(仅有可能发生在应用程序在一个事务里先后更新了2张分片表时)。
确认扩容成功后,删除残留的旧分片(drop_method为drop时不需要清理)。
postgres=# select citus_move_shard_placement_cleanup(); citus_move_shard_placement_cleanup ------------------------------------ (1 row)