Chinaunix首页 | 论坛 | 博客
  • 博客访问: 128406
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-11 15:29:52

在上上篇文章中,我们了解了创建索引的一般流程,在上篇文章中,我们也已经明确的知道,一个node的哪些信息是需要加入到索引中去的,也就是 jackrabbit是如何来创建document的。那么接下来我们就是要了解jackrabbit是如何把这些document加入到索引文件中去的。

  接下来我们需要回顾一下索引概览中的最后几句话:

  那么代码的主要逻辑在哪里呢?

  在DeleteNode和AddNode类中,还有就是在flush方法中。

  这句话充分了说明本文需要描述的内容,那便是

  1 如何把document从index删除

  2 如何把document放到index中

  3 什么是flush

  在讨论这3个内容之前我们有必要说明一下executeAndLog方法的作用,从方法名,我们得到的信息是:1执行Node,而log一些数据,那么我们再来看看源代码

  executeAndLog(new DeleteNode(transactionId, (UUID) remove.next()));

  先进入了executeAndLog方法:

  Java代码   

private Action executeAndLog(Action a) 
      throws IOException { 
    a.execute(this); 
    redoLog.append(a); 
    // please note that flushing the redo log is only required on 
    // commit, but we also want to keep track of new indexes for sure. 
    // otherwise it might happen that unused index folders are orphaned 
    // after a crash. 
    if (a.getType() == Action.TYPE_COMMIT || a.getType() == Action.TYPE_ADD_INDEX) { 
      redoLog.flush(); 
      // also flush indexing queue 
      indexingQueue.commit(); 
    } 
    return a; 
}

在上面这个方法中,我们可以看到以下几个流程:

  1 执行DeleteNode的execute方法(是Action的实现类)

  2 记录redolog(redolog做啥呀,先留着)

  3 刷新redolog到磁盘同时commit indexingqueue(如果是事物提交或者添加index的时候,同时,这个indexingqueue是一个非常怪的设计)

  显然这个方法是用来执行Action的,DeleteNode只是Action之一,所以要理解整个体系,我们得先了解了解Action接口到底有多少个实现类:

深入浅出 jackrabbit 四 索引提交(上)

  从图中可以看到Action有很多的实现类,不过ahuaxuan已经在图中作了一些概要的说明以帮助我们理解这些实现类的作用,而且DeleteNode和AddNode类已经出现在我们要分析的范围之内了。

  在了解了Action之后,我们需要正视我们在文章开始提出的3个问题了

  1 如何把document从index删除

  首先,看看DeleteNode里面的逻辑:从上图中我们可以确切的知道DeleteNode就是用来把一个Node从index删除的action。它的主要逻辑都集中在execute方法中,那么下面我们来看看DeleteNode的execute方法(其中包含了ahuaxuan的一些注释和原有的注释,可以帮助我们快速理解它的功能):

  Java代码   

public void execute(MultiIndex index) throws IOException { 
//一个deleteNode代表了一个需要被删除的Node,它持有这个//node的uuid 
      String uuidString = uuid.toString(); 
      // check if indexing queue is still working on 
      // this node from a previous update 
//根据uuid把document从indexingqueue中删除 
      Document doc = index.indexingQueue.removeDocument(uuidString); 
      if (doc != null) { 
        Util.disposeDocument(doc); 
      } 
      Term idTerm = new Term(FieldNames.UUID, uuidString); 
      // if the document cannot be deleted from the volatile index 
      // delete it from one of the persistent indexes. 
//同时也要从document内存队列或者内存索引中删除,如果存在于volatieindex的document内存队列或者内存索引中,则表示不存在 
于persistentindex中,反之亦然。 
      int num = index.volatileIndex.removeDocument(idTerm); 
      if (num == 0) { 
        for (int i = index.indexes.size() - 1; i >= 0; i--) { 
//不存在于内存索引中,所以从注册过的persistentindex中删除 
          // only look in registered indexes 
          PersistentIndex idx = (PersistentIndex) index.indexes.get(i); 
          if (index.indexNames.contains(idx.getName())) { 
            num = idx.removeDocument(idTerm); 
            if (num > 0) { 
              return; 
            } 
          } 
        } 
      } 
    }

