Oracle AQ Demo,Step by Step 我准备用AQ来做一个数据仓库系统,提交分析任务队列。有以下需求: 1.利用通知异步的执行存储过程 2.设定队列大小极限 3.出列即删除 OK,let's go for it step 1:创建用户 --create user -- Create the user create user PHS identified by "" default tablespace PHSDATA temporary tablespace TEMP profile DEFAULT; -- 赋予AQ管理权限 grant execute on DBMS_AQ to PHS; grant execute on DBMS_AQ_BQVIEW to PHS; grant execute on Dbms_Aqadm to PHS; -- Grant/Revoke role privileges grant connect to PHS; grant resource to PHS; -- Grant/Revoke system privileges grant create procedure to PHS; grant create table to PHS; grant create view to PHS; grant unlimited tablespace to PHS; step 2:创建一个队列载体对象,一个没有body的type create or replace type task_c as object ( -- Author : WANGWJ -- Created : 2008-1-8 16:00:14 -- Purpose : infomation carrier for analyse Clone-PHS -- Attributes 业务逻辑相关 begindate DATE, enddate DATE, area VARCHAR2(12), taskid NUMBER, phscodex VARCHAR2(20) -- Member functions and procedures ) --创建队列表 begin -- Call the procedure sys.dbms_aqadm.create_queue_table(queue_table => 'QT_CLONE', queue_payload_type => 'task_c',--这就是我们定义的type sort_list => 'priority,enq_time',--按优先级和入列时间排序 multiple_consumers => TRUE, --多消费者 comment => 'queue for analyse CLONE-PHS', auto_commit => FALSE --手动控制事务--create queue ); end; --创建队列 begin sys.dbms_aqadm.create_queue( queue_name => 'q_clone', queue_table => 'qt_clone',--刚刚建立的queue表 queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3,--dequeue失败后重试次数 retry_delay => 1,--重试前等待 retention_time => 0 --dequeue后保持时间,不保持 ); end; step 3:启动队列 execute dbms_aqadm.start_queue('q_clone',true,true); step 4:创建消息订阅者 SQL> execute dbms_aqadm.add_subscriber ( queue_name => 'q_clone', subscriber => sys.aq$_agent ('analyst',null,null)); PL/SQL procedure successfully completed SQL> step 5:入列和出列测试 SQL> --入列 SQL> declare 2 v_Message task_c; 3 v_MsgId RAW(16); 4 v_options DBMS_AQ.ENQUEUE_OPTIONS_T; 5 v_properties DBMS_AQ.MESSAGE_PROPERTIES_T; 6 v_Recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T; 7 begin 8 v_Message:=task_c(begindate => SYSDATE,enddate => SYSDATE-1,area => '028', 9 taskid =>100,phscodex => 'test#$'); 10 v_properties.priority := 1; --该消息的优先级别 11 v_options.visibility :=DBMS_AQ.IMMEDIATE; 12 dbms_aq.enqueue(queue_name => 'q_clone',enqueue_options => v_options,message_properties => v_properties,payload => v_Message,msgid => v_MsgId); 13 dbms_output.put_line('encode success,msgid is '||v_MsgId); 14 15 end; 16 / PL/SQL procedure successfully completed --入列成功 SQL> select t.q_name,t.msgid,t.priority from qt_clone t; Q_NAME MSGID PRIORITY ------------------------------ -------------------------------- ---------- Q_CLONE 7466C75477954808B7E10BC50738845B 1 --改变 v_properties.priority的值为3,2,再入列两次,现在入列的先后顺序为1 3 2,我们希望的出列顺序 --为1 2 3 --出列 declare v_Message task_c; v_MsgId RAW(16); v_options DBMS_AQ.DEQUEUE_OPTIONS_T; v_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_Recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T; begin -- v_Recipients(0) := sys.aq$_agent('NOTE','MTQ',0); -- v_properties.recipient_list := v_Recipients; v_options.visibility :=DBMS_AQ.IMMEDIATE; v_options.consumer_name := 'analyst'; dbms_aq.dequeue(queue_name => 'q_clone',dequeue_options => v_options,message_properties => v_properties,payload => v_Message,msgid => v_MsgId); dbms_output.put_line('decode success,msgid is '||v_MsgId); dbms_output.put_line('subject is '||v_Message.area); end; --测试结果略,可以看出出列的顺序 1 2 3 step 6:创建测试过程,并注册通知 创建测试表 -- Create table 用于在接到通知的时候插入一条消息 create table TEST_AQ ( INFO VARCHAR2(100), MESSAGE TASK_C ) --创建测试过程,插入一条消息,并出列 create or replace procedure plsqlnotif AS v_Message task_c; v_MsgId RAW(16); v_options DBMS_AQ.DEQUEUE_OPTIONS_T; v_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_Recipients DBMS_AQ.AQ$_RECIPIENT_LIST_T; BEGIN v_options.visibility :=DBMS_AQ.IMMEDIATE; v_options.consumer_name := 'analyst'; dbms_aq.dequeue(queue_name => 'q_clone',dequeue_options => v_options,message_properties => v_properties,payload => v_Message,msgid => v_MsgId); dbms_output.put_line('decode success,msgid is '||v_MsgId); dbms_output.put_line('subject is '||v_Message.area); INSERT INTO test_aq VALUES('Get message on ',v_Message); END; --注册 declare reginfolist sys.aq$_reg_info_list; begin reginfolist := sys.aq$_reg_info_list( sys.aq$_reg_info('phs.q_clone:analyst', DBMS_AQ.NAMESPACE_AQ, 'plsql://phs.plsqlnotif', null)); dbms_aq.register(reginfolist, 1); end; step 7:测试情况 --入列 略 --接到通知后,插入test表,并出列 SQL> execute plsqlnotif; decode success,msgid is 59578D93BD55477994D8C9C6B672242B subject is 028 PL/SQL procedure successfully completed SQL> select * from test_aq; INFO MESSAGE -------------------------------------------------------------------------------- ------- Get message on 测试完毕。2008-01-09 特别补充!!: 本例的通知测试第一次并未成功,后来我将接收通知和出列的行为,在另外一个user下面注册和执行,才成功获得通知。