Chinaunix首页 | 论坛 | 博客
  • 博客访问: 133582
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-11 15:34:28

用lucene作过索引的同学一定对文本提取不会陌生,但凡有binary需要作索引的地方,那就离不开文本的提取,而jackrabbit是可以存储二进制数据的,而这些二进制数据中包含的文本也是需要被搜索到的,所以jackrabbit里建索引的过程中自然离不开文本提取。

  那么看到本文的标题,有经验的同学一定会问,前面都已经讲过索引的提交了,为什么到现在才讲文本提取呢。要回答这个问题,我们必须来回顾一下jackrabbit中建索引的流程(简单的划分为4个步骤,如下):

  1创建document-----2加入到volatileindex的pending缓存中---------3生成index数据到内存-------4将内存中的二进制数据刷到文件系统。

  那么哪里需要提取文本呢,很显然是在创建document的时候。那这个话题是不是应该放到创建document一文中阐述比较好呢,非也,这个问题之所以放到索引提交后面阐述是有原因的。

  一般来说,文本提取的耗时长度和文本的大小有直接关系,当文本的大小超过一定的量之后,提取的速度会比较慢。那么当前的WorkThread就会一直耗在这块地方,也就是上述流程中的第一步,这样WorkThread一定要等到文本提取之后才能返回,不知道有多少用户有这个耐性等待下去呢。而且这里还有另外一个问题,那就是WorkThread如果因为这种操作而阻塞住,那么它回线程池的时间就会延后,新的请求过来因为拿到不到线程池中的 WorkThread而不得不等待,一直到池中有可用线程为止,或者选择timeout返回,那么如果因为WorkThread都耗在文本提取这个地方,那么就太冤了,因为一般来说大文本做索引的实时性要求并不是很高,所以因为同步文本提取这个原因导致系统的并发量急剧下降,用户就会开始抱怨,你们这个破玩意怎么会这么慢?

 为了解决这个问题,在jackrabbit中引入了异步处理文本提取任务的功能(它必须是异步的吗?当然不,jackrabbit也提供了同步的选择,但是本文主要阐述异步提取的功能,所以在下面的文章中我们假设我们配置异步提取的功能),说到这里,我们不得不再说说他是怎么个异步法的呢?它是异步提取,然后由处理提取的线程负责把这个document重新加入到index中呢,还是提取的线程只负责提取,提取完成之后再把提取完成的document交给专门的线程去add到document中呢。

  后者,当然是后者,提取的线程只负责提取,有专门的线程来负责更新document到index中。如果说观察者模式是设计模式中的皇后,那么 ahuaxuan可以说,生产消费模型是编程模型中的皇后,它无处不在(您要是天天写crud的话,ahuaxuan很难保证它无处不在)。

  在前面的文章中,我们一直在提MultiIndex#indexingqueue,而且一直没有详细解释indexingqueue的功能。其实它就是在异步文本提取中充当着消费队列的职责,提取线程(生产者)再文本提取完成之后,会把document加入到indexingqueue中。那么消费者线程就负责定时的从indexingqueue中取出一件准备好的document,并执行multiindex#update方法。

  我们先来看看提取文本是怎么做的。再次回到NodeIndexer类的addBinary方法,我相信大家对这个方法还有印象,他负责创建一个名字叫jcr:data的field。他的值就是来自于二进制文件,比如pdf,doc等等:

  Java代码   

protected void addBinaryValue(Document doc, 
                 String fieldName, 
                 Object internalValue) { 
    ……………………………………………….. 
    
        Reader reader; 
        if (extractor instanceof PooledTextExtractor) { 
//从类名上我们也可以看出点端倪,这里貌似有个线程池。 
          reader = ((PooledTextExtractor)extractor).extractText(stream, type, encoding, node.getNodeId().getUUID().toString()); 
        } else { 
//忽视这个else,假设只进入if,          reader = extractor.extractText(stream, type, encoding); 
        } 
//这个createFulltextField方法非常之相当重要。后面会讲到 
        doc.add(createFulltextField(reader)); 
      …………………………………………………………….. 
  } 

