Chinaunix首页 | 论坛 | 博客
  • 博客访问: 232447
  • 博文数量: 46
  • 博客积分: 1400
  • 博客等级: 上尉
  • 技术积分: 620
  • 用 户 组: 普通用户
  • 注册时间: 2009-01-12 18:04
文章分类

全部博文(46)

文章存档

2010年(7)

2009年(39)

我的朋友

分类:

2009-03-18 14:05:39

分析

Qmail-send应该是整个qmail中最复杂的模块了。

3.3.1 进程环境

qmail-queue一样,qmail-send也有自己的进程环境。Qmail-send进程其实是由qmail-start进程托起来的。那qmail-start主要做了什么呢?

Qmail-start 主要将qmail-sendqmail-local, qmail-remote, qmail-clean之间的通信管道搭建起来。

具体通信方式如下图:

                                                                    qmail-send

 

qmail-log                  0        ------------------------        0

 

qmail-lspawn                0        ------------------------        1

                         1        ------------------------        2

                          

qmail-rspawn              0        ------------------------        3

                         1        ------------------------        4

                          

qmail-clean                0        -------------------------       5

                         1        -------------------------       6

3.3.2 接口定义

发送者

接受者

接口定义

说明

Qmail-send

Qmail-log

 

 

Qmail-send

Qmail-lspawn

\0\0\0

 

Qmail-send

Qmail-rspawn

\0\0\0

 

Qmail-send

Qmail-clean

Foop/id

删除 intdmess子目录下id文件

Todo/id

删除 intdtodo子目录下id文件。邮件预处理完成后,进行此操作。

Qmail-lspawn

Qmail-send

1.       单字符  local进程并发数

2.       以下列前两字符开头的字符串

第一个字符:

‘r’ recipient accecpted the message

‘h’recipient permanent reject the msg

‘s’recipient temporary reject the msg

第二字符:

‘K’ host accept the msg

‘Z’host temporary failed to deliver msg

‘D’host permanent failed to deliver msg

 

Qmail-rspawn

Qmail-send

1.       单字符  remote进程并发数

3.       以下列字符开头的字符串

第一个字符:

 ‘K’ host accept the msg

‘Z’host temporary failed to deliver msg

2.       ‘D’host permanent failed to deliver msg

 

Qmail-clean

Qmail-send

单字符

1.       ‘!’ 删除失败

2.       ‘+’ 成功删除

3.       ‘x’ 删除失败

 

 

3.3.3     模块结构

整个qmail-send的执行过程是通过一个select进行IO复用的方式,顺序执行不同fd上的任务。原始代码是没有模块的概念的,只是将不同的功能分布在几个具体的函数中进行操作。

为了更清楚的分析qmail-send的功能,进行以下模块的划分,功能结构图如下:

 

注:因为各模块是以函数的形式实现的,所以不具备传统模块功能,如接口等。

 

各模块并不是单独执行,而是通过io复用进行顺序执行。所以在循环开始前,做了很多设置描述字的操作。

 

整个执行过程如下:

 

3.3.4 子模块分析

3.3.3.1 优先级队列

优先级队列主要提供容器功能,存放待处理的邮件id与放入时间。同时提供三个功能接口:

1.       插入接口

2.       读取离现在时间点最远的邮件条目

3.       删除离现在时间点最远的邮件条目

 

优先级队列中每个元素对应的直接是一个邮件的id标识。

系统中一个有四个优先级队列,分别是:

1.       接受者是本地地址的队列

2.       接受者是远程地址的队列

3.       处理过程中出错的邮件队列

4.       处理完成的邮件队列

 

代码参考:

struct prioq_elt

{

         datetime_sec dt; //邮件待处理时间

         unsigned long id; //邮件id标志

};

 

struct prioq

{

         struct prioq_elt* p;

         unsigned int len;

         unsigned int a;         

}

prioq pqdone            =  { 0 };     

prioq pqchan[CHANNELS]  =  { {0}, {0} }

prioq pqfail              =  { 0 };

 

//功能接口

extern int prioq_insert();

extern int prioq_min();

extern void prioq_delmin();

 

//功能辅助接口

void pqstart()   //启动队列,直接恢复info子目录未处理完的邮件到队列

void pqfinish()  //停止队列,对队列中未处理完成的邮件,修改其访问,修改时间为队列中邮件待处理时间,让下次使用

void pqrun() //更新队列中所有邮件的待处理时间

 

3.3.3.2  io处理子模块 comm(communication)

参考代码:

stralloc comm_buf[CHANNELS]     = { {0}, {0} };

 

void comm_init()

void comm_selprep(int *nfds, fd_set *wfds)

int comm_canwrite(int c)

/*lspawn rspawn管道写入comm_buff信息*/

void comm_do(fd_set *wfds)

/*comm_buff写数据接口*/

