分类:
2008-06-22 13:18:55
复合索引文件格式(.cfs)是如何产生的?从这个问题出发,研究索引文件是如何合并的,这都是IndexWriter类中定义的一些重要的方法。
在建立索引过程中,生成的索引文件的格式有很多种。
在文章 Lucene-2.2.0 源代码阅读学习(4) 中测试的那个例子,没有对IndexWriter进行任何的客户化设置,完全使用Lucene 2.2.0默认的设置(以及,对Field的设置使用了Lucene自带的Demo中的设置)。
运行程序以后,在本地磁盘的索引目录中生成了一些.扩展名为.cfs的索引文件,即复合索引格式文件。如图(该图在文章 Lucene-2.2.0 源代码阅读学习(4) 中介绍过)所示:
从上面生成的那些.cfs复合索引文件可以看出,Lucene 2.2.0版本,IndexWriter索引的一个成员useCompoundFile的设置起了作用,可以在IndexWriter类的内部看到定义和默认设置:
private boolean useCompoundFile = true;
即,默认使用复合索引文件格式来存储索引文件。
在IndexWriter类的addDocument(Document doc, Analyzer analyzer)方法中可以看到,最后调用了 maybeFlushRamSegments()方法,这个方法的作用可是很大的,看它的定义:
protected final void maybeFlushRamSegments() throws CorruptIndexException, IOException {
if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) {
flushRamSegments();
}
}
这里,minMergeDocs是指:决定了合并索引段文件时指定的最小的Document的数量,在IndexWriter类中默认值为10,可以在IndexWriter类中查看到:
private int minMergeDocs = DEFAULT_MAX_BUFFERED_DOCS;
public final static int DEFAULT_MAX_BUFFERED_DOCS = 10;
其中SegmentInfos ramSegmentInfos中保存了Document的数量的信息,如果Document的数量小于10,则调用flushRamSegments()方法进行处理,flushRamSegments()方法的定义如下所示:
private final synchronized void flushRamSegments() throws CorruptIndexException, IOException {
flushRamSegments(true);
}
在flushRamSegments()方法中又调用到了该方法的一个重载的方法,带一个boolean型参数。该重载的方法定义如下:
protected final synchronized void flushRamSegments(boolean triggerMerge)
throws CorruptIndexException, IOException {
if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) {
mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size());
if (triggerMerge) maybeMergeSegments(minMergeDocs);
}
}
同样,如果Document的数量小于10,则调用mergeSegments()方法,先看一下该方法的参数:
private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
第一个参数指定了一个SegmentInfos(上面调用传递了ramSegmentInfos) ;第二个参数是minSegment是最小的索引段数量(上面调用传递了0,说明如果存在>=0个索引段文件时就开始合并索引文件);第三个参数是end,指要合并索引段文件的个数(上面调用传递了ramSegmentInfos.size(),即对所有的索引段文件都执行合并操作)。
继续看mergeSegments()方法的实现:
private final int mergeSegments(SegmentInfos sourceSegments, int minSegment, int end)
throws CorruptIndexException, IOException {
// doMerge决定了是否执行合并操作,根据end的值,如果end为0说明要合并的索引段文件为0个,即不需要合并,doMerge=false
boolean doMerge = end > 0;
/* 生成合并的索引段文件名称,即根据SegmentInfos的counter值,如果counter=0,则返回的文件名为_0(没有指定扩展名)
final synchronized String newSegmentName() {
return "_" + Integer.toString(segmentInfos.counter++, Character.MAX_RADIX);
}
*/
final String mergedName = newSegmentName();
SegmentMerger merger = null; // 声明一个SegmentMerger变量
final List ramSegmentsToDelete = new ArrayList(); // ramSegmentsToDelete列表用于存放可能要在合并结束后删除的索引段文件,因为合并的过程中需要删除掉合并完以后存在于内存中的这些索引段文件
SegmentInfo newSegment = null;
int mergedDocCount = 0;
boolean anyDeletes = (bufferedDeleteTerms.size() != 0);
// This is try/finally to make sure merger's readers are closed:
try {
if (doMerge) { // 如果doMerge=true,即end>0,也就是说至少有1个以上的索引段文件存在,才能谈得上合并
if (infoStream != null) infoStream.print("merging segments"); // infoStream是一个PrintStream输出流对象,合并完成后要向索引目录中写入合并后的索引段文件,必须有一个打开的输出流
merger = new SegmentMerger(this, mergedName); // 构造一个SegmentMerger对象,通过参数:当前的打开的索引器this和合并后的索引段名称mergedName(形如_N,其中N为数)关于SegmentMerger类会在后面文章学习
for (int i = minSegment; i < end; i++) { // 循环遍历,从SegmentInfos sourceSegments中迭代出每个SegmentInfo对象
SegmentInfo si = sourceSegments.info(i);
if (infoStream != null)
infoStream.print(" " + si.name + " (" + si.docCount + " docs)"); // SegmentInfo si的name在索引目录中是唯一的;这里打印出每个 SegmentInfo si的名称和在这个索引段文件中Document的数量
IndexReader reader = SegmentReader.get(si, MERGE_READ_BUFFER_SIZE); // 调用SegmentReader类的静态方法get(),根据每个SegmentInfo si获取一个索引输入流对象;在IndexWriter类中定义了成员MERGE_READ_BUFFER_SIZE=4096
merger.add(reader); // 将获取到的SegmentReader reader加入到SegmentMerger merger中
if (reader.directory() == this.ramDirectory) { // 如果SegmentReader reader是当前的索引目录,与当前的RAMDirectory ramDirectory是同一个索引目录
ramSegmentsToDelete.add(si); // 将该SegmentInfo si加入到待删除的列表ramSegmentsToDelete中
}
}
}
SegmentInfos rollback = null;
boolean success = false;
// This is try/finally to rollback our internal state
// if we hit exception when doing the merge:
try {
if (doMerge) { // 如果doMerge=true
mergedDocCount = merger.merge(); // 通过SegmentMerger merger获取需要合并的索引段文件数量
if (infoStream != null) { // 打印出合并后的索引段文件的名称,及其合并了索引段文件的数量
infoStream.println(" into "+mergedName+" ("+mergedDocCount+" docs)");
}
newSegment = new SegmentInfo(mergedName, mergedDocCount,
directory, false, true); // 实例化一个SegmentInfo对象
}
if (sourceSegments != ramSegmentInfos || anyDeletes) {
// 通过克隆,存储一个用来回滚用的SegmentInfos实例,以防合并过程中发生异常
rollback = (SegmentInfos) segmentInfos.clone();
}
if (doMerge) { // 如果doMerge=true
if (sourceSegments == ramSegmentInfos) { // 如果传进来的sourceSegments和内存中的ramSegmentInfos是同一个
segmentInfos.addElement(newSegment); // 将合并后的新的SegmentInfo newSegment加入到segmentInfos中进行管理,以便之后再对其操作
} else { // 如果传进来的sourceSegments和内存中的ramSegmentInfos不是同一个
for (int i = end-1; i > minSegment; i--) // 删除旧的信息,同时添加新的信息
sourceSegments.remove(i);
segmentInfos.set(minSegment, newSegment);
}
}
if (sourceSegments == ramSegmentInfos) { // 如果传进来的sourceSegments和内存中的ramSegmentInfos是同一个,因为参数设置的原因,可能需要删除合并以后原来旧的索引段文件
maybeApplyDeletes(doMerge); // 调用 maybeApplyDeletes()方法执行合并后的删除处理
doAfterFlush();
}
checkpoint(); // 调用该方法 checkpoint()检查,确认并提交更新
success = true; // 如果检查没有发现异常,则置success=true
} finally {
if (success) { // 如果success=true,表示提交成功,要清理内存
if (sourceSegments == ramSegmentInfos) {
ramSegmentInfos.removeAllElements();
}
} else { // 如果发生异常,则需要回滚操作
if (sourceSegments == ramSegmentInfos && !anyDeletes) {
if (newSegment != null &&
segmentInfos.size() > 0 &&
segmentInfos.info(segmentInfos.size()-1) == newSegment) {
segmentInfos.remove(segmentInfos.size()-1);
}
} else if (rollback != null) {
segmentInfos.clear();
segmentInfos.addAll(rollback);
}
// Delete any partially created and now unreferenced files:
deleter.refresh();
}
}
} finally {
// 关闭所有的输入流(readers),尝试删除过时的废弃文件
if (doMerge) merger.closeReaders();
}
// 删除RAM中的索引段文件
deleter.deleteDirect(ramDirectory, ramSegmentsToDelete);
// 一个检查点,允许一个IndexFileDeleter deleter有机会在该时间点上去删除文件
deleter.checkpoint(segmentInfos, autoCommit);
if (useCompoundFile && doMerge) { // 如果IndexWriter索引器设置了useCompoundFile=true
boolean success = false;
try {
merger.createCompoundFile(mergedName + ".cfs"); // 创建复合索引文件(.cfs),即_N.cfs文件
newSegment.setUseCompoundFile(true); // 设置SegmentInfo newSegment为复合索引文件的信息
checkpoint(); // 调用该方法 checkpoint()检查,确认并提交更新
success = true;
} finally { // 如果检查过程中发生异常,则回滚
if (!success) {
newSegment.setUseCompoundFile(false);
deleter.refresh();
}
}
// 一个检查点,允许一个IndexFileDeleter deleter有机会在该时间点上去删除文件
deleter.checkpoint(segmentInfos, autoCommit);
}
return mergedDocCount; // 返回需合并的索引段文件数量
}
在不带参数的flushRamSegments()方法中,调用了带参数的flushRamSegments(boolean triggerMerge),也就是说,默认情况下,Lucene指定triggerMerge=true,可以在不带参数的flushRamSegments()方法中看到对该参数的设置:
private final synchronized void flushRamSegments() throws CorruptIndexException, IOException {
flushRamSegments(true);
}
所以,在带参数的flushRamSegments(boolean triggerMerge)方法中,一定会执行maybeMergeSegments()这个合并索引的方法,如下所示:
if (triggerMerge) maybeMergeSegments(minMergeDocs);
这里,传递的参数minMergeDocs=10(Lucene默认值),那么就应该有一个maxMergeDocs的成员与之对应,在Lucene 2.2.0版本中,在IndexWriter类中定义了该maxMergeDocs成员的默认值:
private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
public final static int DEFAULT_MAX_MERGE_DOCS = Integer.MAX_VALUE;
public static final int MAX_VALUE = 0x7fffffff;
maxMergeDocs是合并的最大的Document的数量,定义为最大的Integer。
因为一个索引目录中的索引段文件的数量可能大于minMergeDocs=10,如果也要对所有的索引段文件进行合并,则指定合并最小数量minMergeDocs的Docment是不能满足要求的,即使用mergeSegments()方法。
因此,maybeMergeSegments()就能实现合并性能的改善,它的声明就是需要一个起始的参数,从而进行增量地合并索引段文件。该方法的实现如下所示:
/** Incremental segment merger. */
private final void maybeMergeSegments(int startUpperBound) throws CorruptIndexException, IOException {
long lowerBound = -1;
long upperBound = startUpperBound; // 使用upperBound存放传递进来的startUpperBound
while (upperBound < maxMergeDocs) { // 如果upperBound < maxMergeDocs,一般来说,这个应该总成立的
int minSegment = segmentInfos.size(); // 设置minSegment的值为当前的SegmentInfos segmentInfos 的大小
int maxSegment = -1;
// 查找能够合并的索引段文件
while (--minSegment >= 0) { // 就是遍历SegmentInfos segmentInfos中的每个SegmentInfo si
SegmentInfo si = segmentInfos.info(minSegment); // 从索引位置号最大的开始往外取
if (maxSegment == -1 && si.docCount > lowerBound && si.docCount <= upperBound) { // maxSegment == -1;同时满足-1=lowerBound <(一个索引段文件中Dcoment的数量si.docCount)<=upperBound = startUpperBound
// start from the rightmost* segment whose doc count is in bounds
maxSegment = minSegment; // 设置maxSegment的值为当前SegmentInfos的大小
} else if (si.docCount > upperBound) {
// 直到segment中Document的数量超过了上限upperBound,则退出循环
break;
}
}
// 该while循环只执行了一次,执行过程中,将maxSegment赋值为segmentInfos.size()-1
minSegment++; // 上面循环中一直执行--minSegment,则到这里minSegment=-1,设置其值为0
maxSegment++; // 因为maxSegment=segmentInfos.size()-1,则设置为maxSegment=segmentInfos.size()
int numSegments = maxSegment - minSegment; // numSegments = maxSegment - minSegment = segmentInfos.size()
if (numSegments < mergeFactor) {
/* mergeFactor是合并因子,IndexWriter的成员,默认设置为10,mergeFactor的值越大,则内存中驻留的Document就越多,向索引目录中写入segment的次数就越少,虽然占用内存较多,但是速度应该很快的。每向索引文件中加入mergeFactor=10个Document的时候,就会在索引目录中生成一个索引段文件(segment) */
break; // numSegments < mergeFactor则没有达到合并所需要的数量,不需要合并,直接退出
} else {
boolean exceedsUpperLimit = false; // 设置一个没有超过上限的boolean型标志(false)
// 能够合并的segments的数量>=mergeFactor时
while (numSegments >= mergeFactor) {
// 调用mergeSegments(即上面的学习到的那个合并的方法)方法,合并从minSegment开始的mergeFactor个segment
int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor);
numSegments -= mergeFactor; // mergeFactor个segment已经合并完成,剩下需要合并的数量要减去mergeFactor,在下一次循环的时候继续合并
if (docCount > upperBound) { // 如果上次合并返回的合并后的Document的数量大于上限
// 继续在该层次合并剩余的segment
minSegment++;
exceedsUpperLimit = true; // 设置已经超过上限,不能再进行深一层次的的合并,即本轮合并就是最深层次的合并了
} else { // 如果上次合并返回的合并后的Document的数量没有超过上限
// 考虑进行更深层次的合并
numSegments++;
}
}
if (!exceedsUpperLimit) {// 如果上次合并返回的合并后的Document的数量大于上限,则终止执行本层次合并
break;
}
}
lowerBound = upperBound;
upperBound *= mergeFactor; // 如果一个层次的合并成功后,还可以进一步合并,则,上限变为原来的10倍
}
}
合并索引段文件就是这样实现的,并非只是在一个层次上合并:
第一层次合并时,每次只能将10个segment索引段文件合并为1个新的segment,假设在这一层生成了500个经过合并以后生成的索引段文件;
第二层次合并时,每次能合并10*mergeFactor=10*10=100个segment,经判断,上一层次生成了500个segment还可以进行第二层次的合并,现在每次100个segment文件才可能合并为1个,可见,只能合并生成5个新的segment;
第三层次合并时,每次能合并10*mergeFactor*mergeFactor=10*10*10=1000个segment,但是上一层次只是生成了5个,不够数量(1000个),不能继续合并了,到此终止。
就是上面的那种原理,实现索引段文件的合并。如果希望进行更深层次的合并,把mergeFactor的值设置的非常小就可以了,但是I/O操作过于频繁,速度会很慢很慢的。
提高合并的速度,是以内存空间开销为代价的。
通过第一个合并的方法可以看出,只有当为一个IndexWriter索引器设置了useCompoundFile=true的时候,才能生成复合索引文件_N.cfs,如下所示:
if (useCompoundFile && doMerge) { // 如果IndexWriter索引器设置了useCompoundFile=true
boolean success = false;
try {
merger.createCompoundFile(mergedName + ".cfs"); // 创建复合索引文件(.cfs),即_N.cfs文件
newSegment.setUseCompoundFile(true); // 设置SegmentInfo newSegment为复合索引文件的信息
checkpoint(); // 调用该方法 checkpoint()检查,确认并提交更新
success = true;
} finally { // 如果检查过程中发生异常,则回滚
if (!success) {
newSegment.setUseCompoundFile(false);
deleter.refresh();
}
}
// 一个检查点,允许一个IndexFileDeleter deleter有机会在该时间点上去删除文件
deleter.checkpoint(segmentInfos, autoCommit);
}
现在知道了,那些_N.cfs文件是合并的索引段文件。