现在我们要做的就是跟到PooledTextExtractor# extractText方法中去,其他能做的也不多:

  Java代码   

public Reader extractText(InputStream stream, 
               String type, 
               String encoding, 
               String jobId) throws IOException { 
    if (null == jobId || "".equals(jobId)) { 
//结合上面的方法,我们得知,jobid的值是一个node的id 
      return extractText(stream, type, encoding); 
    } else { 
/*在这里,创建了一个TextExtractorJob,它是Runnable的一个实现类*/ 
      TextExtractorJob job = new TextExtractorJob(extractor, stream, type, encoding, jobId); 
      return new TextExtractorReader(job, executor, timout); 
    } 
  } 

  从上面的方法中,我们可以看到一个简单的逻辑,创建一个thread,然后放到池中执行,并返回结果。

  先看看创建线程的流程:

  Java代码   

public TextExtractorJob(final TextExtractor extractor, 
              final InputStream stream, 
              final String type, 
              final String encoding, 
              final String jobId) { 
    this.type = type; 
    this.jobId = jobId; 
    this.cmd = setter(new Callable() { 
      public Object call() throws Exception { 
        Reader r = extractor.extractText(stream, type, encoding); 
        if (r != null) { 
          if (discarded) { 
            r.close(); 
            r = null; 
          } else if (timedOut) { 
            // spool a temp file to save memory 
/*这里的逻辑非常重要,这里的逻辑表明,一旦方法执行到这里,表示上面的extractor.extractText 已经超过了我们预设的时间,所以为了避免占用过多的内存,需要把内存中的数据拷贝到磁盘上,以减少内存的消耗。*/ 
            r = getSwappedOutReader(r); 
          } 
        } 
        return r; 
      } 
    }); 
  }

 在上面的这个方法中,我们看到在TextExtractorJob这个线程类内部还有一个线程类,就是cmd,而且这个线程类有一个奇怪的地方,cmd这个线程的run方法中其实是执行一个Callable接口的实现类(这个逻辑写在了setter(Callable cc)中)。cmd的逻辑其实就是执行真正的文本提取操作。而TextExtractorJob的run方法其实就是执行cmd的run方法。也就是说 cmd作为一个线程类实现压根没有被真正的作为一个线程启动过,所以cmd定义为runnable只是给人多一份疑惑,其他作用没有看出来。

  创建完TextExtractorJob之后,就要用它来创建TextExtractorReader。我们看看TextExtractorReader的构造方法:

  Java代码   

TextExtractorReader(TextExtractorJob job, Executor executor, long timeout) { 
    this.job = job; 
    this.executor = executor; 
    this.timeout = timeout; 
}

  很简单,什么都没有,也就是说在这个操作的过程中,并没有触发文本提取的操作,那么这个操作是什么时候做的呢?这就需要我们回到addBinary方法,看看它是怎么用这个TextExtractorReader类的:

  doc.add(createFulltextField(reader));

  原来是又调用了createFulltextField方法,那么我们得进去看一下,说不定触发文本提取的操作就在这里哦:

  Java代码   

protected Fieldable createFulltextField(Reader value) { 
    if (supportHighlighting) { 
      return new LazyTextExtractorField(FieldNames.FULLTEXT, value, true, true); 
    } else { 
      return new LazyTextExtractorField(FieldNames.FULLTEXT, value, false, false); 
    } 
  }