这段方法的主要逻辑是从indexingqueue中删除node对应的document,并从volatileindex对象(或者 indexfile)中也删除对应的document,最后从persistentindex类中删除对应的document,所以这里有几件事情值得我们注意:

  1. indexingqueue中有可能保存着一个node对应的document,那么这个indexingqueue是做什么用的?

  Indexingqueue中的document其实是那些需要的extract的node,因为有些node包含的二级制文件,比如pdf,提炼文本的时候需要点时间,于是,jackrabbit就把提炼的过程给弄成异步的了,提炼完成之后,会把这个document放到indexingqueue中。所以当一个document需要删除的时候,肯定要检查这个异步的队列。

  2. Volatileindex是个什么玩意,内存索引,即把索引存放在memory中

  但是这里有点蹊跷的地方,需要我们注意一下,我们看看它逻辑:

  Java代码   

int removeDocument(Term idTerm) throws IOException { 
    Document doc = (Document) pending.remove(idTerm.text()); 
    int num; 
    if (doc != null) { 
      Util.disposeDocument(doc); 
      // pending document has been removed 
      num = 1; 
    } else { 
      // remove document from index 
      num = super.getIndexReader().deleteDocuments(idTerm); 
    } 
    numDocs -= num; 
    return num; 
}

  这个方法在DeleteNode的execute方法中被调用过了,从这个方法来看,这里面有两件事情值得我们关注,一个是volatileindex类中有一个pending队列,放着document,经查,当这个队列超过一定长度的时候,document中的数据就会被写成二进制索引数据放到内存中。

另外一个是如果pending中不存在这个document,那么就会调用indexreader来删除ramdirectory中符合条件的document。

  也就是不管怎么做,volatile都是在操作内存,它有一个document队列,当队列长度超过10的时候,这些document就会被转换成index的二进制数据放到ramdirectory。

  看到这里,童鞋们应该会问一个问题,就是内存中的index数据怎么写到磁盘中呢?后面我们会看到它有一个copy方法负责把内存中的数据copy到磁盘上。

  3. Persistentindex是什么玩意,持久化索引,把索引存放在持久化介质中,目前是local file system。而且看上去PersistentIndex有很多个。不同的persistentindex会使用不同的directory。这种设计是不是告诉我们,这些不同的fsdirectory在某个固定的点是需要合并的呢?带着两个疑问,我们可以继续往下看。

  总结成一句话就是,一个document可能存在于多个地方,当删除一个node时,所有的这些地方都需要清扫一遍。

  我们来看看第二个大问题:

  2 如何把document放到index中

  如果DeleteNode一样,AddNode的逻辑也在它的execute方法中,既然如此,我们不妨过去看看:

  Java代码   

public void execute(MultiIndex index) throws IOException { 
      if (doc == null) { 
        try { 
//如果doc为null再次创建document,创建document的流程之前已经讲得很清楚了。 
          doc = index.createDocument(new NodeId(uuid)); 
        } catch (RepositoryException e) { 
          // node does not exist anymore 
          log.debug(e.getMessage()); 
        } 
      } 
      if (doc != null) { 
        index.volatileIndex.addDocuments(new Document[]{doc}); 
      } 
    }

这个方法简单得不能再简单,简单得让人无法置信,呵呵,不要被假象所迷惑。我们来看看

  volatileIndex.addDocuments这个方法,因为逻辑都在这个方法中:

  Java代码   

void addDocuments(Document[] docs) throws IOException { 
    for (int i = 0; i < docs.length; i++) { 
      Document old = (Document) pending.put(docs[i].get(FieldNames.UUID), docs[i]); 
//这里的pending就是volatileindex中那个document内存队列 
      if (old != null) { 
        Util.disposeDocument(old); 
      } 
 
//如果队列长度超过10(bufferSize),那么执行commitPending,也就是说逻辑又跑到commitPending中去了 
      if (pending.size() >= bufferSize) { 
        commitPending(); 
      } 
      numDocs++; 
    } 
    invalidateSharedReader(); 
  }

  既然逻辑跑到commitPending中去了,我们就看commitPending,从名字上来看,commitPending就是把pending中的document处理掉:

  Java代码  