void comm_write(int c, int delnum, unsigned long id, char *sender, char* recip)

 

3.3.3.3 邮件发送子模块 del(delivery)

参考代码:

struct del

{

         int used;

         int j;

         unsigned long delid;

         seek_pos mpos;

         stralloc recip;

};

/*根据并发数创建数组*/

struct del *d[CHANNELS];

 

void del_init()

int del_canexit()

int del_avail(int c)

 

/*将发送信息写到comm_buff*/

void del_start(int j, seek_pos mpos, char *recip)

/*获取lspawn rspawn返回的信息并处理*/

void del_do(fd_set *rfds)

void del_dochan(int c)

 

 

3.3.3.4 邮件预处理模块 todo(todo子目录处理)

该模块主要进行的操作就是处理邮件消息的第四个状态:消息排队与处理。

其实应该叫消息预处理与排队,更合适些。

 

Todo模块根据判断lock/trigger fd可读,得知在todo子目录下存在新的邮件文件。

处理流程如下:

1.       条件检查

检查mess下同名邮件主体文件存在。

检查info目录下是否存在同名文件,存在就直接删除。

检查localremote目录下是否存在同名文件,存在就直接删除。

2.       生成infolocalremote目录下同名文件

分析todo下文件内容,将mail from 信息写入info文件。将rcpt to信息写入local或者remote文件。

3.       通知qmail-clean删除对应文件。

此时通知qmail-clean格式:todo\id.

Qmail-clean读到此格式时,会删除todo, intd 两个子目录下的邮件文件。

 

4.       将该邮件id插入到内存队列中。

3.3.3.5 队列处理模块 pass

该模块主要负责处理系统中的四个优先级队列和job任务的映射。

1.       将错误优先级队列重新加入到local或者remote优先级队列

2.       localremote优先级队列映射成job任务

3.       执行job任务

4.       对完成优先级队列中的邮件进行完成清理工作。

 

步骤23源码分析

参考代码

步骤2代码

void pass_dochan(int c)

{

         datetime_sec birth;

         struct prioq_elt pe;

         static stralloc line = {0};

         int match;

 

         if (flagexitasap) return;

 

         if (!pass[c].id)

         {

                   if (!job_avail())

                            return;

 

                   if (!prioq_min(&pqchan[c],&pe))

                            return;

 

                   /*每次取一个待处理时间已过期邮件进行处理*/     

                   if (pe.dt > recent)

                            return;

                  

                   /*fn.s = local/remote /x/id */ 

                   fnmake_chanaddr(pe.id,c);

 

                   prioq_delmin(&pqchan[c]);

 

                   pass[c].mpos = 0;

                   pass[c].fd = open_read(fn.s);

 

                   if (pass[c].fd == -1)

                            goto trouble;

                  

                   /*get sender address and datatime from /info/id by id*/      

                   if (!getinfo(&line,&birth,pe.id)) {

                            logwarn((LGP_SND, "{pass_dochan} Get info of %ld Failed! ", pe.id));

                            close(pass[c].fd);

                            goto trouble;

                   }

 

                   pass[c].id = pe.id;

                   /*初始化pass[c].ss pass[c].fd的读出out对象*/

                   substdio_fdbuf(&pass[c].ss,read,pass[c].fd,pass[c].buf,sizeof(pass[c].buf));

                   pass[c].j = job_open(pe.id,c);

 

                   jo[pass[c].j].retry     = nextretry(birth,c);

                   jo[pass[c].j].flagdying = (recent > birth + lifetime);

 

                   while (!stralloc_copy(&jo[pass[c].j].sender,&line))

                            nomem();

 

                   /*reset sender*/

                   sender.s = 0;

                   if (!getinfo(&sender,&birth,pe.id)) {

                            logerror((LGP_SND, "{pass_dochan} Info of message #%ld is inavailable", pe.id));

                            _exit; /* XXX: print warning */

                   }

                   logdebug((LGP_SND, "{pass_dochan} sender before filter is %s",sender.s));

                  

                   /* owner-@host-@[] -> owner-@host */

                   if (sender.len >= 5) {

                            if (str_equal(sender.s + sender.len - 5,"-@[]")) {

                                     sender.len -= 4;

                                     sender.s[sender.len - 1] = 0;

                            }

                   }

                  

                   if (str_equal(sender.s,"#@[]")) {

                            stralloc_copys(&sender, doublebounceto);

                            sender.len = doublebouncehost.len;

                            sender.s[sender.len] = 0;

                   }

 

         }

 

         if (!del_avail(c)) return;

        

         /*local/remote /x/id format: Txxxx@xx.xx\0

        

           读错误, 恢复原始状态

         */

         if (getln(&pass[c].ss,&line,&match,'\0') == -1) {

                   fnmake_chanaddr(pass[c].id,c);

                   logwarn((LGP_SND, "trouble reading %s ;  will try again later", fn.s));

                   close(pass[c].fd);

                   job_close(pass[c].j);

                   pass[c].id = 0;

                   return;

         }

 

         /*

                   找不到需要发送的发件地址

                   结束任务

         */

         if (!match) {

                   close(pass[c].fd);

                   jo[pass[c].j].flaghiteof = 1;

                   job_close(pass[c].j);

                   pass[c].id = 0;

                   return;

         }

 

         switch(line.s[0]) {

                   case 'T':/*需要投递的地址*/

                            ++jo[pass[c].j].numtodo;

           

/*执行job ,步骤3*/

                            del_start(pass[c].j,pass[c].mpos,line.s + 1);

                            break;

                   case 'D':/*已经投递的地址或不需要投递了*/

                            break;

                   default:

                            fnmake_chanaddr(pass[c].id,c);

                            logwarn((LGP_SND, "unknown record type in %s", fn.s));

                            close(pass[c].fd);

                            job_close(pass[c].j);

                            pass[c].id = 0;

                            return;

         }

 

         pass[c].mpos += line.len;

         return;

 

trouble:

         logwarn((LGP_SND, "trouble opening %s ; will try again later", fn.s));

         pe.dt = recent + SLEEP_SYSFAIL;

 

         while (!prioq_insert(&pqchan[c],&pe))

                   nomem();

}

 