额的神啊,啥都没有,就创建了一个LazyTextExtractorField类(还记得supportHighlighting参数吗,不记得的话再回头看看前面的文章),也就是说我们不得不再去看看LazyTextExtractorField这个类,但是我们有种感觉,快要看到我们想看到的东西了,这个继承实现了lucene中的AbstractField,结合上面的doc.add方法,我们可以想象得到,真正触发提取线程工作的应该是 lucene,lucene一定会调用LazyTextExtractorField类中的某个方法,以得到值,而这个时候,如果有文本提取的任务,那么应该会触发它,然后改方法先返回空字符串,把残缺版的document先加入索引中,等待提取完成后,再由前面提到的消费者把残缺版的document数据从索引中删除,再把完整版的document加入到索引中。

  上面说到的索引时触发文本提取操作只是一种触发方式,通过阅读源代码,我们还可以发现第二种触发方式,那就是在AbstractIndex的 addDocument方法中,会判断,如果提取超过100ms,就把document拷贝一份,把copy的document加入到index中,然后把原始的document加入到indexingQueue队列中。这个时候,提取线程继续为这个原始的document执行提取操作。

  那么我们先来看第一种触发,lucene触发,下面这个方法是LazyTextExtractorField中的方法,这个方法被lucene调用,用以分词, ahuaxuan已经在关键的代码中加入了注释:

  Java代码   

public String stringValue() { 
    if (extract == null) { 
      StringBuffer textExtract = new StringBuffer(); 
      char[] buffer = new char[1024]; 
      int len; 
      try { 
  /*这里的读取操作有一定的迷惑性,其实它是调用了,考虑到这里的reader是TextExtractorReader,所以耗无疑问,这里的方法一定是调用了TextExtractorReader#read */        
while ((len = reader.read(buffer)) > -1) { 
          textExtract.append(buffer, 0, len); 
        } 
      } catch (IOException e) { 
        log.warn("Exception reading value for field: " 
            + e.getMessage()); 
        log.debug("Dump:", e); 
      } finally { 
        try { 
          reader.close(); 
        } catch (IOException e) { 
          // ignore 
        } 
      } 
      extract = textExtract.toString(); 
    } 
    return extract; 
  }

方法很简单,就是调用reader的读取,只是它调用的是Reader类的read()方法:

  Java代码   

public int read(char cbuf[]) throws IOException { 
  return read(cbuf, 0, cbuf.length); 
  } 
而其实目标的方法是子类的read(cbuf, 0, cbuf.length);也就是说应该是TextExtractorReader# read(cbuf, 0, cbuf.length),那么我们来看一下这个方法TextExtractorReader# read:这个read方法才是真正为LazyTextExtractorField#stringValue返回数据的地方 
  public int read(char cbuf[], int off, int len) throws IOException { 
/*事实上,这里的extractedText应该是不会等于空的,因为如果是异步提取,那么在lucene调用read方法之前,extract线程早就把extractedText提取出来,或者将其置为new StringReader(“”)了,除非一个document在isExtractorFinished返回false的时候,也被加入了 indexwriter。在正常流程的debug过程中,ahuaxuan并没有发现程序执行到这个if块里面*/ 
 
    if (extractedText == null) { 
      // no reader present 
      // check if job is started already 
      if (jobStarted) { 
        // wait until available 
/*如果任务已经开始,那么等待读取文本,那么如果任务开始,这个开始是谁开始的呢?其实就是后面要讲到的第二种触发方式,这里是无限时等待,太恐怖了,所以这个代码段是存在危险的,实际上,如果我们如果在每个单独的Extractor类中设置专门的超时时间,那么这里就没有问题了。*/ 
        extractedText = job.getReader(Long.MAX_VALUE); 
      } else { 
        // execute with current thread 
/*如果提取任务还没有被触发,则把该线程实例放到DIRECT_EXECUTOR这个线程池中运行,注意,其实这个类很诡异,它并不是一个线程池,它拿到线程对象之后会调用run方法,而不是start,这也意味着这个操作是非异步的,这里不会有多线程的问题 */ 
        try { 
          DIRECT_EXECUTOR.execute(job); 
        } catch (InterruptedException e) { 
          // current thread is in interrupted state 
          // -> ignore (job will not return a reader, which is fine) 
        } 
/*同步执行,立即取结果,取不到就返回null*/ 
        extractedText = job.getReader(0); 
      } 
 
      if (extractedText == null) { 
        // exception occurred 
        extractedText = new StringReader(""); 
      } 
    } 
    return extractedText.read(cbuf, off, len); 
  }

