分类:
2009-07-08 21:17:45
l 定义
l 基本队列
l 队列设定—qmadmin
l 高级队列
l 交易和管道
l ubbconfig
l 代码范例
当进程通过消息方式进行通讯时,消息在被取用前存储的地方称为消息队列。这些队列是一些存储空间,或者是内存(IPC消息队列),或在硬盘上。内存中的队列在写硬盘前可以清除;这样,在主机故障时,就有丢失信息的风险。基于硬盘的队列可以在主机或网络瘫痪时可靠地保存信息,代价是读写硬盘的开销。
商业上用队列方式使任务可以同步完成。一般地,商业系统需要保证系统总是稳定可靠,良好地完成任务。传统上是使用昂贵的容错系统达到此目标,花费主要是在冗于硬件上,使用队列,往往只要复制部分系统就能达到此效果。
作为请求/应答通讯方式的一种替代,可靠队列允许商业应用通过同步或时序方式使用可靠存储队列进行通讯。BEA TUXEDO /Q提供了这种机制。
其特性提供优点如下:
² 可靠传递
² 同步的、自由处理
² 审计、恢复
/Q ATMI通过按照2段式XA事务协议操作,保证可靠的传输,并且有内建的错误处理能力处理意外情况。
/Q 提供的API简单而功能强大,有以下两个函数:
² tpenqueue() – 将消息置入队列
² tpdequeue() – 从队列中得到消息
队列由BEA TUXEDO管理员创建,定义队列并指定大小。队列全部位于队列空间,队列空间的名字是函数参数之一。每个队列都有一个名字,函数通过名字调用队列。
tpenqueue()参数如下:
² 队列空间名
² 队列名
² 包含置入队列信息的结构
² 置入队列的数据缓冲
² 长度
² 控制标志
tpenqueue(qspace,qname,control_info,buffer,len,flags)
tpdequeue()参数如下:
² 队列空间名
² 队列名
² 包含取队列信息的结构
² 取队列的数据缓冲
² 长度
² 控制标志
tpdequeue(qspace,qname,control_info,buffer,len,flags)
qmadmin是一个命令行工具,TUXEDO /Q的一个部分,使管理员可以建立基于硬盘的的队列;qmadmin用于在设备上建立、初始化、列出和删除队列和队列空间。
$qmadmin
QMCONFIG=/usr/apps/atmapp/QUE
>crdl /usr/apps/atmapp/QUE
Starting Offset :0
Size in disk pages:400
Created decice /usr/apps/atmapp/QUE,offset 0,size 400
>qspacecreate
Queue space name:QSPACE
IPC Key for queue space:62639
Size of queue space in disk space:100
Number of queues in queue space:6
Number of concurrent transaction in queue space:4
Number of concurrent processes in queue space:9
Number of messages in queue space:3
Error queue name:errque
Initialize exter\nts(y,n[default=n]):y
Blocking factor[default=16]:16
>qopen QSPACE
>qcreate
Queue name:DEPOSIT
Queue order(priority,time,fifo,lifo):fifo
Out-of-ordering enqueuing(top,msgid):none
Retries[default=0]:2
Retry dela in seconds[default=0]:30
High limit for queue capacity warning:80%
Reset(low)limit for queue capacity warning:0%
Queue capacity command:
No default queue capacity command
Queue ‘DEPOSIT’ created
>qcreate
Queue name:REPLYQ
Queue order(priority,time,fifo,lifo):fifo
Out-of-ordering enqueuing(top,msgid):none
Retries[default=0]:2
Retry dela in seconds[default=0]:30
High limit for queue capacity warning:80%
Reset(low)limit for queue capacity warning:0%
Queue capacity command:
No default queue capacity command
Queue ‘REPLYQ’ created
说明:
用crdl命令在/usr/apps/atmapp/QUE上建立一个400页的设备
用qspacecreate命令在设备上建立一个100页的队列空间QSPACE,有6个队列。
用qopen命令打开队列空间QSPACE。
用qcreate命令在队列空间QSPACE上建立队列DEPOSIT,FIFO方式,消息可以取出2次。同样建立队列REPLYQ。
每个队列都有与之相关的一些控制信息;其中绝大多数内容由应用设定,但是有些由BEA TUXEDO队列软件直接设定。
队列次序
缺省的,消息进出队列的次序由管理员确定,一般是先入先出(FIFO-first in first out);但有时消息不需要立即处理,这样就需要其他的队列次序。BEA TUXEDO提供了几种方案解决之,有:
² 按优先级
² 按时序
² 先进先出
² 后进先出
管理员甚至可以合并使用上述几种方式决定队列次序。
按时释放
当一个消息进队列时,程序员可以决定其在某时间前不可出队列。这个时间可以是一个定值(如:1月7号下午2点)或相对于入队列的时间。当程序试图出队列时,那些‘还没到时间’的消息是不可见的。例:
control_info->deq_time=3600;/* 3600秒后才可以出队列*/
control_info->flags=TPQTIME_REL;/* 表示相对时间值被设定*/
TPQCTL结构
TPQCTL结构用来控制一个消息如何进/出队列
struct tpqctl{
long flags;
long deq_time;
long priority;
long diagnostic;
char msgid[TMMSGIDLEN];
char corrid[TMCORRIDLEN];
char replyqueue[TMQNAMELEN+1];
char failurequeue[TMQNAMELEN+1];
CLENTID cltid;
long urcode;
long appkey;
};
tpenqueue的flags有如下可能:
TPNOFLAGS,TPQPRIORITY,TPQCORRID,[TPQTOP|TPQBEFOREMSGID],[TPQTIME_ABS|TPQTIME_REL] , TPQREPLYQ,TPQFAILUREQ
tpdequeue的flags有如下可能:
TPNOFLAGS,TPQWAIT,[TPQGETBYMSGID|TPQGETBYCORID]
本节中有定义:
typedef struct tpqctl TPQCTL;
优先级
一条消息可以被设置一个优先级;优先级决定消息相对于队列中其他消息的位置。高优先级的消息较靠近队列头,可以较早出队列。例:
control_info->priority=100; /*设入队列优先级为最高级*/
control_info->flags=TPQPRIORITY; /* 表示优先级的值已经设定*/
返回和错误队列
在将消息入队列前,程序可以定义两个队列:返回队列和错误队列。当消息出队列时,这些队列中存有原消息的处理结果:返回或错误消息。
相关标识
应用可以在消息上加一个‘标记’,当消息被从一个队列移动到另一个队列并被应用的不同部分处理时可以识别它。这些标记被称为‘相关标识’。
超次序
虽然队列被定义了一个固定的出/入次序,但有时应用需要超越这个次序。BEA TUXEDO的队列工具允许管理员配置程序入队列时可以超越次序的队列。
² 可以将一条消息置于队列头
² 可以将一条消息置于特定消息前
错误诊断
如果tperrno被设成TPEDIAGNOSTIC,则TPQCTL结构中的诊断域将被设成一个错误状态,有以下:
² QMEINVAL 调用时使用了非法标志值
² QMEDADMSGID 非法错误消息ID
² QMENOMSG 队列没有可以取出的消息
² QMEINUSE 队列中有消息,但现在不可用
事务
事务是一种保证一系列操作全部成功或全部失败的机制;通常用来保证对数据库的多次操作全部完成。
BEA TUXEDO可以协调本队列修改、其他队列修改和数据库系统操作。当程序调用tpdequeue()将一条消息出队列时,事实上消息仅在程序事务确认时才从队列中除去。如果事务回滚或超时,消息仍在队列中;可以再次操作。系统从而有效地保障该消息仅被处理一次。
TMQUEUE是一个BEA TUXEDO系统提供的当程序调用tpenqueue()和tpenqueue()时将消息入、出队列的服务。tpenqueue()和tpenqueue()的第一个参数是队列空间名;该名字必须由TMQUEUE的一个交易发布过。
管道
TMQFORWARD是BEA TUXEDO提供的将消息通过调用tpenqueue()将消息前转的服务。
*GROUPS
QUE1 LMID=SITE1 GRPNO=2
TMSNAME=TMS_QM TMSCOUNT=2
OPENINFO=”TUXEDO/QM:/apps/atmapp/QUE:QSPACE”
*SERVERS
TMQUEUE SRVGRP=QUE1 SRVID=1
CLOPT=”-s QSPACE:TMQUEUE –“
TMQFORWARD SRVGRP=QUE1 SRVID=5
CLOPT=”-- -I 2 –q DEPOSIT”
说明:
每个队列空间需要单独创建一个组,本例中为QUE1。TMSNAME是事务管理服务的名字。TMCOUNT决定最少启动几个TMS_QM服务。
TMQUEUE是处理tpenqueue()将请求置入队列的/Q服务。TMQFORWARD是将请求出队列并通过tpcall()将请求送相应的交易。-i决定再次读队列前应该延迟的秒数。-q决定前转请求的来源队列。
#include
#include
main()
{
char *reqstr;
long len;
TPQCTL qctl;
/* 用tpinit()连接TUXEDO*/
qctl.flags = TPQREPLYQ;
strcpy(qctl.rplyqueue,”RPLYQ”);
if ( tpenqueue(“QSPACE”,”TOUPPER”,&qctl,reqstr,0,0)== -1)
{
printf(“tpenqueue(TOUPPER):%s\n”,tpstrerror(tperrno));
if(tperror == TPEDIAGNOSTIC)
{
printf(“Queue manager diagnostic %ld\n”,qctl.diagnostic);
}
tpfree(reqstr);
tpterm();
exit (-1);
}
sleep(10);
qctl.flags = TPQWAIT;
if (tpdequeue(“QSACE”, “REPLYQ”,&qctl,&reqstr,&len,TPNOTIME) == -1)
{
printf(“tpdequeue(REPLYQ): %s\n”, tpstrerror(tperrno));
if (tperrno == TPEDIAGNOSTIC)
{
printf(“Queue manager diagnotic %ld\n”, qctl.diagnostic);
}
tpfree(reqstr);
tpterm();
exit(-1);
}
printf(“after: %s\n”,reqstr);
tpfree(reqstr);
tpterm();
return (0);
}