分类:
2008-06-21 16:07:59
在接触到索引删除的策略IndexDeletionPolicy 的时候,提到一个提交点(IndexCommitPoint)的概念。在合适的时机,根据策略需求,需要对这些提交点(IndexCommitPoint)执行删除操作。
这些个提交点(IndexCommitPoint)究竟具有怎样的特征呢?
IndexCommitPoint是一个索引提交点的接口类,定义非常简单,如下所示:
package org.apache.lucene.index;
/**
 * A single commit point of an index, as handed to an index deletion policy.
 */
public interface IndexCommitPoint {

  /**
   * Returns the name of the segments file (named like {@code segments_N})
   * associated with this commit point.
   *
   * <p>Example from the article series: creating an index with a fresh
   * IndexWriter produces a {@code segments_1} file of about 1K
   * (see "Lucene-2.2.0 source reading notes (11)").
   */
  String getSegmentsFileName();

  /**
   * Deletes this commit point, i.e. the segments file associated with it.
   */
  void delete();
}
实现IndexCommitPoint接口的类为CommitPoint类。CommitPoint类是一个最终类,而且它是作为一个内部类来定义的,那么它的外部类为IndexFileDeleter类。由此可以看出,一些索引提交点(IndexCommitPoint)的存在,是依赖于IndexFileDeleter类的,只有选择了某种索引文件删除策略,才能够构造一个IndexFileDeleter类的实例。倘若初始化了一个IndexFileDeleter类的实例,没有索引删除策略,则这个IndexFileDeleter类的实例根本就没有应用的价值,更不必谈什么索引提交点(IndexCommitPoint)了。
在IndexWriter索引器类中,定义了一个内部成员:
private IndexFileDeleter deleter;
也就是说,实例化一个索引器时必然要初始化一个IndexFileDeleter类的实例。索引器的初始化主要通过调用IndexWriter的init方法来完成,而IndexWriter类只定义了两个重载的init方法,它们的声明如下:
private void init(Directory d, Analyzer a, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit)
throws CorruptIndexException, LockObtainFailedException, IOException ;
private void init(Directory d, Analyzer a, final boolean create, boolean closeDir, IndexDeletionPolicy deletionPolicy, boolean autoCommit)
throws CorruptIndexException, LockObtainFailedException, IOException;
这里面,最重要的是第二个init方法,该方法才真正地实现了索引器的初始化工作;而第一个init方法只是通过调用IndexReader类的静态方法:
public static boolean indexExists(Directory directory) throws IOException
来判断指定的索引目录中是否存在索引文件,从而间接地调用第二个init方法来初始化一个IndexWriter索引器。
然后,IndexWriter索引器类不同的构造方法根据构造需要,调用上面的两个init方法实现初始化工作。
在上面的第二个init方法中,根据指定的索引文件删除策略,实例化一个IndexFileDeleter:
deleter = new IndexFileDeleter(directory, deletionPolicy == null ? new KeepOnlyLastCommitDeletionPolicy() : deletionPolicy,segmentInfos, infoStream);
其中infoStream是PrintStream类的一个实例。PrintStream类继承自FilterOutputStream类,是一个包装其他输出流的过滤输出流(底层可以是文件、控制台等任意OutputStream),并不局限于文件输出;在这里它用于输出调试信息。
这里,如果deletionPolicy=null,即构造一个索引器没有指定删除策略,则自动指派其删除策略为KeepOnlyLastCommitDeletionPolicy,否则使用指定的删除策略deletionPolicy。
一个IndexWriter索引器与IndexFileDeleter索引文件删除工具相关,有必要关注一下IndexFileDeleter类的定义,先把它的一个重要的内部类CommitPoint类放在后面学习:
package org.apache.lucene.index;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
// 该类对建立索引过程中指定的Directory目录中的索引文件的删除操作进行管理
// 注意:在IndexFileDeleter实例化之前,必须持有write.lock锁
final class IndexFileDeleter {
// 在删除索引文件过程中可能会由于一些I/O等异常删除失败,将删除失败的文件放到deletable列表中,以期待再次尝试删除它们
private List deletable;
// 存储了与一个索引段文件相关的源数据中的文件的个数,即通过这个索引可以检索到的文件的数目,这里refCount的Key是索引文件的名称,Value就是该索引文件被引用的次数
private Map refCounts = new HashMap();
// 当前索引目录下的索引文件列表
private List commits = new ArrayList();
// 在某个检查点(checkpoint)处可能存在修改了引用计数,但是没有生成提交点,要暂时把这些索引文件存放到lastFiles列表中
private List lastFiles = new ArrayList();
// 提交删除指定索引策略下的索引文件列表
private List commitsToDelete = new ArrayList();
private PrintStream infoStream;
private Directory directory;
private IndexDeletionPolicy policy;
void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
}
private void message(String message) {
infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
}
//================IndexFileDeleter()方法开始================
// 初始化一个IndexFileDeleter实例,初始化要做大量工作
public IndexFileDeleter(Directory directory, IndexDeletionPolicy policy, SegmentInfos segmentInfos, PrintStream infoStream)
throws CorruptIndexException, IOException {
this.infoStream = infoStream;
this.policy = policy;
this.directory = directory;
// 第一次遍历索引目录下的索引文件,初始化索引文件索引的文件计数为0
long currentGen = segmentInfos.getGeneration(); // 获取下一次提交时索引段文件segments_N的版本号
// 初始化一个对索引文件进行过滤的IndexFileNameFilter实例
IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
String[] files = directory.list();
if (files == null)
throw new IOException("cannot read directory " + directory + ": list() returned null");
CommitPoint currentCommitPoint = null;
for(int i=0;i String fileName = files[i]; if (filter.accept(null, fileName) && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) { // IndexFileNames.SEGMENTS_GEN常量的值为segments.gen,可以在Lucene-2.2.0 源代码阅读学习(11) 看到生成的segments.gen文件 // 如果生成的索引文件合法,则添加到一个初始化索引计数为0的RefCount中 if (fileName.startsWith(IndexFileNames.SEGMENTS)) { // This is a commit (segments or segments_N), and if (currentCommitPoint == null) { // 对索引目录中所有的索引段文件进行排序 // 删除引用计数为0的索引文件. // 在索引器启动的时刻根据指定删除策略删除索引文件 // 索引器启动的时刻成功地删除了索引文件,之后还要盘点当前驻留内存中的SegmentInfos,避免它们仍然使用删除的索引文件 //================IndexFileDeleter()方法结束================ // 根据索引文件删除策略决定删除的提交点,将commitsToDelete列表中的提交点从每个SegmentInfos中删除掉 int size = commitsToDelete.size(); if (size > 0) { // First decref all files that had been referred to by // Now compact commits to remove deleted ones (保持有序): while(size > writeTo) { /** /** if (infoStream != null) { // Try again now to delete any previously un-deletable // Incref the files: if (isCommit) { // Tell policy so it can remove commits: // Decref files for commits that were deleted by the policy: // DecRef old files from the last checkpoint, if any: if (!isCommit) { void incRef(SegmentInfos segmentInfos, boolean isCommit) throws IOException { if (isCommit) { // 对列表files中的索引文件,进行批量引用计数加1操作 private void incRef(List files) throws IOException { // 对列表files中的索引文件,进行批量引用计数减1操作 private void decRef(List files) throws IOException { // 指定索引文件的引用计数减1 private void decRef(String fileName) throws IOException { void decRef(SegmentInfos segmentInfos) throws IOException { // 根据指定的索引文件的名称,获取用于管理该索引文件的引用计数RefCount实例 private RefCount getRefCount(String fileName) { // 从Directory directory目录中删除指定索引文件fileName private void deleteFile(String fileName) // 删除失败索引文件还残留于索引目录中,并且,如果输出流关闭,则提示稍后删除 if (infoStream != null) { /** // RefCount类是用于管理一个索引文件的引用计数的,当然,一个索引文件可能没有被引用过,这时引用计数this.count=0,应该删除掉这个没有意义的索引文件 int count; final private int IncRef() { // 计数加1 final private int DecRef() { // 计数减1 } 
将静态内部类CommitPoint(是IndexCommitPoint接口的一个具体实现类)单独拿出来看: /** final private class CommitPoint implements Comparable, IndexCommitPoint { long gen; // 下次提交索引段segments_N的版本 public CommitPoint(SegmentInfos segmentInfos) throws IOException { /** /** public int compareTo(Object obj) {
getRefCount(fileName);
// it's valid (<= the max gen). Load it, then
// incref all files it refers to:
if (SegmentInfos.generationFromSegmentsFileName(fileName) <= currentGen) {
if (infoStream != null) {
message("init: load commit \"" + fileName + "\"");
}
SegmentInfos sis = new SegmentInfos();
sis.read(directory, fileName);
CommitPoint commitPoint = new CommitPoint(sis);
if (sis.getGeneration() == segmentInfos.getGeneration()) {
currentCommitPoint = commitPoint;
}
commits.add(commitPoint);
incRef(sis, true);
}
}
}
}
throw new CorruptIndexException("failed to locate current segments_N file");
}
Collections.sort(commits);
Iterator it = refCounts.keySet().iterator();
while(it.hasNext()) {
String fileName = (String) it.next();
RefCount rc = (RefCount) refCounts.get(fileName);
if (0 == rc.count) {
if (infoStream != null) {
message("init: removing unreferenced file \"" + fileName + "\"");
}
deleteFile(fileName);
}
}
policy.onInit(commits);
if (currentCommitPoint.deleted) {
checkpoint(segmentInfos, false);
}
deleteCommits(); // 提交删除
}
private void deleteCommits() throws IOException {
// the now-deleted commits:
for(int i=0;i
if (infoStream != null) {
message("deleteCommits: now remove commit \"" + commit.getSegmentsFileName() + "\"");
}
int size2 = commit.files.size();
for(int j=0;j
}
decRef(commit.getSegmentsFileName());
}
commitsToDelete.clear();
size = commits.size();
int readFrom = 0;
int writeTo = 0;
while(readFrom < size) {
CommitPoint commit = (CommitPoint) commits.get(readFrom);
if (!commit.deleted) {
if (writeTo != readFrom) {
commits.set(writeTo, commits.get(readFrom));
}
writeTo++;
}
readFrom++;
}
commits.remove(size-1);
size--;
}
}
}
* 用于检查优化的方法
* 因为在复杂的操作过程中,可能发生异常,索引目录中可能存在不被引用的索引文件,
* 应该删除这些无用的索引文件,释放磁盘空间
*/
public void refresh() throws IOException {
String[] files = directory.list();
if (files == null)
throw new IOException("cannot read directory " + directory + ": list() returned null");
IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
for(int i=0;i
if (filter.accept(null, fileName) && !refCounts.containsKey(fileName) && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
// 经过过滤、检查,找出残留的无用索引文件,删除他们
if (infoStream != null) {
message("refresh: removing newly created unreferenced file \"" + fileName + "\"");
}
deleteFile(fileName);
}
}
}
* For definition of "check point" see IndexWriter comments:
* removed, we decref their files as well.
*/
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [isCommit = " + isCommit + "]");
}
// files (because they were in use, on Windows):
if (deletable != null) {
List oldDeletable = deletable;
deletable = null;
int size = oldDeletable.size();
for(int i=0;i
}
}
incRef(segmentInfos, isCommit);
// Append to our commits list:
commits.add(new CommitPoint(segmentInfos));
policy.onCommit(commits);
deleteCommits();
}
int size = lastFiles.size();
if (size > 0) {
for(int i=0;i
}
lastFiles.clear();
}
// Save files so we can decr on next checkpoint/commit:
size = segmentInfos.size();
for(int i=0;i
if (segmentInfo.dir == directory) {
lastFiles.add(segmentInfo.files());
}
}
}
}
int size = segmentInfos.size();
for(int i=0;i
if (segmentInfo.dir == directory) {
incRef(segmentInfo.files());
}
}
// Since this is a commit point, also incref its
// segments_N file:
getRefCount(segmentInfos.getCurrentSegmentFileName()).IncRef();
}
}
int size = files.size();
for(int i=0;i
RefCount rc = getRefCount(fileName);
if (infoStream != null) {
message(" IncRef \"" + fileName + "\": pre-incr count is " + rc.count);
}
rc.IncRef();
}
}
int size = files.size();
for(int i=0;i
}
}
RefCount rc = getRefCount(fileName);
if (infoStream != null) {
message(" DecRef \"" + fileName + "\": pre-decr count is " + rc.count);
}
if (0 == rc.DecRef()) {
// 一个索引文件的引用计数为0了,即该索引文件已变成垃圾索引,要删除该索引文件
deleteFile(fileName);
refCounts.remove(fileName);
}
}
final int size = segmentInfos.size();
for(int i=0;i
if (segmentInfo.dir == directory) {
decRef(segmentInfo.files());
}
}
}
RefCount rc;
if (!refCounts.containsKey(fileName)) {
rc = new RefCount();
refCounts.put(fileName, rc);
} else {
rc = (RefCount) refCounts.get(fileName);
}
return rc;
}
throws IOException {
try {
if (infoStream != null) { // 如果输出流保持打开状态
message("delete \"" + fileName + "\"");
}
directory.deleteFile(fileName);
} catch (IOException e) { // 如果删除失败
if (directory.fileExists(fileName)) {
message("IndexFileDeleter: unable to remove file \"" + fileName + "\": " + e.toString() + "; Will re-try later.");
}
if (deletable == null) { // 将删除失败的索引文件添加到列表deletable中
deletable = new ArrayList();
}
deletable.add(fileName);
}
}
}
* Blindly delete the files used by the specific segments,
* with no reference counting and no retry. This is only
* currently used by writer to delete its RAM segments
* from a RAMDirectory.
*/
public void deleteDirect(Directory otherDir, List segments) throws IOException {
int size = segments.size();
for(int i=0;i
int size2 = filestoDelete.size();
for(int j=0;j
}
}
}
final private static class RefCount {
return ++count;
}
return --count;
}
}
* 保存每个提交点的详细信息,为了更好地在应用删除策略时进行应用提供方便。
* 该类实现了Comparable接口;该类的实例,即提交点,在放到一个List中的时候,不能有重复的
*/
List files; // 属于当前索引目录的索引段的一个列表
String segmentsFileName; // 一个索引段
boolean deleted; // 删除标志
segmentsFileName = segmentInfos.getCurrentSegmentFileName();
int size = segmentInfos.size(); // segmentInfos是一个索引段SegmentInfo的向量
files = new ArrayList(size);
gen = segmentInfos.getGeneration(); // 获取下次提交索引段segments_N的版本号
for(int i=0;i
if (segmentInfo.dir == directory) {
files.add(segmentInfo.files()); // 如果该索引段segmentInfo属于该索引目录,则加入到列表files中
}
}
}
* 获取与该提交点相关的segments_N索引段
*/
public String getSegmentsFileName() {
return segmentsFileName;
}
* 删除一个提交点
*/
public void delete() {
if (!deleted) {
deleted = true;
commitsToDelete.add(this);
}
}
CommitPoint commit = (CommitPoint) obj;
if (gen < commit.gen) {
return -1;
} else if (gen > commit.gen) {
return 1;
} else {
return 0;
}
}
}