需要注意的是getSwappedOutReader这个方法,该方法用临时文件的方式来节约内存。但不是说什么地方都可以使用这种方法的,因为提取之后的问题可能在后面才能被用到,所以暂时存放在临时文件中是可行的,这种方式在下载工具中非常常见,一般下载工具下载文件时会创建临时文件也是同样的道理。

  接着,我们再来看看第二中方式触发(事实上,最开始触发这个方法的地方才是真正的第二种触发,它在什么地方呢,请参考AbstractIndex#getFinishedDocument(xx),再文本提取的第二篇文章中也有比较详细的说明):

  Java代码   

public boolean isExtractorFinished() { 
    if (!jobStarted) { 
      try { 
/*如果任务还没有开始,那么则把job放到线程池中执行,并且将jobStarted设置为true, */ 
        executor.execute(job); 
        jobStarted = true; 
      } catch (InterruptedException e) { 
        // this thread is in interrupted state 
        return false; 
      } 
/*然后开始提取的操作,同时,提取的时候有一个超时时间,代表超过多长时间就直接返回,默认时间是100毫秒,在repository.xml中的 SearchIndex节点中可以配置,一旦超时之后,reader就会被写入到临时文件中,下次读取就从临时文件中读取*/ 
      extractedText = job.getReader(timeout); 
    } else { 
      // job is already running, check for immediate result 
/*如果任务已经在上次检查中被触发过了,那么就直接获取提取之后的结果,如果还没有提取完,0表示没有提取完则抛出TimeoutException,但是getReader会捕获这个异常,并返回一个null*/ 
      extractedText = job.getReader(0); 
    } 
/*如果extractedText并且提取的时候出现异常,超时等,那么就置其为空*/ 
    if (extractedText == null && job.getException() != null) { 
      // exception occurred 
      extractedText = new StringReader(""); 
    } 
 
    return extractedText != null; 
  }

 由此可见,这里的逻辑是当地一次提取的时候,等待100毫秒之后返回,如果100毫秒之内搞定问题,那事情很顺利,如果没有搞定呢,那就等下次,下次再进入这个方法,如果还没有搞定,那么reader就是new StringReader("")了,这样二进制就没有能被成功提取出来。

  这个方法是谁来调用的呢,还是那个indexingqueue的消费者。在下文中,让我们来分析一下它。

  好了说到这里,大家应该对jackrabbit中异步文本提取的功能算是比较了解了,之前ahuaxuan说过,jackrabbit也是支持同步文本提取的,这个就比较简单了,相信大家都可以想象得出来,在一个hashmap中存在了一组extractor,比 mswordextractor,pdfextractor,那么当需要提取文本时,只要把inputstream和filetype交给处理器,那么,处理器就会为该filetype选择合适的extractor并执行extractText方法,这些逻辑都在 CompositeTextExtractor.java类中,根据filetype委派给对应的extractor,其代码也只有90行,大家可以自行查看。

  总结,在本文中,ahuaxuan主要描述了jackrabbit文本提取的两种方式,同步和异步,而且在异步的方式中,触发提取操作的有两个点,一个是消费者检查哪些document已经提取完成的时候,还有一个是lucene调用field的stringValue时候,即从document生成二进制index数据的时候。这样通过阅读源代码,我们详细的知道了jackrabbit中提取文件的逻辑。不过这个过程并不是无可挑剔的,比如存在二进制文件过大,那么提取文本过多,占用过多内存,而且占用很长的cpu时间,如何修改才能使之满足我们的需求呢。

  从本文中我们得到如下信息,文本提取的主要逻辑是什么,文本提取的触发点是什么。当然我们也得到了一个疑问,在文件提取的异步模型中,谁是生产者,谁是消费者,具体的流程是怎么样的。

  在下文中,ahuaxuan将把本文提出的细节和整个流程串起来,形成一幅完整的流程。

我们已经明确的知道,有一个队列,它的名字叫indexingqueue,它中存放的是待提取的document,下面我们就来看看它的生产者是谁:

  1. 生产者

  我们在前面的文章中提到过,VolatileIndex(内存数据)的pending队列中document数量超过10(默认值)的时候,会触发一个操作,一个多线程并发生成索引的数据,而且这个数据是存在于RamDirectory中,显然,当一个binary需要做文本提取的时候,应该也是在这个时候。我们来回顾一下,那个方法:

  Java代码   

void addDocuments(Document[] docs) throws IOException { 
 
    final IndexWriter writer = getIndexWriter(); 
 
    DynamicPooledExecutor.Command commands[] = 
 
        new DynamicPooledExecutor.Command[docs.length]; 
 
    for (int i = 0; i < docs.length; i++) { 
 
      // check if text extractor completed its work 
 
/*尤其是要注意这个方法,这个方法预示着什么,到底是什么呢?这个方法预示着一个document在进入这个方法之前已经触发了文本提取的操作,奇怪哦,其实不奇怪,需要文本提取的document是会二进宫的,这个由消费者逻辑来控制的,不过还是让我们先来看看生产者的逻辑吧。*/ 
 
      final Document doc = getFinishedDocument(docs[i]); 
 
      // create a command for inverting the document 
 
      commands[i] = new DynamicPooledExecutor.Command() { 
 
        public Object call() throws Exception { 
 
          long time = System.currentTimeMillis(); 
 
          writer.addDocument(doc); 
 
          return new Long(System.currentTimeMillis() - time); 
 
        } 
 
      }; 
 
    } 
 
}

这个方法之前已经说过,就是多线程生成document的索引数据,不过这次我们的重点并不是在多线程生成document,而是在getFinishedDocument()方法上,首先让我们来看看它的注释:

  Returns a document that is finished with text extraction and is ready to be added to the index

  也就是说只有提取完成的document才会被返回,那么如果是一个新的document,还没有执行提取操作呢,只能深入其中才能窥探它的奥秘了。

  Java代码   

private Document getFinishedDocument(Document doc) throws IOException { 
 
/* Util.isDocumentReady(doc)方法非常之十分重要,如果一眼带过(新成语)我们就会错过精彩的细节,正是在这个方法中,我们的提取工作开始了,还记得上一篇文章中的TextExtractorReader#isExtractorFinished方法吗,这个方法会判断,如果开始就等100毫秒,等待返回,否则就返回false,那么返回的flase就是用在了下面的if方法中。代表还没有提取完成。如果没有提取完成,就进入了if 的代码块*/ 
 
    if (!Util.isDocumentReady(doc)) { 
 
/*从这里可以看出,超过100毫秒,那么就创建另外一个document对象,然后把这个原始的document的值拷贝给这个新对象,需要注意的是如果field是LazyTextExtractorField 的话,那么就先把这个field置空*/ 
 
      Document copy = new Document(); 
 
      for (Iterator fields = doc.getFields().iterator(); fields.hasNext(); ) { 
 
        Fieldable f = (Fieldable) fields.next(); 
 
        Fieldable field = null; 
 
        Field.TermVector tv = getTermVectorParameter(f); 
 
        Field.Store stored = getStoreParameter(f); 
 
        Field.Index indexed = getIndexParameter(f); 
 
        if (f instanceof LazyTextExtractorField || f.readerValue() != null) { 
 
          // replace all readers with empty string reader 
 
          field = new Field(f.name(), new StringReader(""), tv); 
 
        } else if (f.stringValue() != null) { 
 
          field = new Field(f.name(), f.stringValue(), 
 
              stored, indexed, tv); 
 
        } else if (f.isBinary()) { 
 
          field = new Field(f.name(), f.binaryValue(), stored); 
 
        } 
 
        if (field != null) { 
 
          field.setOmitNorms(f.getOmitNorms()); 
 
          copy.add(field); 
 
        } 
 
      } 
 
      // schedule the original document for later indexing 
 
/*在这里,生产者终于把原始的document对象加入了indexingQueue队列。*/ 
 
      Document existing = indexingQueue.addDocument(doc); 
 
      if (existing != null) { 
 
/*如果之前这个nodeId在做索引的时候由于异常原因,jvm退出,那么在redolog和indexingqueuelog中都存在这个nodeid,那么在这个地方,可能就返回一个indexingqueue中已经存在的document了 */ 
 
        // the queue already contained a pending document for this 
 
        // node. -> dispose the document 
 
        Util.disposeDocument(existing); 
 
      } 
 
      // use the stripped down copy for now 
 
      doc = copy; 
 
    } 
 
    return doc; 
 
} 

 从上面的逻辑,我们可以看出,一旦一个二进制文本的提取超过100毫秒(默认值,可以修改)之后,那么这个document就被加入了消费队列,意味着,有消费者回来收拾它。

  2. 消费者

  去哪里找消费者呢,只要看indexingQueue被用在了什么地方就可以了,经过几个ctrl+shift+G,我们终于发现,在MultiIndex的构造方法里,有以下逻辑。

  Java代码  

Public MultiIndex() { 
 
flushTask = new Timer(); 
 
    flushTask.schedule(new TimerTask() { 
 
 
 
    public void run() { 
 
      // check if there are any indexing jobs finished 
 
/*英语注释写得还是比较清楚的,就是用来检查是否有提取的任务完成了,很显然这个timer背后的线程就是一个消费者,专门用来处理indexingQueue中的数据。接着,让我们到checkIndexingQueue的方法中走走*/ 
 
        checkIndexingQueue(); 
 
        // check if volatile index should be flushed 
 
        checkFlush(); 
 
    } 
 
     
 
    }, 0, 1000); 
 
}
 

  从上面的方法可以看出,主体逻辑在checkIndexingQueue中,那么接着,让我们到checkIndexingQueue的方法中走走。

  Java代码  

private synchronized void checkIndexingQueue() { 
 
/*找到所有提取完成的document的列表,那么如果提出还没有完成,咋办呢,不等待,直接返回new StringReader(""),这个逻辑在TextExtractorReader#isExtractorFinished*/ 
 
    Document[] docs = indexingQueue.getFinishedDocuments(); 
 
    Map finished = new HashMap(); 
 
    for (int i = 0; i < docs.length; i++) { 
 
      String uuid = docs[i].get(FieldNames.UUID); 
 
      finished.put(UUID.fromString(uuid), docs[i]); 
 
    } 
 
 
 
    // now update index with the remaining ones if there are any 
 
    if (!finished.isEmpty()) { 
 
      log.debug("updating index with {} nodes from indexing queue.", 
 
          new Long(finished.size())); 
 
 
 
      // remove documents from the queue 
 
      for (Iterator it = finished.keySet().iterator(); it.hasNext(); ) { 
 
        try { 
 
          indexingQueue.removeDocument(it.next().toString()); 
 
        } catch (IOException e) { 
 
          log.error("Failed to remove node from indexing queue", e); 
 
        } 
 
      } 
 
/*这里又是调用update方法,在前面的文章中,我们已经详细的分析过了update方法会执行哪些重要的操作,他们分别是deleteNode,addNode,flush*/ 
 
      try { 
 
        update(finished.keySet().iterator(), 
 
            finished.values().iterator()); 
 
      } catch (IOException e) { 
 
        // update failed 
 
        log.warn("Failed to update index with deferred text extraction", e); 
 
      } 
 
    } 
 
} 
 

  由此可见,一个document很有可能因为提取操作过长而二进宫,第二次进宫的时候对于一个document来说会有两个操作,一个delete,一个 add,delete的原因是因为之前已经放进去一个copy对象,这个对象的fulltext的field是””,所以必须先删除掉,然后再把提取完成的document放进索引里去。

  由此可见,在整体逻辑上还是比较清晰的,关键还是上文分析的TextExtractorReader类中存在一部分比较绕的逻辑,但是和本文结合起来看就非常容易理解了。

  通过两篇文章的分析,我们终于对jackrabbit中文本提取这块内容有比较深入的理解了,当然很有可能它还藏着玄机,等待着我们去发现,等待着我们去挖掘。

阅读(1097) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~