Chinaunix首页 | 论坛 | 博客
  • 博客访问: 56363
  • 博文数量: 18
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-03-01 13:27
个人简介

学习是一种信仰

文章分类

全部博文(18)

文章存档

2016年(8)

2015年(8)

2013年(2)

我的朋友

分类: Mysql/postgreSQL

2016-05-25 22:33:13

一、Postgresql流复制环境搭建请参考:http://blog.chinaunix.net/uid-28646132-id-5209986.html
    (各个服务器之间最好做互信,方便脚本访问各个服务器)

二、此容灾环境中,我配置了一个同步的standby和一个异步的standby,为了防止在master节点数据库宕机前的那一刻
    同步standby节点也发生故障,导致数据丢失,故需要对各个standby节点的最后一个已经同步完成的事务号做一下比较,
    默认应该切换到同步standby节点,具体原理这里不做描述

1、用c写一个程序(德哥支持),判断各个standby的最后同步状态

#include "postgres.h"
#include
#include "fmgr.h"
#include "access/xlog.h"
#include "replication/walreceiver.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "utils/pg_lsn.h"

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

PG_FUNCTION_INFO_V1(get_rcv_replication_stat);

Datum
get_rcv_replication_stat(PG_FUNCTION_ARGS)
{
        Assert(PG_NARGS() == 0);   // 表示没有输入参数

        if (!RecoveryInProgress())    // 在数据库处于恢复状态下时运行,否则不允许
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("recovery is not in progress"),
                                 errhint("This functions can only be executed during recovery.")));

        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;     //   共享内存中用于管理流复制的数据结构
        TupleDesc       tupdesc;    // 创建一个行描述变量
        Datum           values[8];    //  创建一个存储值的Datum数组, 需要返回几个字段, 创建相应长度的数组
        bool            nulls[8];        //   创建一个数组, 表示对应的每个值是否为空

        /* Initialise values and NULL flags arrays 初始化 */
        MemSet(values, 0, sizeof(values));
        MemSet(nulls, 0, sizeof(nulls));

        /* Initialise attributes information in the tuple descriptor 定义字段类型和字段名, 到相应的头文件src/include/catalog/pg_type.h找到对应的类型 */
        tupdesc = CreateTemplateTupleDesc(8, false);
        TupleDescInitEntry(tupdesc, (AttrNumber) 1, "last_walend_time",
                                           TIMESTAMPTZOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 2, "last_recv_lsn",
                                           LSNOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_apply_lsn",
                                           LSNOID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_apply_delay_ms",
                                           INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 5, "receiver_pid",
                                           INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 6, "receiver_state",
                                           INT4OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 7, "receiver_start_time",
                                           INT8OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 8, "receiver_conninfo",
                                           TEXTOID, -1, 0);
        BlessTupleDesc(tupdesc);   //  完成对返回类型的构造, 参考src/include/funcapi.h
        
//  接下来将每个值转换为对应的Datum存储到values数组, 对应的nulls数组仅当值为空时设置为true.
        TimestampTz receipttime;
        receipttime = walrcv->latestWalEndTime;
        values[0] = TimestampTzGetDatum(receipttime);
        XLogRecPtr      recvPtr;
        recvPtr = GetWalRcvWriteRecPtr(NULL, NULL);
        if (recvPtr == 0)
           nulls[1] = true;
        else
           values[1] = LSNGetDatum(recvPtr);
        XLogRecPtr      applyPtr;
        applyPtr = GetXLogReplayRecPtr(NULL);
        if (recvPtr == 0)
           nulls[2] = true;
        else
           values[2] = LSNGetDatum(applyPtr);
        int  apply_delay_ms;
        apply_delay_ms = GetReplicationApplyDelay();
        if (apply_delay_ms == -1)
           nulls[3] = true;
        else
           values[3] = Int32GetDatum(apply_delay_ms);
        values[4] = Int32GetDatum(walrcv->pid);
        values[5] = Int32GetDatum(walrcv->walRcvState);
        values[6] = Int64GetDatum(walrcv->startTime);
        values[7] = PointerGetDatum(cstring_to_text((char *)walrcv->conninfo));
        //  返回
        /* Returns the record as Datum */
        PG_RETURN_DATUM(HeapTupleGetDatum(
                                                                   heap_form_tuple(tupdesc, values, nulls)));
        
}

