分类:
2009-03-18 14:05:39
Qmail-send应该是整个qmail中最复杂的模块了。
同qmail-queue一样,qmail-send也有自己的进程环境。Qmail-send进程其实是由qmail-start进程托起来的。那qmail-start主要做了什么呢?
Qmail-start 主要将qmail-send与qmail-local, qmail-remote, qmail-clean之间的通信管道搭建起来。
具体通信方式如下图:
qmail-send
qmail-log 0 ------------------------ 0
qmail-lspawn 0 ------------------------ 1
1 ------------------------ 2
qmail-rspawn 0 ------------------------ 3
1 ------------------------ 4
qmail-clean 0 ------------------------- 5
1 ------------------------- 6
发送者 |
接受者 |
接口定义 |
说明 |
Qmail-send |
Qmail-log |
|
|
Qmail-send |
Qmail-lspawn |
|
|
Qmail-send |
Qmail-rspawn |
|
|
Qmail-send |
Qmail-clean |
Foop/id |
删除 intd,mess子目录下id文件 |
Todo/id |
删除 intd,todo子目录下id文件。邮件预处理完成后,进行此操作。 |
||
Qmail-lspawn |
Qmail-send |
1. 单字符 local进程并发数 2. 以下列前两字符开头的字符串 第一个字符: ‘r’ recipient accecpted the message ‘h’recipient permanent reject the msg ‘s’recipient temporary reject the msg 第二字符: ‘K’ host accept the msg ‘Z’host temporary failed to deliver msg ‘D’host permanent failed to deliver msg |
|
Qmail-rspawn |
Qmail-send |
1. 单字符 remote进程并发数 3. 以下列字符开头的字符串 第一个字符: ‘K’ host accept the msg ‘Z’host temporary failed to deliver msg 2. ‘D’host permanent failed to deliver msg |
|
Qmail-clean |
Qmail-send |
单字符 1. ‘!’ 删除失败 2. ‘+’ 成功删除 3. ‘x’ 删除失败 |
|
整个qmail-send的执行过程是通过一个select进行IO复用的方式,顺序执行不同fd上的任务。原始代码是没有模块的概念的,只是将不同的功能分布在几个具体的函数中进行操作。
为了更清楚的分析qmail-send的功能,进行以下模块的划分,功能结构图如下:
注:因为各模块是以函数的形式实现的,所以不具备传统模块功能,如接口等。
各模块并不是单独执行,而是通过io复用进行顺序执行。所以在循环开始前,做了很多设置描述字的操作。
整个执行过程如下:
优先级队列主要提供容器功能,存放待处理的邮件id与放入时间。同时提供三个功能接口:
1. 插入接口
2. 读取离现在时间点最远的邮件条目
3. 删除离现在时间点最远的邮件条目
优先级队列中每个元素对应的直接是一个邮件的id标识。
系统中一个有四个优先级队列,分别是:
1. 接受者是本地地址的队列
2. 接受者是远程地址的队列
3. 处理过程中出错的邮件队列
4. 处理完成的邮件队列
代码参考:
struct prioq_elt { datetime_sec dt; //邮件待处理时间 unsigned long id; //邮件id标志 }; struct prioq { struct prioq_elt* p; unsigned int len; unsigned int a; } prioq pqdone = { 0 }; prioq pqchan[CHANNELS] = { {0}, {0} }; prioq pqfail = { 0 }; //功能接口 extern int prioq_insert(); extern int prioq_min(); extern void prioq_delmin(); //功能辅助接口 void pqstart() //启动队列,直接恢复info子目录未处理完的邮件到队列 void pqfinish() //停止队列,对队列中未处理完成的邮件,修改其访问,修改时间为队列中邮件待处理时间,让下次使用 void pqrun() //更新队列中所有邮件的待处理时间 |
参考代码:
stralloc comm_buf[CHANNELS] = { {0}, {0} }; void comm_init() void comm_selprep(int *nfds, fd_set *wfds) int comm_canwrite(int c) /*向lspawn ,rspawn管道写入comm_buff信息*/ void comm_do(fd_set *wfds) /*向comm_buff写数据接口*/ void comm_write(int c, int delnum, unsigned long id, char *sender, char* recip) |
参考代码:
struct del { int used; int j; unsigned long delid; seek_pos mpos; stralloc recip; }; /*根据并发数创建数组*/ struct del *d[CHANNELS]; void del_init() int del_canexit() int del_avail(int c) /*将发送信息写到comm_buff上*/ void del_start(int j, seek_pos mpos, char *recip) /*获取lspawn ,rspawn返回的信息并处理*/ void del_do(fd_set *rfds) void del_dochan(int c) |
该模块主要进行的操作就是处理邮件消息的第四个状态:消息排队与处理。
其实应该叫消息预处理与排队,更合适些。
Todo模块根据判断lock/trigger fd可读,得知在todo子目录下存在新的邮件文件。
处理流程如下:
1. 条件检查
检查mess下同名邮件主体文件存在。
检查info目录下是否存在同名文件,存在就直接删除。
检查local,remote目录下是否存在同名文件,存在就直接删除。
2. 生成info,local,remote目录下同名文件
分析todo下文件内容,将mail from 信息写入info文件。将rcpt to信息写入local或者remote文件。
3. 通知qmail-clean删除对应文件。
此时通知qmail-clean格式:todo\id.
Qmail-clean读到此格式时,会删除todo, intd 两个子目录下的邮件文件。
4. 将该邮件id插入到内存队列中。
该模块主要负责处理系统中的四个优先级队列和job任务的映射。
1. 将错误优先级队列重新加入到local或者remote优先级队列
2. 将local,remote优先级队列映射成job任务
3. 执行job任务
4. 对完成优先级队列中的邮件进行完成清理工作。
步骤2,3源码分析
参考代码
步骤2代码
void pass_dochan(int c) { datetime_sec birth; struct prioq_elt pe; static stralloc line = {0}; int match; if (flagexitasap) return; if (!pass[c].id) { if (!job_avail()) return; if (!prioq_min(&pqchan[c],&pe)) return; /*每次取一个待处理时间已过期邮件进行处理*/ if (pe.dt > recent) return;
/*fn.s = local/remote /x/id */ fnmake_chanaddr(pe.id,c); prioq_delmin(&pqchan[c]); pass[c].mpos = 0; pass[c].fd = open_read(fn.s); if (pass[c].fd == -1) goto trouble;
/*get sender address and datatime from /info/id by id*/ if (!getinfo(&line,&birth,pe.id)) { logwarn((LGP_SND, "{pass_dochan} Get info of %ld Failed! ", pe.id)); close(pass[c].fd); goto trouble; } pass[c].id = pe.id; /*初始化pass[c].ss 为pass[c].fd的读出out对象*/ substdio_fdbuf(&pass[c].ss,read,pass[c].fd,pass[c].buf,sizeof(pass[c].buf)); pass[c].j = job_open(pe.id,c); jo[pass[c].j].retry = nextretry(birth,c); jo[pass[c].j].flagdying = (recent > birth + lifetime); while (!stralloc_copy(&jo[pass[c].j].sender,&line)) nomem(); /*reset sender*/ sender.s = 0; if (!getinfo(&sender,&birth,pe.id)) { logerror((LGP_SND, "{pass_dochan} Info of message #%ld is inavailable", pe.id)); _exit; /* XXX: print warning */ } logdebug((LGP_SND, "{pass_dochan} sender before filter is %s",sender.s));
/* owner-@host-@[] -> owner-@host */ if (sender.len >= 5) { if (str_equal(sender.s + sender.len - 5,"-@[]")) { sender.len -= 4; sender.s[sender.len - 1] = 0; } }
if (str_equal(sender.s,"#@[]")) { stralloc_copys(&sender, doublebounceto); sender.len = doublebouncehost.len; sender.s[sender.len] = 0; } } if (!del_avail(c)) return;
/*local/remote /x/id format: Txxxx@xx.xx\0
读错误, 恢复原始状态 */ if (getln(&pass[c].ss,&line,&match,'\0') == -1) { fnmake_chanaddr(pass[c].id,c); logwarn((LGP_SND, "trouble reading %s ; will try again later", fn.s)); close(pass[c].fd); job_close(pass[c].j); pass[c].id = 0; return; } /* 找不到需要发送的发件地址 结束任务 */ if (!match) { close(pass[c].fd); jo[pass[c].j].flaghiteof = 1; job_close(pass[c].j); pass[c].id = 0; return; } switch(line.s[0]) { case 'T':/*需要投递的地址*/ ++jo[pass[c].j].numtodo;
/*执行job ,步骤3*/ del_start(pass[c].j,pass[c].mpos,line.s + 1); break; case 'D':/*已经投递的地址或不需要投递了*/ break; default: fnmake_chanaddr(pass[c].id,c); logwarn((LGP_SND, "unknown record type in %s", fn.s)); close(pass[c].fd); job_close(pass[c].j); pass[c].id = 0; return; } pass[c].mpos += line.len; return; trouble: logwarn((LGP_SND, "trouble opening %s ; will try again later", fn.s)); pe.dt = recent + SLEEP_SYSFAIL; while (!prioq_insert(&pqchan[c],&pe)) nomem(); } |
步骤3代码
/* * del_start will start up a delivery with given job * @param * j, job subsrcipt in the job array list * mpos, the postion to the recip * recip, the reciptor */ void del_start(int j, seek_pos mpos, char *recip) { int i; int c; c = jo[j].channel; if (!flagspawnalive[c]) return; if (!comm_canwrite(c)) return; for (i = 0;i < concurrency[c];++i) if (!d[c][i].used) break; if (i == concurrency[c]) return; if (!stralloc_copys(&d[c][i].recip,recip)) { nomem(); return; } if (!stralloc_0(&d[c][i].recip)) { nomem(); return; } d[c][i].j = j; ++jo[j].refs; d[c][i].delid = masterdelid++; d[c][i].mpos = mpos; d[c][i].used = 1; ++concurrencyused[c];
/*
* c (in), the channel indicator * delnum (in), the subscript of del item in the del array list on relative channel * id (in) , the message id * sender (in), the envelop sender * recip (in), the envelop recip
c = 0/1 i = */ comm_write(c, i, jo[j].id, jo[j]. sender.s, recip); loginfo((LGP_SND, "start delivery :%d msg: %d %s %s", d[c][i].delid, jo[j].id, tochan[c], recip)); del_status(); } |
代码参考:
/* 每个job对应 local/split/id或者remote/split/id 的一个文件 */ struct job { int refs; /* if 0, this struct is unused , refs (0 – n)*/ unsigned long id; /* id of message to deliver */ int channel; /* which channel the job on */ datetime_sec retry; /* last retry absolute time in second from 1970.1.1 00:00:00 */ stralloc sender; /* envelop sender */ stralloc subject; /* msg subject */ int size; /*msg size*/ int numtodo; /*根据邮件发送者数目进行累加*/ int flaghiteof; /*所有发送者全部发送完毕 1*/ int flagdying; /* job需要结束标签*/ }; int numjobs; /*job数目=local并发数 + remote并发数*/ struct job *jo;/*全局访问点*/ /*初始化job数组*/ void job_init() /*判断是否有可用的空闲job*/ int job_avail() /*按指定邮件id,channel分配一个新的job,返回job在数组中的位置,具体job参数需调用者设置*/ int job_open(unsigned long id, int channel) /*将指定位置job关闭*/ void job_close(int j) /* 关闭流程: 1. Job上的引用数为0时才能关闭 2. Job上所有接受者全都发送完毕时才能删除local或者remote子目录下的邮件文件 并将job指定的邮件id加入到已完成的优先级队列中 3. Job未全部发送完,将邮件id加入对应的待处理优先级队列中。 */ |
Qmail-send主要执行一个大循环,该循环操作是,通过select扫描fd(0-6)和lock/trigger fd,根据各描述符的事件执行相应操作。