学习是一种信仰。
分类: Oracle
2014-05-19 15:38:34
1 前言
一般系统的应用可以分为:立即要执行和可以延迟要执行的事情,区分这个很重要。
为了提高系统的性能,缩短系统等待时间,引入队列技术。
队列是一种能将应用程序的处理工作有效地划分为前台任务和后台任务的技术。当处理容量允许时,这种技术通过存储消息、确定消息处理的优先顺序和向应用程序提交消息来发挥作用。它使你能够平衡本地计算机的负荷,或将任务分配到远程计算机。
为了减少用户的等待时间,应用程序可以让说明需要后台处理的消息排入队列。然后就可以从页面的呈递过程中去掉该处理任务。由一个后台进程来读取并队列处理这些消息,或者甚至可以交由一个单独的系统来处理它们。
队列可以实现各个系统之间的数据共享,消息通信。
2 功能概述
书写本文的目的:利用oracle高级队列实现pl/sql代码,为其它语言实现高级队列的功能作接口。
Oracle高级队列有一下好处:
1 高级队列管理是Oracle数据库的一个特性,它提供消息队列管理功能。这是一个非常可靠、安全和可伸缩的消息管理系统,因为它使用与其他基于Oracle技术的应用程序相同的数据库特性。
2 高级队列管理的一个很大优点是它可以通过PL/SQL、Java或C来访问,这样你就可以把来自一个Java servlet的消息入队列和使PL/SQL存储过程中的相同消息出队列。
3高级队列管理的另一个优点是你可以利用这一软件通过Oracle Net Services (SQL*Net)、HTTP(S)和SMTP,在远程节点之间传播消息。高级队列甚至可以通过消息网关与非Oracle的消息管理系统(如IBM MQSeries)相集成。
4 Oracle高级队列管理提供了单消费者队列和多消费者队列。单消费者队列只面向单一的接收者。多消费者队列可以被多个接收者使用。当把消息放入多消费者队列时,应用程序的程序员必须显式地在消息属性中指定这些接收者,或者建立决定每条消息的接收者的基于规则的订阅过程。
3、队列的开发
主要有:队列的管理和队列的操作。
具体开发步骤如下:
1 首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
2确定队列包体结构。
3 队列管理。
4 队列操作。
2 确定队列包体结构
底层消息的通信oracle已经实现,并且封装了,现在需要执行消息包体的结构,消息的发送和消息接收要通过一定的消息结构完成。
现在以短信push为例子,完成高级队列在短信push的应用。
安装好oracle数据库后,对smpuser授权
Grant aq_administrator_role To smpuser;
注意:需要把dbms_aq and dbms_aq_admin 包的调用功能权限付给smpuser.
CREATE OR REPLACE Type mt_struc As Object
/*
功能:push 的mt结构,和T_OUTBOX_PUSH结构一致
日期:2006-09-20
write by hancy
*/
(
OP_ID NUMBER ,
DEST_MOBILENO VARCHAR2(32) ,
FEE_MOBILENO VARCHAR2(32) ,
LONG_NO VARCHAR2(21) ,
MT_CONTENT VARCHAR2(500),
SERVICE_ID VARCHAR2(10) ,
MT_FLAG NUMBER(1) ,
MSG_FMT NUMBER(1) ,
FEE_TYPE NUMBER(1) ,
FEE_VALUE NUMBER(4) ,
BEG_REPORT NUMBER(1) ,
CLIENT_ID NUMBER(4) ,
CLIENT_NAME VARCHAR2(10) ,
MT_TIME DATE ,
MT_ID VARCHAR2(40) ,
PLAN_ID NUMBER ,
PUSH_TYPE VARCHAR2(20) ,
SEND_DATE DATE
)
4 队列管理
4.1建立队列表
队列表是队列的集合,代码如下:
创建下行和上行队列表:
exec dbms_aqadm.create_queue_table(queue_table=>'sms_mt_tab', queue_payload_type=>'mt_struc');
默认:只有一个接收者
具体参数如下:
SQL> desc dbms_aqadm.create_queue_table
Parameter Type Mode Default?
------------------ -------------- ---- --------
QUEUE_TABLE VARCHAR2 IN
QUEUE_PAYLOAD_TYPE VARCHAR2 IN
STORAGE_CLAUSE VARCHAR2 IN Y
SORT_LIST VARCHAR2 IN Y
MULTIPLE_CONSUMERS BOOLEAN IN Y
MESSAGE_GROUPING BINARY_INTEGER IN Y
COMMENT VARCHAR2 IN Y
AUTO_COMMIT BOOLEAN IN Y
PRIMARY_INSTANCE BINARY_INTEGER IN Y
SECONDARY_INSTANCE BINARY_INTEGER IN Y
COMPATIBLE VARCHAR2 IN Y
NON_REPUDIATION BINARY_INTEGER IN Y
SECURE BOOLEAN IN Y
参考dbms_aqadm.create_queue_table
1.存储参数storage_clause 可以是 MAXTRANS,LOB等
2.sort_list 可以是PRIORITY,enq_time这2个参数或者其中一个。具体可以察看对应的 队列表的数据结构。例如下面的例子
--创建队列表,‘sms_aq_tab’ 队列表名,' sms_aq_tab '队列对象名(充当消息体的对象名称),以下同
dbms_aqadm.create_queue_table('WDZAQTABLE','WDZAQMSG');
--创建具有排序功能的队列表
dbms_aqadm.create_queue_table('WDZSQRTAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time');
3。消息分组参数 message_grouping 可以是 NONE,或 TRANSACTIONAL
-- message grouping
dbms_aqadm.TRANSACTIONAL CONSTANT BINARY_INTEGER := 1;
dbms_aqadm.NONE CONSTANT BINARY_INTEGER := 0;后者表示一个与事务相关的消息分成1组,在提取消息的时候可以当成一组相关消息来提取。例如:
--创建带带分组功能的消息队列表
dbms_aqadm.create_queue_table('WDZGROUPAQTABLE','WDZAQMSG',
sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/ );
4.multiple_consumers表示消息接受者是否为多个用户。默认是只有1个接收者。如果要多个用户可以接受消息,需要设置=true
--创建多个接收者的消息队列表
dbms_aqadm.create_queue_table('WDZMUTIAQTABLE','WDZAQMSG',sort_list => 'PRIORITY,enq_time',
message_grouping =>dbms_aqadm.TRANSACTIONAL/*dbms_aqadm.NONE*/
,multiple_consumers => true);
4.2删除队列表
Exec dbms_aqadm.drop_queue_table(queue_table => 'sms_mt_tab');
Exec dbms_aqadm.drop_queue_table(queue_table => 'sms_mo_tab');
4.3建立队列
建立队列代码如下:
exec dbms_aqadm.create_queue(queue_name=>'sms_mt_queue', queue_table=>'sms_mt_tab');
exec dbms_aqadm.create_queue(queue_name=>'sms_mt_queue_exception', queue_table=>'sms_mt_tab',queue_type=>dbms_aqadm.EXCEPTION_QUEUE);
exec dbms_aqadm.create_queue(queue_name=>'sms_mt_queue_backup', queue_table=>'sms_mt_tab');
参数含义:
参考 dbms_aqadm.create_queue
--创建队列, sms_mt_queue为队列的名称,' sms_mt_tab ' 队列表名
dbms_aqadm.create_queue('WDZQUEUE','WDZAQTABLE',queue_type => dbms_aqadm.NORMAL_QUEUE);参数queue_type 表示队列是否为正常队列还是异常队列,参数值为dbms_aqadm.NORMAL_QUEUE 或者dbms_aqadm.EXCEPTION_QUEUE
--创建多用户队列, 'WDZQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZMUTIQUEUE','WDZMUTIAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
--创建分组队列, 'WDZGROUPQUEUE' 为队列的名称,'WDZAQTABLE' 队列表名
dbms_aqadm.create_queue('WDZGROUPQUEUE', 'WDZAQTABLE',
queue_type => dbms_aqadm.NORMAL_QUEUE);
4.4删除队列
代码如下:
Exec dbms_aqadm.drop_queue('sms_mt_queue');
4.5队列参数调整
A 创建非持久队列
非持久队列 顾名思义就是没有永久保存到数据的队列,队列只存在于系统的内存中。
参考 dbms_aqadm.create_np_queue
dbms_aqadm. create_np_queue ('sms_mt_queue', multiple_consumers=>false);
B 启动一个队列,' sms_mt_queue ' 为队列的名称
dbms_aqadm.start_queue('sms_mt_queue',enqueue=>true, dequeue=> true);
C 停止队列
参数的意思参考启动一个队列
dbms_aqadm.stop_queue(sms_mt_queue ',enqueue=>false, dequeue=>true);
5 队列操作
说明:
1 入队
P100_MT_ENQUEUE
(
p_equeue_name In varchar2, --队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_body In t_outbox_push%Rowtype,--入参,记录类型,数据源头直接调用
p_level In Number:=3, --优先级别1-5,数值越小越快
p_delay In Number:=0, --入队延迟时间,单位:秒
p_res_str OUT VARCHAR2, --0 成功 其它失败
p_msg_id OUT Varchar2 --返回的msgid,主键
)
2 出队列
p102_mt_dequeue
(
p_equeue_name In varchar2, --队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_client_id Out Number, --短信内部网关
p_res_str Out varchar2, --返回值 0 成功 -2 队列为空其它失败
p_label out varchar2, --标签
p_body out varchar2) --数据包体
3 数据格式转换 p103_change_label_body_str
(
p_dequeue_body In Mt_Struc,
p_res_str Out Varchar2,
p_label Out varchar2,
p_body Out varchar2
)/*
功能:把出队列的字符串翻译成有规则的label,和body
日期:2006-09-20
write by hancy
*/
4 测试入队
procedure P101_MT_ENQUEUE_TEST
(
p_equeue_name In varchar2,--队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_res_str out Varchar2,
p_msg_id OUT Varchar2
)
5.1入队
create or replace procedure P100_MT_ENQUEUE
(
p_equeue_name In varchar2, --队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_body In t_outbox_push%Rowtype,--入参,记录类型,数据源头直接调用
p_level In Number:=3, --优先级别1-5,数值越小越快
p_delay In Number:=0, --入队延迟时间,单位:秒
p_res_str OUT VARCHAR2, --0 成功 其它失败
p_msg_id OUT Varchar2 --返回的msgid,主键
)/*
功能:数据源头,push,的入队操作
日期:2006-09-20
write by hancy
*/
Is
v_enqueue_options dbms_aq.enqueue_options_t;
v_message_properties dbms_aq.message_properties_t;
v_message_handle raw(16);
v_body mt_struc;---写日志区域
vPid NUMBER:=100;
vProName VARCHAR2(50):='P100_MT_ENQUEUE';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
Begin
--1初始化
p_res_str:='-1';
--2 P_BODY 到 v_body 的转化 把一个字符串转化为一个结构体
v_body:=mt_struc(
p_body.OP_ID ,
p_body.DEST_MOBILENO ,
p_body.FEE_MOBILENO ,
p_body.LONG_NO ,
p_body.MT_CONTENT ,
p_body.SERVICE_ID ,
p_body.MT_FLAG ,
p_body.MSG_FMT ,
p_body.FEE_TYPE ,
p_body.FEE_VALUE ,
p_body.BEG_REPORT ,
p_body.CLIENT_ID ,
p_body.CLIENT_NAME ,
p_body.MT_TIME ,
p_body.MT_ID ,
p_body.PLAN_ID ,
p_body.PUSH_TYPE ,
p_body.SEND_DATE );
--4设置属性和参数---
--指定异常队列
v_message_properties.exception_queue:='SMS_MT_QUEUE_EXCEPTION';
--设置优先级别
v_message_properties.priority :=p_level;
--设置延时时间--秒
v_message_properties.delay :=p_delay;
--5 开始入队
dbms_aqadm.start_queue(p_equeue_name,enqueue=>true, dequeue=> true);
dbms_aq.enqueue(queue_name=>p_equeue_name,
enqueue_options=>v_enqueue_options,
message_properties=>v_message_properties,
payload=>v_body,
msgid=>v_message_handle);
P_MSG_ID:=v_message_handle ;
--6 commit
Commit;
p_res_str:='0';
Exception
When Others Then
p_res_str:='-100';
vProTip:='入队过程异常!';
vErrorCode:=Sqlcode;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end P100_MT_ENQUEUE;
create or replace procedure P101_MT_ENQUEUE_TEST
(
p_equeue_name In varchar2,--队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_res_str out Varchar2,
p_msg_id OUT Varchar2
) /*
功能:测试入队
日期:2006-09-20
write by hancy
*/
Is
Cursor cur_push Is
Select * From t_outbox_push;
v_row_push t_outbox_push%Rowtype;
v_exe_res varchar2(200);
v_message_handle raw(16);
begin
p_res_str:='-1';
Open cur_push;
Loop
Fetch cur_push Into v_row_push;
Exit When cur_push%Notfound;
P100_MT_ENQUEUE(p_equeue_name,v_row_push,3,0,v_exe_res,v_message_handle);
End Loop;
Close cur_push;
p_res_str:=v_exe_res;
p_msg_id:=v_message_handle;
Exception
When Others Then
p_res_str:=Sqlerrm;
Rollback;
end P101_MT_ENQUEUE_TEST;
5.2出队
create or replace procedure p102_mt_dequeue
(
p_equeue_name In varchar2, --队列名单大写字母,主队列:SMS_MT_QUEUE 备份队列:SMS_MT_QUEUE_BACKUP
p_client_id Out Number, --短信内部网关
p_res_str Out varchar2, --返回值 0 成功 -2 队列为空其它失败
p_label out varchar2, --标签
p_body out varchar2) --数据包体
Is/*
功能: 出队列,返回给java程序
日期:2006-09-20
write by hancy
*/
v_Dequeue_Options Dbms_Aq.Dequeue_Options_t;
v_Message_Properties Dbms_Aq.Message_Properties_t;
v_Message_Handle Raw(16);
v_Body_queue Mt_Struc;
v_label Varchar2(200);
v_body varchar2(1000);
res_str varchar2(20);
---写日志区域
vPid NUMBER:=102;
vProName VARCHAR2(50):='p102_mt_dequeue';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
v_count Number;
Begin
--1 初始化
p_res_str:='-1' ;
--2 出队列设置属性
--???
--3 执行出队列操作
Select Count(0) Into v_count From sms_mt_tab Where q_name=p_equeue_name ;
If v_count >0 Then
dbms_aqadm.start_queue(p_equeue_name,enqueue=>true, dequeue=> true);
Dbms_Aq.Dequeue(Queue_Name => p_equeue_name,
Dequeue_Options => v_Dequeue_Options,
Message_Properties => v_Message_Properties,
Payload => v_Body_queue,
Msgid => v_Message_Handle);
-- 组和 label= 消息类型(CommandID)+运营商标识+地区标识(AreaID)+时间标识
-- 组合 body
--4 返回给java的包体
p103_change_label_body_str(v_Body_queue,res_str,v_label,v_body);
If res_str='0' Then
p_label :=v_label;
p_body :=v_body;
p_client_id:=v_Body_queue.CLIENT_ID;
--提交
Commit;
p_res_str:='0' ;
Else
p_res_str:='出队列后组串失败!' ;
End If;
Else
p_res_str:='队列里面没有数据,休息一下';
End If;
Exception
When Others Then
p_res_str:='-100';
vProTip:='出队过程异常!';
vErrorCode:=SQLCODE;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end p102_mt_dequeue;
create or replace procedure p103_change_label_body_str
(
p_dequeue_body In Mt_Struc,
p_res_str Out Varchar2,
p_label Out varchar2,
p_body Out varchar2
)/*
功能:把出队列的字符串翻译成有规则的label,和body
日期:2006-09-20
write by hancy
*/
Is
---写日志区域
vPid NUMBER:=103;
vProName VARCHAR2(50):='p103_change_label_body_str';
vProTip VARCHAR2(255);
vErrorCode VARCHAR2(20);
vErrorMsg VARCHAR2(2000);
--label
v_label Varchar2(50);
v_client_id Number;
v_date_str varchar2(20);
--body
v_PRDID varchar2(10);
v_CHANNELID varchar2(10);
v_ActionID varchar2(20);
v_Result varchar2(20);
v_MSGID varchar2(20);
v_Pk_total varchar2(20);
v_Pk_Number varchar2(20);
v_RegisteredDelivery varchar2(20);
v_ServiceID varchar2(20);
v_FeeUserType varchar2(20);
v_FeeTerminalID varchar2(35);
v_MsgFmt varchar2(20);
v_FeeType varchar2(20);
v_FeeCode varchar2(20);
v_SrcID varchar2(30);
v_DestID varchar2(35);
v_LinkID varchar2(20);
v_MTFlag varchar2(20);
v_FixedCode varchar2(20);
v_RepeatNum varchar2(20);
v_InfoID varchar2(20);
v_MsgContent varchar2(150);
-------------------------------------
v_body varchar2(2000);
begin
p_res_str:='-1';
v_Result :='';
v_MSGID :='';
v_Pk_total :='';
v_Pk_Number:='';
--label
Select to_char(Sysdate,'YY-MM-DD HH:MM:SS') Into v_date_str From dual ;
If p_dequeue_body.CLIENT_ID>150 Then
v_label:='022000000'||v_date_str;
Else
v_label:='021000000'||v_date_str;
End If;
--body
v_PRDID:='0000' ; v_CHANNELID:='0000'; v_ActionID:='0';
v_Result :=lpad(' ',8,' ');
v_MSGID :=lpad(' ',16,' ');
v_Pk_total :=lpad(' ',1,' ');
v_Pk_Number:=lpad(' ',1,' ');
--v_RegisteredDelivery
v_RegisteredDelivery:=lpad(p_dequeue_body.BEG_REPORT,1,' ');
--v_ServiceID
v_ServiceID:=lpad(p_dequeue_body.SERVICE_ID,10,' ');
--v_FeeUserType
v_FeeUserType:='0';
--v_FeeTerminalID
v_FeeTerminalID:=lpad(p_dequeue_body.FEE_MOBILENO,32,' ');
--v_MsgFmt
v_MsgFmt:=lpad(p_dequeue_body.MSG_FMT,2,' ');
--v_FeeType
v_FeeType:=lpad(p_dequeue_body.FEE_TYPE,2,' ');
--v_FeeCode
v_FeeCode:=lpad(p_dequeue_body.FEE_VALUE,6,' ');
--v_SrcID
v_SrcID:=lpad(p_dequeue_body.LONG_NO,21,' ');
--v_DestID
v_DestID:=lpad(p_dequeue_body.DEST_MOBILENO,32,' ');
--v_LinkID
v_LinkID:=lpad(' ',20,' ');
--v_MTFlag varchar2(20);
v_MTFlag:=lpad(p_dequeue_body.MT_FLAG,2,' ');
---v_FixedCode varchar2(20);
v_FixedCode:=lpad(' ',6,' ');
--v_RepeatNum varchar2(20);
v_RepeatNum:='1';
--v_InfoID varchar2(20);
v_InfoID:=lpad(' ',16,' ');
--v_MsgContent varchar2(150);
v_MsgContent:=p_dequeue_body.MT_CONTENT;
--add
v_body:=
(v_PRDID
||v_CHANNELID
||v_ActionID
||v_Result
||v_MSGID
||v_Pk_total
||v_Pk_Number
||v_RegisteredDelivery
||v_ServiceID
||v_FeeUserType
||v_FeeTerminalID
||v_MsgFmt
||v_FeeType
||v_FeeCode
||v_SrcID
||v_DestID
||v_LinkID
||v_MTFlag
||v_FixedCode
||v_RepeatNum
||v_InfoID
||v_MsgContent) ;
--to value
p_label:=v_label;
p_body:=v_body ;
--result
p_res_str:='0';
Exception
When Others Then
p_res_str:=Sqlerrm;
vProTip:='把出队列的字符串翻译异常!';
vErrorCode:=SQLCODE;
vErrorMsg:=SQLERRM;
p_pub_write_error_log(vPid,vProName,'','',
vProTip,vErrorCode,vErrorMsg,1);
Rollback;
end p103_change_label_body_str;