private void commitPending() throws IOException { 
    super.addDocuments((Document[]) pending.values().toArray( 
        new Document[pending.size()])); 
//从这里可以看出,一旦pending被处理,那么就把pending置空。 
    pending.clear(); 
  }

  逻辑还不在这个方法中,那么我们继续追踪旅程,来看看这个方法AbstractIndex.addDocuments,在这个方法中,我们终于看到我们想看到的:

 Java代码   

void addDocuments(Document[] docs) throws IOException { 
    final IndexWriter writer = getIndexWriter(); 
//一般情况下,我们的docs数组的长度为10,下面就创建10个线程 
    DynamicPooledExecutor.Command commands[] = 
        new DynamicPooledExecutor.Command[docs.length]; 
 
    for (int i = 0; i < docs.length; i++) { 
      // check if text extractor completed its work 
      final Document doc = getFinishedDocument(docs[i]); 
      // create a command for inverting the document 
      commands[i] = new DynamicPooledExecutor.Command() { 
        public Object call() throws Exception { 
          long time = System.currentTimeMillis(); 
//每个线程都使用往writer里加入document对象,这个时候,lucene开始解析document,并生产index数据 
          writer.addDocument(doc); 
          return new Long(System.currentTimeMillis() - time); 
        } 
      }; 
    } 
//并发执行 
    DynamicPooledExecutor.Result results[] = EXECUTOR.executeAndWait(commands); 
//置空readOnlyReader和sharedReader,为啥置空啊,index数据改了呗 
    invalidateSharedReader(); 
    IOException ex = null; 
 
//检查每个线程的执行情况,有一个出错就抛出异常,其他的异常保存到log中 
    for (int i = 0; i < results.length; i++) { 
      if (results[i].getException() != null) { 
        Throwable cause = results[i].getException().getCause(); 
        if (ex == null) { 
          // only throw the first exception 
          if (cause instanceof IOException) { 
            ex = (IOException) cause; 
          } else { 
            IOException e = new IOException(); 
            e.initCause(cause); 
            ex = e; 
          } 
        } else { 
          // all others are logged 
          log.warn("Exception while inverting document", cause); 
        } 
      } else { 
        log.debug("Inverted document in {} ms", results[i].get()); 
      } 
    } 
    if (ex != null) { 
      throw ex; 
    } 
  }

在看过方法中的注释,大家应该都明白了,在AddNode方法中执行的操作其实就是在内存中生成index数据的操作。而且这个操作是并发执行的。

  那么我们内存中的index数据就不需要写到磁盘上了吗,可能吗,要回答这个问题我们回到MultiIndex#update方法:

  Java代码  

executeAndLog(new AddNode(transactionId, doc)); 
          // commit volatile index if needed 
          flush |= checkVolatileCommit(); 

  我们看到这里有一个checkVolatileCommit,就它了,从名字上看,它Y滴就是想把内存中的数据刷到磁盘上,进去看看呗:

  Java代码  

private boolean checkVolatileCommit() throws IOException { 
    if (volatileIndex.getNumDocuments() >= handler.getMinMergeDocs()) { 
      commitVolatileIndex(); 
      return true; 
    } 
    return false; 
}

  果然,volatileIndex.getNumDocuments()会返回它处理的document的数量,而 handler.getMinMergeDocs()的默认值是100,也就是说当内存中的index数据对应的document超过100的话,就需要做commitVolatileIndex操作,而且返回true,否则就是返回false,这个true或false非常重要,因为它决定着下面的 flush操作是否需要操作,而flush就是我们后面要提到的第3个大问题,如果你已经忘记有flush这回事,请看看文章开头提到的3个大问题。那么,到这里理论上来讲内存中的数据现在就需要被刷到磁盘上,ahuaxuan之前也是这么想滴,但是jackrabbit往往出乎人的意料,那么我们一起进去看看(请注意ahuaxuan的注释):

Java代码   

