添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
partition_table_concurrently(  
  relation   REGCLASS,              -- 主表OID  
  batch_size INTEGER DEFAULT 1000,  -- 一个事务批量迁移多少记录  
  sleep_time FLOAT8 DEFAULT 1.0)    -- 获得行锁失败时,休眠多久再次获取,重试60次退出任务。  
postgres=# select partition_table_concurrently('part_test'::regclass,  
                             10000,  
                             1.0);  
NOTICE:  worker started, you can stop it with the following command: select stop_concurrent_part_task('part_test');  
 partition_table_concurrently   
------------------------------  
(1 row)  

迁移结束后,主表数据已经没有了,全部在分区中

postgres=# select count(*) from only part_test;  
 count   
-------  
(1 row)  

数据迁移完成后,建议禁用主表,这样执行计划就不会出现主表了

postgres=# select set_enable_parent('part_test'::regclass, false);  
 set_enable_parent   
-------------------  
(1 row)  

方法2,原生分区

使用继承表,触发器,异步迁移,交换表名一系列步骤,在线将非分区表,转换为分区表(交换表名是需要短暂的堵塞)。

关键技术:

1、继承表(子分区)

对select, update, delete, truncate, drop透明。

2、触发器

插入,采用before触发器,数据路由到继承分区

更新,采用before触发器,删除老表记录,同时将更新后的数据插入新表

3、后台迁移数据,cte only skip locked , delete only, insert into new table

4、迁移结束(p表没有数据后),短暂上锁,剥离INHERTI关系,切换到原生分区,切换表名。

将一个表在线转换为LIST分区表(伪HASH分区)。

范围分区类似。

如果要转换为原生HASH分区表,需要提取pg内置HASH分区算法。

1、创建测试表(需要被分区的表)

create table old (id int primary key, info text, crt_time timestamp);  

2、写入1000万测试记录

insert into old select generate_series(1,10000000) , md5(random()::text) , now();  

3、创建子分区(本例使用LIST分区)

do language plpgsql $$    
declare    
  parts int := 4;    
begin    
  for i in 0..parts-1 loop    
    execute format('create table old_mid%s (like old including all) inherits (old)', i);    
    execute format('alter table old_mid%s add constraint ck check(abs(mod(id,%s))=%s)', i, parts, i);    
  end loop;    

4、插入,采用before触发器,路由到新表

create or replace function ins_tbl() returns trigger as $$    
declare    
begin    
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  return null;    
$$ language plpgsql strict;    
create trigger tg1 before insert on old for each row execute procedure ins_tbl();    

5、更新,采用before触发器,删除老表,同时将更新后的数据插入新表

create or replace function upd_tbl () returns trigger as $$  
declare  
begin  
  case abs(mod(NEW.id,4))    
    when 0 then    
      insert into old_mid0 values (NEW.*);    
    when 1 then    
      insert into old_mid1 values (NEW.*);    
    when 2 then    
      insert into old_mid2 values (NEW.*);    
    when 3 then    
      insert into old_mid3 values (NEW.*);    
      return NEW;  -- 如果是NULL则写本地父表,主键不会为NULL     
  end case;    
  delete from only old where id=NEW.id;  
  return null;    
$$ language plpgsql strict;    
create trigger tg2 before update on old for each row execute procedure upd_tbl();    

6、old table 如下

postgres=# \dt+ old  
                    List of relations  
 Schema | Name | Type  |  Owner   |  Size  | Description   
--------+------+-------+----------+--------+-------------  
 public | old  | table | postgres | 730 MB |   
(1 row)  
继承关系如下  
postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           | not null |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Indexes:  
    "old_pkey" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 BEFORE INSERT ON old FOR EACH ROW EXECUTE PROCEDURE ins_tbl()  
    tg2 BEFORE UPDATE ON old FOR EACH ROW EXECUTE PROCEDURE upd_tbl()  
Child tables: old_mid0,  
              old_mid1,  
              old_mid2,  
              old_mid3  

7、验证insert, update, delete, select完全符合要求。对业务SQL请求透明。

postgres=# insert into old values (0,'test',now());  
INSERT 0 0  
postgres=# select tableoid::regclass,* from old where id=1;  
 tableoid | id |               info               |         crt_time            
----------+----+----------------------------------+---------------------------  
 old      |  1 | 22be06200f2a967104872f6f173fd038 | 31-JAN-19 12:52:25.887242  