步骤3代码

/*

 * del_start will start up a delivery with given job

 * @param

 *      j, job subsrcipt in the job array list

 *      mpos, the postion to the recip

 *      recip, the reciptor

 */

void del_start(int j, seek_pos mpos, char *recip)

{

         int i;

         int c;

 

         c = jo[j].channel;

 

         if (!flagspawnalive[c]) return;

         if (!comm_canwrite(c)) return;

 

         for (i = 0;i < concurrency[c];++i)

                   if (!d[c][i].used)

                            break;

 

         if (i == concurrency[c]) return;

 

         if (!stralloc_copys(&d[c][i].recip,recip)) {

                   nomem();

                   return;

         }

 

         if (!stralloc_0(&d[c][i].recip)) {

                   nomem();

                   return;

         }

 

         d[c][i].j = j; ++jo[j].refs;

         d[c][i].delid   = masterdelid++;

         d[c][i].mpos    = mpos;

         d[c][i].used    = 1; ++concurrencyused[c];

        

         /*

          

          *      c (in), the channel indicator

          *      delnum (in), the subscript of del item in the del array list on relative channel

          *      id (in) , the message id

          *      sender (in), the envelop sender

          *      recip (in), the envelop recip

          

                 c = 0/1

                 i =

         */

         comm_write(c, i, jo[j].id, jo[j]. sender.s, recip);

 

         loginfo((LGP_SND, "start delivery :%d msg: %d %s %s", d[c][i].delid, jo[j].id, tochan[c], recip));

 

         del_status();

}

 

3.3.3.6 job任务模块

代码参考:

/*

   每个job对应 local/split/id或者remote/split/id 的一个文件

 */

struct job

{

         int refs;               /* if 0, this struct is unused refs (0 – n)*/

         unsigned long id;       /* id of message to deliver */

         int channel;            /* which channel the job on */

         datetime_sec retry;     /* last retry absolute time in second from 1970.1.1 00:00:00 */

         stralloc sender;        /* envelop sender */

         stralloc subject;        /* msg subject */

         int size;              /*msg size*/

         int numtodo;          /*根据邮件发送者数目进行累加*/

         int flaghiteof;         /*所有发送者全部发送完毕 1*/

         int flagdying;         /* job需要结束标签*/

};

 

int     numjobs; /*job数目=local并发数 + remote并发数*/

struct job *jo;/*全局访问点*/

 

/*初始化job数组*/

void job_init()

/*判断是否有可用的空闲job*/

int job_avail()

 

/*按指定邮件idchannel分配一个新的job,返回job在数组中的位置,具体job参数需调用者设置*/

int job_open(unsigned long id, int channel)

 

/*将指定位置job关闭*/

void job_close(int j)

/*

  关闭流程:

1.       Job上的引用数为0时才能关闭

2.       Job上所有接受者全都发送完毕时才能删除local或者remote子目录下的邮件文件

并将job指定的邮件id加入到已完成的优先级队列中

3.       Job未全部发送完,将邮件id加入对应的待处理优先级队列中。
这种情况主要是系统停机的时候,job未处理完,这样job就会通过优先级队列finish接口保存当前的状态,等待下次继续执行。

 

*/

 

 

3.3.5 操作流程

Qmail-send主要执行一个大循环,该循环操作是,通过select扫描fd0-6)和lock/trigger fd,根据各描述符的事件执行相应操作。

阅读(882) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~