private void commitVolatileIndex() throws IOException { 
 
    // check if volatile index contains documents at all 
    if (volatileIndex.getNumDocuments() > 0) { 
 
      long time = System.currentTimeMillis(); 
      // create index 
/*这里创建的是一个Action的实现类CreateIndex,它的作用是创建一个persistentindex对象,很显然,创建的逻辑应该在它的execute方法中,如果你深入的看下去,会发现,这里的传进去的第二参数null,其实是表示创建一个新的persistentindex,而创建一个persistentindex意味着需要一个fsdiretory,创建一个fsdirectory又意味着需要一个目录,于是我们可以看到其实创建index directory的命名方法*/ 
      CreateIndex create = new CreateIndex(getTransactionId(), null); 
      executeAndLog(create); 
 
      // commit volatile index 
/*从VolatileCommit这个名字上看来,这个action的主要操作就是把内存里的index数据写到刚创建的 persistentindex对象中,因为需要一个persistentindex的name作为构造参数,恰巧它是新建的这个 persistentindex对象,鉴于方法的重要性,ahuaxuan还是再多罗嗦一下,它的主要功能是把volatile中index的二进制数据拷贝到persistentindex,其实就是数据从ramdirectory向fsdirectory转移的过程 */ 
 
      executeAndLog(new VolatileCommit(getTransactionId(), create.getIndexName())); 
 
      // add new index 
/*这个AddIndex类也是非常重要的类,它存在的目的是什么呢?*/ 
      AddIndex add = new AddIndex(getTransactionId(), create.getIndexName()); 
      executeAndLog(add); 
 
      // create new volatile index 
/*volatileindex 的使命到这里就结束了,下面就重新创建一个volatileindex吧,放弃过去,重新开始 */ 
      resetVolatileIndex(); 
 
      time = System.currentTimeMillis() - time; 
      log.debug("Committed in-memory index in " + time + "ms."); 
    } 
  }

从代码和ahuaxuan的注释上看,要把内存中的index数据刷到磁盘上还真不是一件简单的事情。不过从这个过程中,我们得到一个重要的信息,最开始的时候,persistentindex中只包含了100个document的index数据。不过聪明的你一定已经看出点端倪,冥冥之中,好像在告诉我们,所有的这些persistentindex其实都缺少一个操作,它们需要合并index文件,这个操作和AddIndex息息相关,我们貌似对 AddIndex的左右已经看出一点端倪了。

  要解开这个谜团,非得到AddIndex中看看不可:

  Java代码   

public void execute(MultiIndex index) throws IOException { 
      PersistentIndex idx = index.getOrCreateIndex(indexName); 
/*index.indexNames是个什么东西?其实就是一个列表,任何一个新的persistentindex都需要被注册到这个列表中,它的作用很多,比如注册之后,只需要通过persistentindex类的名字就是可以取到对应的persistentindex,在上面的方法中并没有把新建的persistentindex注册到indexNames中去,所以这个方法先判断有没有注册,没有注册那么就注册进去,接着把这个 indexName传给了merger类,从名字上来看,这个merger的功能应该就是合并persistentindex的数据了*/ 
      if (!index.indexNames.contains(indexName)) { 
        index.indexNames.addName(indexName); 
        // now that the index is in the active list let the merger know about it 
        index.merger.indexAdded(indexName, idx.getNumDocuments()); 
      } 
    }

  那么既然知道了有这么一个merger类,那不看看他做了点什么也过意不去啊。

  看看它的类注释:Merges indexes in a separate deamon thread.

  原来是一个deamon线程啊,那它一定是不停的在后台执行merge操作,而且理论上来讲,也应该有一个临界值,超过多少document被写到 persistentindex的时候就执行merger操作。这里面的逻辑应该有很多值得我们学习的地方,抱着这样的想法,ahuaxuan决定把它的研究放到后面。

  接下来,让我们说说flush这个操作,这是索引提交的第三部重要流程

3 什么是flush

  大家可能已经沉浸于AddNode逻辑而不能自拔了,清醒一点,我们再次回到MultiIndex#update看看,有这么一段代码:

if (flush) { 
        flush(); 
      }

  不知道是否有人想起flush怎么才能为true呢,当VolatileIndex处理的document超过100的时候,需要把内存中的index数据copy到persistentindex中,这个时候返回true,也就是当VolatileIndex处理的document数量超过100时,就需要执行flush操作,这个就是执行flush的一个前提,不过有一点需要注意的是:flush并不只是在update方法中调用,后面我们会阐述这个问题,请记住这个问题:还有什么操作需要调用flush呢?

  那么flush方法中具体做了一些什么事情呢?让我们深入到方法的内部查看一下:

  Java代码   

void flush() throws IOException { 
/*注意这里的同步*/ 
    synchronized (this) { 
      // commit volatile index 
/*开启一个内部事务*/ 
      executeAndLog(new Start(Action.INTERNAL_TRANSACTION)); 
/*这里又执行了一遍commitVolatileIndex方法,这个方法的功能我们已经在本文的前半部分详细的分析过了,即把ramdirectory中的数据拷贝到fsdirectory中*/ 
      commitVolatileIndex(); 
 
      // commit persistent indexes 
      /*所有的persistentindex对象在创建完成之后,都会放到indexes这个list中,这个list*/ 
      for (int i = indexes.size() - 1; i >= 0; i--) { 
        PersistentIndex index = (PersistentIndex) indexes.get(i); 
        // only commit indexes we own 
        // index merger also places PersistentIndex instances in indexes, 
        // but does not make them public by registering the name in indexNames 
        if (indexNames.contains(index.getName())) { 
          index.commit(); 
          // check if index still contains documents 
/*如果一个PersistentIndex中已经没有任何一个document的数据,那就代表它已经没有存在的必要了。*/ 
  if (index.getNumDocuments() == 0) { 
            executeAndLog(new DeleteIndex(getTransactionId(), index.getName())); 
          } 
        } 
      } 
      executeAndLog(new Commit(getTransactionId())); 
/*将有效的PersistentIndex写到一个文件中,一旦当机,那么便可以在重启的时候得知哪些目录是有效的索引目录,这样做是为了防止其读取需要被删除的目录*/ 
      indexNames.write(indexDir); 
      
 
      // reset redo log 
      redoLog.clear(); 
 
      lastFlushTime = System.currentTimeMillis(); 
    } 
 
    // delete obsolete indexes 
/* 删除那些需要被删除的索引目录,delete其实是索引目录的一个文件,这个文件中保存着需要被删除的目录,包含小目录合并成大目录之后需要被删掉。 */ 
    attemptDelete(); 
  } 

在上面的flush中,我们已经看到,只有index.commit()有一点点的神秘感,其他的逻辑ahuaxuan已经写的非常详细了。其实commit就是把这个PersistentIndex对应的indexwriter关闭掉。

  总结 :

  从这篇文章(上,下篇)中,我们可以看到delete,add和flush的主要逻辑,那么再简单回顾一下:

  1.    Delete:需要把所有可能保存document的地方都检查一遍,有就删除。这些地方包括,

  1)    VolatileIndex中的pendding队列

  2)    VolatileIndex(ramdirectory)中的document的indexdata

  3)    IndexQueue

  4)    PersistentIndex(fsdirectory)

  2. Add and Flush:

  1) 创建document

  2) 将document加入到VolatileIndex的pending中

  3) pending 中的document大于10就多线程生产indexdata到ram中

  4) ram中的document的index数据大于100份就新建一个persistentIndex,并把这些数据拷贝到PersistentIndex对应的磁盘目录中。   

  3 indexNames和deletables这两个变量分别对应两个文件,一个表示有效的索引目录,还有一个表示需要删除的索引目录。在整个update 的过程中,PersistentIndex可能会新建,成为一个有效的目录,再后面的合并过程中又可能会被删除,所以用这两个变量来记录有效索引目录和需要被删除的索引目录。

  通过这篇文章,我们可以得到了两个疑问:

  1.    indexqueue到底是怎么使用的?

  2.    indexmerger的逻辑是怎么样的?

  同样,ahuaxuan将会在后面的文章中阐述这两个话题。

阅读(628) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~