(1 row)  
postgres=# select tableoid::regclass,* from old where id=0;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | test | 31-JAN-19 13:02:35.859899  
(1 row)  
postgres=# update old set info='abc' where id in (0,2) returning tableoid::regclass,*;  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:02:35.859899  
(1 row)  
UPDATE 1  
postgres=# select tableoid::regclass,* from old where id in (0,2);  
 tableoid | id | info |         crt_time            
----------+----+------+---------------------------  
 old_mid0 |  0 | abc  | 31-JAN-19 13:12:03.343559  
 old_mid2 |  2 | abc  | 31-JAN-19 13:11:04.763652  
(2 rows)  
postgres=# delete from old where id=3;  
DELETE 1  
postgres=# select tableoid::regclass,* from old where id=3;  
 tableoid | id | info | crt_time   
----------+----+------+----------  
(0 rows)  

8、开启压测,后台对原表数据进行迁移

create or replace function test_ins(int) returns void as $$  
declare  
begin  
  insert into old values ($1,'test',now());  
  exception when others then  
  return;  
$$ language plpgsql strict;  
vi test.sql  
\set id1 random(10000001,200000000)  
\set id2 random(1,5000000)  
\set id3 random(5000001,10000000)  
delete from old where id=:id2;  
update old set info=md5(random()::text),crt_time=now() where id=:id3;  
select test_ins(:id1);  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 4 -j 4 -T 1200  
progress: 323.0 s, 12333.1 tps, lat 0.324 ms stddev 0.036  
progress: 324.0 s, 11612.9 tps, lat 0.344 ms stddev 0.203  
progress: 325.0 s, 12546.0 tps, lat 0.319 ms stddev 0.061  
progress: 326.0 s, 12728.7 tps, lat 0.314 ms stddev 0.038  
progress: 327.0 s, 12536.9 tps, lat 0.319 ms stddev 0.040  
progress: 328.0 s, 12534.1 tps, lat 0.319 ms stddev 0.042  
progress: 329.0 s, 12228.1 tps, lat 0.327 ms stddev 0.047  

9、在线迁移数据

批量迁移,每一批迁移N条。调用以下SQL

