分类:
2010-09-11 15:41:08
我们从文本提取的逻辑中走出来,回到主体流程。
在前面的文章中,我们可以看到一次索引创建的操作,可能会产生多个 persistentindex 对象,而这些对象其实代表着一个索引目录。随着创建索引的次数越来越多,那么索引目录也在增多,但是索引目录中的数据却不是很多,所以我们需要把多个目录合并,其实也就是索引的合并。
执行这个操作的类是 IndexMerger ,看其定义为:
Java代码
class IndexMerger extends Thread implements IndexListener
/*由此可见它是一个线程,并且同时充当着listener的角色,看看它的构造方法:
*/
IndexMerger(MultiIndex multiIndex) {
this.multiIndex = multiIndex;
setName("IndexMerger");
setDaemon(true);
try {
mergerIdle.acquire();
} catch (InterruptedException e) {
// will never happen, lock is free upon construction
throw new InternalError("Unable to acquire mutex after construction");
}
}
还是一个 deamon 线程。而且一构造就来了一个 mergerIdle.acquire(); 真是迫不及待啊。啥意思啊?得到一把锁,一把非阻塞的锁。
在创建完 IndexMerger ,那么就有可能把 PersistentIndex 加进来了,因为 Merger 类必须知道哪些 PersistentIndex 是需要 Merger 的,那么我们看看负责这段逻辑的代码:这段代码主要负责 3 个功能,一个是初始化 indexBuckets ,这个一个 ArrayList ,其中放的是需要 Merger 的 PersistentIndex 的列表,也就是我们可以认为 indexBucket 里放的还是 list ,这里有一个非常奇怪的设计,就是在初始化的时候将 PersistentIndex 按照 docnums 的范围分组了,一组就是一个 indexBucket 。
第二个是把需要加入的 PersistentIndex 加入到对应的分组中。 第三个是判断是否需要合并,如果需要就加到一个队列中,等待被合并。 先看第一段代码: Java代码 仔细阅读代码,我们发现,在初始化 indexBuckets 的代码中,其实按照范围来初始化的,比如当添加第一 IndexBucket 的时候 lower=0 , upper=100 即 new IndexBucket(0 100 , rue ) 第二个则为: new IndexBucket(101, 100*10, t rue ) 第三个则为: new IndexBucket(1001, 100*10*10, t rue ) 第四个则为: new IndexBucket(10001, 100*10*10*10, t rue ) 第五个则为: new IndexBucket(100001, 100*10*10*10*10, t rue ) ```````````` 一直持续下去直到 upper 小于 2147483647 ,且是 10 的最大幂。那么就是说 10 亿,当一个目录中有 10 亿个 document 的 index 数据时,这个目录将不再参与 merge 过程, indexBuckets 中总共有 8 个 IndexBucket, 不过在循环外面还有两个创建 IndexBucket 的语句,不过这两个都是不允许参加合并的,所以第 3 个参数是 false ,也就是说一共有 10 个,第九个是: Java代码 那么第十个是: Java代码 搞清楚 indexBuckets 的 初始化之后,我们再来看看第二个步骤,把根据 docNums 把对应的persistentindex 加入到 IndexBucket 中 : Java代码 这段代码的主要功能是把 indexbucket 里的 persistentindex 信息拿出来,而且量超过 2 的话就把他们加入到一个队列中,并将它们从该 indexbucket 里删除。通过这个步骤,那么 mergeTasks 队列中就存在一些需要合并的 index 了。 中场总结: 通过上面的方法和前面的索引提交的文章我们得到一些重要信息:当用户把 ramdirectory 中超过 100 的 docs 的 index data 刷到 fsdirectory 中时,新建一个目录,作为这个新 fsdirectory 的目录,接着把这个 fsdirectory 对应的 PersistentIndex 加到 IndexMerger 类的某个 IndexBucket 中,接着当某个 IndexBucket 中的 PersistentIndex 数量(即这些目录的数量)超过 10 ( mergefactor )的时候,就会执行合并的操作。 那么下面的问题是,合并之后,这 10 个目录将会何去何从,它们是把另外 9 个合并到其中一个中去呢还是怎么滴?接着看吧。 显然,这里又用到生产消费模型,任何调用 indexAdded 方法的都属性生产者,生产者根据一些条件,有选择的把需要合并的 persistentindex 放到 mergeTasks 的队列中,有了生产者肯定存在消费者,文章开头提过, IndexMerger 类是一个 deamon 线程,看看它的 run 方法,那么就发现,其实它就是消费者。它主要完成以下几个功能: 1 判断消费者是否空闲 2 判断队列中是否有退出命令 3 如果空闲则进入 wait 状态 4 根据 persistentindex 的名字取到所有的 persistentindex 的 IndexReader 对象 5 再创建一个新的 PersistentIndex, , 原来的 index 文件合并到这个新的目录中 6 将前面的 IndexReader 对象添加到 PersistentIndex 的 indexwriter 方法中,并执行optimize。 7 关闭这些 readers 8 根据名字删除已经被合并的 PersistentIndex 的索引文件和目录等。 我们再来看看代码,代码中已经加入了 ahuaxuan 的注释: Java代码 synchronized (lock) {
// initially create buckets
if (indexBuckets.size() == 0) {
long lower = 0;
// default minMergeDocs is 100
long upper = minMergeDocs;
//default maxMergeDocs is 2147483647
// IndexBucket实际上就是一个ArrayList
while (upper < maxMergeDocs) {
indexBuckets.add(new IndexBucket(lower, upper, true));
lower = upper + 1;
//default mergeFactor is 10
upper *= mergeFactor;
}
// one with upper = maxMergeDocs
indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false));
// and another one as overflow, just in case...
indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Long.MAX_VALUE, false));
}
············new IndexBucket(1000000001, 2147483647, false)
new IndexBucket(2147483648, 0x7fffffffffffffffL, false)
// put index in bucket
IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
for (int i = 0; i < indexBuckets.size(); i++) {
bucket = (IndexBucket) indexBuckets.get(i);
if (bucket.fits(numDocs)) {
break;
}
}
/*如果indexBuckets 没有值,那么就把Index 添加到第10个IndexBucket中,否则就从indexBuckets 的第一IndexBucket开始匹配,根据numDocs的值放到对应的IndexBucket中。*/
bucket.add(new Index(name, numDocs));
if (log.isDebugEnabled()) {
log.debug("index added: name=" + name + ", numDocs=" + numDocs);
}
// if bucket does not allow merge, we don't have to continue
//如果是最后两个IndexBucket,那么即刻退出
if (!bucket.allowsMerge()) {
return;
}
/*这段代码没有什么难的,接着看第3个步骤:
*/ // check if we need a merge
//超过indexbucket中超过10个元素<其实就是10个目录>则开始合并
if (bucket.size() >= mergeFactor) {
long targetMergeDocs = bucket.upper;
targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
// sum up docs in bucket
List indexesToMerge = new ArrayList();
int mergeDocs = 0;
for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;) {
indexesToMerge.add(it.next());
}
/* 结合上下文,indexesToMerge.size()这值会小于2吗?????*/
if (indexesToMerge.size() > 2) {
// found merge
Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]);
bucket.removeAll(indexesToMerge);
if (log.isDebugEnabled()) {
log.debug("requesting merge for " + indexesToMerge);
}
mergeTasks.add(new Merge(idxs));
log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
}
}public void run() {
for (;;) {
boolean isIdle = false;
//队列长度为0,表示消费者处于空闲状态,那么会进入wait状态
if (mergeTasks.size() == 0) {
mergerIdle.release();
isIdle = true;
}
/*2判断队列中是否有退出命令
*/
Merge task = (Merge) mergeTasks.remove();
if (task == QUIT) {
mergerIdle.release();
break;
}
if (isIdle) {
try {
mergerIdle.acquire();
} catch (InterruptedException e) {
Thread.interrupted();
log.warn("Unable to acquire mergerIdle sync");
}
}
log.debug("accepted merge request");
// reset deleted documents
deletedDocuments.clear();
// get readers
/*4 根据persistentindex的名字取到所有的persistentindex
的IndexReader对象
*/
String[] names = new String[task.indexes.length];
for (int i = 0; i < task.indexes.length; i++) {
names[i] = task.indexes[i].name;
}
try {
log.debug("create new index");
/*再创建一个新的PersistentIndex,原来的index文件合并到这个新的目录中
*/
PersistentIndex index = multiIndex.getOrCreateIndex(null);
boolean success = false;
try {
log.debug("get index readers from MultiIndex");
IndexReader[] readers = multiIndex.getIndexReaders(names, this);
try {
// do the merge
long time = System.currentTimeMillis();
/*6 将前面的IndexReader对象添加到PersistentIndex的indexwriter方法中,并执行optimize。
*/
index.addIndexes(readers);
time = System.currentTimeMillis() - time;
int docCount = 0;
for (int i = 0; i < readers.length; i++) {
docCount += readers[i].numDocs();
}
log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
} finally {
for (int i = 0; i < readers.length; i++) {
/*7 关闭这些readers
*/
try {
readers[i].close();
} catch (IOException e) {
log.warn("Unable to close IndexReader: " + e);
}
}
}
// inform multi index
// if we cannot get the sync immediately we have to quit
if (!indexReplacement.attempt(0)) {
log.debug("index merging canceled");
break;
}
try {
log.debug("replace indexes");
multiIndex.replaceIndexes(names, index, deletedDocuments);
} finally {
indexReplacement.release();
}
success = true;
} finally {
if (!success) {
// delete index
log.debug("deleting index " + index.getName());
/*8 根据名字删除已经被合并的PersistentIndex的索引文件和目录等。
*/
multiIndex.deleteIndex(index);
}
}
} catch (Throwable e) {
log.error("Error while merging indexes: " + e);
}
}
log.info("IndexMerger terminated");
}