2、将程序编译,并copy到数据库安装目录的lib下
gcc -O3 -Wall -Wextra  -I /usr/local/src/postgresql-9.4.1/src/include -g -fPIC -c ./get_upstream_conninfo.c -o get_upstream_conninfo.o
gcc -O3 -Wall -Wextra  -I /usr/local/src/postgresql-9.4.1/src/include -g -shared get_upstream_conninfo.o -o get_upstream_conninfo.so
cp get_upstream_conninfo.so /usr/local/pgsql/lib

3、创建函数和视图
postgres=# create or replace function get_rcv_replication_stat() returns record as '$libdir/libget_upstream_conninfo.so', 'get_rcv_replication_stat' language C STRICT;
postgres=# create or replace view get_rcv_replication_stat as 
select now(),
extract(epoch from now()) as now_epoch,
last_walend_time,
last_recv_lsn,
last_apply_lsn,
last_apply_delay_ms,
receiver_pid,
case receiver_state 
when 0 then 'stopped' 
when 1 then 'starting' 
when 2 then 'streaming' 
when 3 then 'waiting' 
when 4 then 'restarting' 
when 5 then 'stopping' 
else null end as receiver_state,
receiver_start_epoch,                 
conninfo                               
from get_rcv_replication_stat() as   
t (last_walend_time timestamptz,        
last_recv_lsn pg_lsn,                 
last_apply_lsn pg_lsn,             
last_apply_delay_ms int,         
receiver_pid int,     
receiver_state int,                
receiver_start_epoch int8,       
conninfo text );

赋权限:
grant usage on schema public to trace;
grant select on all tables in schema public to trace;

4、在主节点查询
postgres=# select * from get_rcv_replication_stat ;
ERROR:  recovery is not in progress
HINT:  This functions can only be executed during recovery.

postgres=# select * from pg_stat_replication ;
-[ RECORD 1 ]----+------------------------------
pid              | 27217
usesysid         | 16384
usename          | rep
application_name | standby_1
client_addr      | 192.168.236.192
client_hostname  | 
client_port      | 16687
backend_start    | 2016-05-26 04:54:37.051702+08
backend_xmin     | 1823
state            | streaming
sent_location    | 0/502F450
write_location   | 0/502F450
flush_location   | 0/502F450
replay_location  | 0/502F450
sync_priority    | 1
sync_state       | sync
-[ RECORD 2 ]----+------------------------------
pid              | 27265
usesysid         | 16384
usename          | rep
application_name | standby_2
client_addr      | 192.168.236.193
client_hostname  | 
client_port      | 37528
backend_start    | 2016-05-26 04:57:58.36732+08
backend_xmin     | 1823
state            | streaming
sent_location    | 0/502F450
write_location   | 0/502F450
flush_location   | 0/502F450
replay_location  | 0/502F450
sync_priority    | 2
sync_state       | potential

5、在standby节点查询
postgres=# select * from get_rcv_replication_stat ;
-[ RECORD 1 ]--------+-----------------------------------------------------------------------
now                  | 2016-05-26 06:27:26.181334+08
now_epoch            | 1464215246.18133
last_walend_time     | 2016-05-26 06:26:16.884012+08
last_recv_lsn        | 0/502F378
last_apply_lsn       | 0/502F378
last_apply_delay_ms  | 0
receiver_pid         | 1969
receiver_state       | streaming
receiver_start_epoch | 1464209677
conninfo             | host=hadoop port=5432 user=rep password=rep application_name=standby_1

阅读(2995) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~