with a as (  
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from only old;  
  count    
---------  
 9998998  
(1 row)  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
postgres=# with a as (                     
delete from only old where ctid = any (array (select ctid from only old limit 1000 for update skip locked) ) returning *  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  
postgres=# select count(*) from only old;  
  count    
---------  
 9997998  
(1 row)  
postgres=# with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 100000 for update skip locked) ) returning *  
insert into old select * from a;  
INSERT 0 0  
postgres=# select count(*) from only old;  
  count    
---------  
 9897998  
(1 row)  
postgres=# select count(*) from old;  
  count     
----------  
 10000000  
(1 row)  

一次迁移1万条,分批操作。

with a as (                
delete from only old where ctid = any (array (select ctid from only old limit 10000 for update skip locked) ) returning *  
insert into old select * from a;  

持续调用以上接口,直到当old表已经没有数据,完全迁移到了分区。

select count(*) from only old;  
 count   
-------  
(1 row)  

10、切换到分区表

创建分区表如下,分区方法与继承约束一致。

create table new (id int, info text, crt_time timestamp) partition by list (abs(mod(id,4)));    

切换表名,防止雪崩,使用锁超时,由于只涉及表名变更,所以速度非常快。

begin;  
set lock_timeout ='3s';   
alter table old_mid0 no inherit old;   
alter table old_mid1 no inherit old;   
alter table old_mid2 no inherit old;   
alter table old_mid3 no inherit old;   
alter table old rename to old_tmp;  
alter table new rename to old;  
alter table old ATTACH PARTITION old_mid0 for values in (0);    
alter table old ATTACH PARTITION old_mid1 for values in (1);    
alter table old ATTACH PARTITION old_mid2 for values in (2);    
alter table old ATTACH PARTITION old_mid3 for values in (3);    

切换后的原生分区表如下

postgres=# \d+ old  
                                               Table "public.old"  
  Column  |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description   
----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------  
 id       | integer                     |           |          |         | plain    |              |   
 info     | text                        |           |          |         | extended |              |   
 crt_time | timestamp without time zone |           |          |         | plain    |              |   
Partition key: LIST (abs(mod(id, 4)))  
Partitions: old_mid0 FOR VALUES IN (0),  
            old_mid1 FOR VALUES IN (1),  
            old_mid2 FOR VALUES IN (2),  
            old_mid3 FOR VALUES IN (3)  
postgres=# explain select * from old where id=1;  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..10.04 rows=4 width=44)  
   ->  Index Scan using old_mid0_pkey on old_mid0  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid2_pkey on old_mid2  (cost=0.29..2.51 rows=1 width=44)  
         Index Cond: (id = 1)  
   ->  Index Scan using old_mid3_pkey on old_mid3  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
(9 rows)  
postgres=# explain select * from old where id=? and abs(mod(id, 4)) = abs(mod(?, 4));  
                                     QUERY PLAN                                        
-------------------------------------------------------------------------------------  
 Append  (cost=0.29..2.52 rows=1 width=45)  
   ->  Index Scan using old_mid1_pkey on old_mid1  (cost=0.29..2.51 rows=1 width=45)  
         Index Cond: (id = 1)  
         Filter: (mod(id, 4) = 1)  
(4 rows)  
postgres=# select count(*) from old;  
  count     
----------  
 10455894  
(1 row)  

方法3,logical replication

使用逻辑复制的方法,同步到分区表。

简单步骤如下:

snapshot 快照(lsn位点)  
增量(逻辑复制,从LSN位置开始解析WAL LOG)  

hash函数

postgres=# \df *.*hash*  
                                            List of functions  
   Schema   |           Name           | Result data type |          Argument data types          | Type   
------------+--------------------------+------------------+---------------------------------------+------  
 pg_catalog | hash_aclitem             | integer          | aclitem                               | func  
 pg_catalog | hash_aclitem_extended    | bigint           | aclitem, bigint                       | func  
 pg_catalog | hash_array               | integer          | anyarray                              | func  
 pg_catalog | hash_array_extended      | bigint           | anyarray, bigint                      | func  
 pg_catalog | hash_numeric             | integer          | numeric                               | func  
 pg_catalog | hash_numeric_extended    | bigint           | numeric, bigint                       | func  
 pg_catalog | hash_range               | integer          | anyrange                              | func  
 pg_catalog | hash_range_extended      | bigint           | anyrange, bigint                      | func  
 pg_catalog | hashbpchar               | integer          | character                             | func  
 pg_catalog | hashbpcharextended       | bigint           | character, bigint                     | func  
 pg_catalog | hashchar                 | integer          | "char"                                | func  
 pg_catalog | hashcharextended         | bigint           | "char", bigint                        | func  
 pg_catalog | hashenum                 | integer          | anyenum                               | func  
 pg_catalog | hashenumextended         | bigint           | anyenum, bigint                       | func  
 pg_catalog | hashfloat4               | integer          | real                                  | func  
 pg_catalog | hashfloat4extended       | bigint           | real, bigint                          | func  
 pg_catalog | hashfloat8               | integer          | double precision                      | func  
 pg_catalog | hashfloat8extended       | bigint           | double precision, bigint              | func  
 pg_catalog | hashhandler              | index_am_handler | internal                              | func  
 pg_catalog | hashinet                 | integer          | inet                                  | func  
 pg_catalog | hashinetextended         | bigint           | inet, bigint                          | func  
 pg_catalog | hashint2                 | integer          | smallint                              | func  
 pg_catalog | hashint2extended         | bigint           | smallint, bigint                      | func  
 pg_catalog | hashint4                 | integer          | integer                               | func  
 pg_catalog | hashint4extended         | bigint           | integer, bigint                       | func  
 pg_catalog | hashint8                 | integer          | bigint                                | func  
 pg_catalog | hashint8extended         | bigint           | bigint, bigint                        | func  
 pg_catalog | hashmacaddr              | integer          | macaddr                               | func  
 pg_catalog | hashmacaddr8             | integer          | macaddr8                              | func  
 pg_catalog | hashmacaddr8extended     | bigint           | macaddr8, bigint                      | func  
 pg_catalog | hashmacaddrextended      | bigint           | macaddr, bigint                       | func  
 pg_catalog | hashname                 | integer          | name                                  | func  
 pg_catalog | hashnameextended         | bigint           | name, bigint                          | func  
 pg_catalog | hashoid                  | integer          | oid                                   | func  
 pg_catalog | hashoidextended          | bigint           | oid, bigint                           | func  
 pg_catalog | hashoidvector            | integer          | oidvector                             | func  
 pg_catalog | hashoidvectorextended    | bigint           | oidvector, bigint                     | func  
 pg_catalog | hashtext                 | integer          | text                                  | func  
 pg_catalog | hashtextextended         | bigint           | text, bigint                          | func  
 pg_catalog | hashvarlena              | integer          | internal                              | func  
 pg_catalog | hashvarlenaextended      | bigint           | internal, bigint                      | func  
 pg_catalog | interval_hash            | integer          | interval                              | func  
 pg_catalog | interval_hash_extended   | bigint           | interval, bigint                      | func  
 pg_catalog | jsonb_hash               | integer          | jsonb                                 | func  
 pg_catalog | jsonb_hash_extended      | bigint           | jsonb, bigint                         | func  
 pg_catalog | pg_lsn_hash              | integer          | pg_lsn                                | func  
 pg_catalog | pg_lsn_hash_extended     | bigint           | pg_lsn, bigint                        | func  
 pg_catalog | satisfies_hash_partition | boolean          | oid, integer, integer, VARIADIC "any" | func  
 pg_catalog | time_hash                | integer          | time without time zone                | func  
 pg_catalog | time_hash_extended       | bigint           | time without time zone, bigint        | func  
 pg_catalog | timestamp_hash           | integer          | timestamp without time zone           | func  
 pg_catalog | timestamp_hash_extended  | bigint           | timestamp without time zone, bigint   | func  
 pg_catalog | timetz_hash              | integer          | time with time zone                   | func  
 pg_catalog | timetz_hash_extended     | bigint           | time with time zone, bigint           | func  
 pg_catalog | uuid_hash                | integer          | uuid                                  | func  
 pg_catalog | uuid_hash_extended       | bigint           | uuid, bigint                          | func  
(56 rows)  

在线将表转换为分区表,可以使用的方法:

1、转换为pg_pathman分区,直接调用pg_pathman的UDF即可。

2、转换为原生分区,使用继承,异步迁移的方法。割接是短暂锁表。

不支持 insert ino on conflict 语法。

insert into old values (1,'test',now()) on conflict(id) do update set info=excluded.info, crt_time=excluded.crt_time;  

3、逻辑复制的方法,将数据增量迁移到分区表(目标可以是原生分区方法或者是pg_pathman分区方法的新表)。

《PostgreSQL 9.x, 10, 11 hash分区表 用法举例》

《PostgreSQL 触发器 用法详解 1》

《PostgreSQL 触发器 用法详解 2》

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

免费领取阿里云RDS PostgreSQL实例、ECS虚拟机

PolarDB 开源版 使用PostGIS 以及泰森多边形 解决 "零售、配送、综合体、教培、连锁店等经营"|"通信行业基站建设功率和指向" 的地理最优解问题 阿里云rds并发性能解读-大分区表高并发性能提升100倍?
Redis读写分离实例的原理是:key统一写入到master,然后通过主从复制同步到slave,用户的请求通过proxy做判断,如果是写请求,转发到master;如果是读请求,分散转发到slave,这种架构适合读请求数量远大于写请求数量的业务
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
大分区表高并发性能提升100倍?阿里云 RDS PostgreSQL 12 特性解读
世界上几乎最强大的开源数据库系统 PostgreSQL,于 2019 年 10 月 3 日发布了 12 版本,该版本已经在阿里云正式发布。PostgreSQL 12 在功能和性能上都有很大提升,如大分区表高并发性能提升百倍,B-tree 索引空间和性能优化,实现 SQL 2016 标准的 JSON 特性,支持多列 MCV(Most-Common-Value)统计,内联 CTE(Common table expressions)以及可插拔的表存储访问接口等。本文对部分特性进行解读。
PostgreSQL9.6支持基本表的分区。这部分将描述为什么以及如何来实现表分区作为你数据库设计的一部分。 概述 分区指的是将逻辑上一的一个大表分成多个小的物理上的片(子表),分区可以提供以下好处: .在某些情况下查询性能能够显著提升,特别是当那些访问压力大的行在一个分区或者少数几个分区时。 PostgreSQL 12 新增三个分区查询函数,如下: pg_partition_tree(regclass): 返回分区表详细信息,例如分区名称、上一级分区名称、是否叶子结点、层级,层级 0 表示顶层父表。 PostgreSQL 11 新特性解读:分区表支持创建主键、外键、索引
PostgreSQL 10 版本虽然支持创建范围分区表和列表分区表,但创建过程依然比较繁琐,需要手工定义子表索引、主键,详见 PostgreSQL10:重量级新特性-支持分区表,PostgreSQL 11 版本得到增强,在父表上创建索引、主键、外键后,子表上将自动创建,本文演示这三种场景。
PostgreSQL 和它的LOGO大象一样,给人非常强大的安全感。 就拿它的Feature来说,一个大的feature要打磨很多年才能正式的合并到master分支。 比如并行计算的特性,从9.4就开始准备,加入了work process和dynamic s PolarDB 开源版 使用PostGIS 以及泰森多边形 解决 "零售、配送、综合体、教培、连锁店等经营"|"通信行业基站建设功率和指向" 的地理最优解问题