分类:
2010-09-11 15:34:28
用lucene作过索引的同学一定对文本提取不会陌生,但凡有binary需要作索引的地方,那就离不开文本的提取,而jackrabbit是可以存储二进制数据的,而这些二进制数据中包含的文本也是需要被搜索到的,所以jackrabbit里建索引的过程中自然离不开文本提取。
那么看到本文的标题,有经验的同学一定会问,前面都已经讲过索引的提交了,为什么到现在才讲文本提取呢。要回答这个问题,我们必须来回顾一下jackrabbit中建索引的流程(简单的划分为4个步骤,如下):
1创建document-----2加入到volatileindex的pending缓存中---------3生成index数据到内存-------4将内存中的二进制数据刷到文件系统。
那么哪里需要提取文本呢,很显然是在创建document的时候。那这个话题是不是应该放到创建document一文中阐述比较好呢,非也,这个问题之所以放到索引提交后面阐述是有原因的。
一般来说,文本提取的耗时长度和文本的大小有直接关系,当文本的大小超过一定的量之后,提取的速度会比较慢。那么当前的WorkThread就会一直耗在这块地方,也就是上述流程中的第一步,这样WorkThread一定要等到文本提取之后才能返回,不知道有多少用户有这个耐性等待下去呢。而且这里还有另外一个问题,那就是WorkThread如果因为这种操作而阻塞住,那么它回线程池的时间就会延后,新的请求过来因为拿到不到线程池中的 WorkThread而不得不等待,一直到池中有可用线程为止,或者选择timeout返回,那么如果因为WorkThread都耗在文本提取这个地方,那么就太冤了,因为一般来说大文本做索引的实时性要求并不是很高,所以因为同步文本提取这个原因导致系统的并发量急剧下降,用户就会开始抱怨,你们这个破玩意怎么会这么慢?
为了解决这个问题,在jackrabbit中引入了异步处理文本提取任务的功能(它必须是异步的吗?当然不,jackrabbit也提供了同步的选择,但是本文主要阐述异步提取的功能,所以在下面的文章中我们假设我们配置异步提取的功能),说到这里,我们不得不再说说他是怎么个异步法的呢?它是异步提取,然后由处理提取的线程负责把这个document重新加入到index中呢,还是提取的线程只负责提取,提取完成之后再把提取完成的document交给专门的线程去add到document中呢。 后者,当然是后者,提取的线程只负责提取,有专门的线程来负责更新document到index中。如果说观察者模式是设计模式中的皇后,那么 ahuaxuan可以说,生产消费模型是编程模型中的皇后,它无处不在(您要是天天写crud的话,ahuaxuan很难保证它无处不在)。 在前面的文章中,我们一直在提MultiIndex#indexingqueue,而且一直没有详细解释indexingqueue的功能。其实它就是在异步文本提取中充当着消费队列的职责,提取线程(生产者)再文本提取完成之后,会把document加入到indexingqueue中。那么消费者线程就负责定时的从indexingqueue中取出一件准备好的document,并执行multiindex#update方法。 我们先来看看提取文本是怎么做的。再次回到NodeIndexer类的addBinary方法,我相信大家对这个方法还有印象,他负责创建一个名字叫jcr:data的field。他的值就是来自于二进制文件,比如pdf,doc等等: Java代码 现在我们要做的就是跟到PooledTextExtractor# extractText方法中去,其他能做的也不多: Java代码 从上面的方法中,我们可以看到一个简单的逻辑,创建一个thread,然后放到池中执行,并返回结果。 先看看创建线程的流程: Java代码 在上面的这个方法中,我们看到在TextExtractorJob这个线程类内部还有一个线程类,就是cmd,而且这个线程类有一个奇怪的地方,cmd这个线程的run方法中其实是执行一个Callable接口的实现类(这个逻辑写在了setter(Callable cc)中)。cmd的逻辑其实就是执行真正的文本提取操作。而TextExtractorJob的run方法其实就是执行cmd的run方法。也就是说 cmd作为一个线程类实现压根没有被真正的作为一个线程启动过,所以cmd定义为runnable只是给人多一份疑惑,其他作用没有看出来。 创建完TextExtractorJob之后,就要用它来创建TextExtractorReader。我们看看TextExtractorReader的构造方法: Java代码 很简单,什么都没有,也就是说在这个操作的过程中,并没有触发文本提取的操作,那么这个操作是什么时候做的呢?这就需要我们回到addBinary方法,看看它是怎么用这个TextExtractorReader类的: doc.add(createFulltextField(reader)); 原来是又调用了createFulltextField方法,那么我们得进去看一下,说不定触发文本提取的操作就在这里哦: Java代码 额的神啊,啥都没有,就创建了一个LazyTextExtractorField类(还记得supportHighlighting参数吗,不记得的话再回头看看前面的文章),也就是说我们不得不再去看看LazyTextExtractorField这个类,但是我们有种感觉,快要看到我们想看到的东西了,这个继承实现了lucene中的AbstractField,结合上面的doc.add方法,我们可以想象得到,真正触发提取线程工作的应该是 lucene,lucene一定会调用LazyTextExtractorField类中的某个方法,以得到值,而这个时候,如果有文本提取的任务,那么应该会触发它,然后改方法先返回空字符串,把残缺版的document先加入索引中,等待提取完成后,再由前面提到的消费者把残缺版的document数据从索引中删除,再把完整版的document加入到索引中。 上面说到的索引时触发文本提取操作只是一种触发方式,通过阅读源代码,我们还可以发现第二种触发方式,那就是在AbstractIndex的 addDocument方法中,会判断,如果提取超过100ms,就把document拷贝一份,把copy的document加入到index中,然后把原始的document加入到indexingQueue队列中。这个时候,提取线程继续为这个原始的document执行提取操作。 那么我们先来看第一种触发,lucene触发,下面这个方法是LazyTextExtractorField中的方法,这个方法被lucene调用,用以分词, ahuaxuan已经在关键的代码中加入了注释: Java代码 方法很简单,就是调用reader的读取,只是它调用的是Reader类的read()方法: Java代码 需要注意的是getSwappedOutReader这个方法,该方法用临时文件的方式来节约内存。但不是说什么地方都可以使用这种方法的,因为提取之后的问题可能在后面才能被用到,所以暂时存放在临时文件中是可行的,这种方式在下载工具中非常常见,一般下载工具下载文件时会创建临时文件也是同样的道理。 接着,我们再来看看第二中方式触发(事实上,最开始触发这个方法的地方才是真正的第二种触发,它在什么地方呢,请参考AbstractIndex#getFinishedDocument(xx),再文本提取的第二篇文章中也有比较详细的说明): Java代码 由此可见,这里的逻辑是当地一次提取的时候,等待100毫秒之后返回,如果100毫秒之内搞定问题,那事情很顺利,如果没有搞定呢,那就等下次,下次再进入这个方法,如果还没有搞定,那么reader就是new StringReader("")了,这样二进制就没有能被成功提取出来。 这个方法是谁来调用的呢,还是那个indexingqueue的消费者。在下文中,让我们来分析一下它。 好了说到这里,大家应该对jackrabbit中异步文本提取的功能算是比较了解了,之前ahuaxuan说过,jackrabbit也是支持同步文本提取的,这个就比较简单了,相信大家都可以想象得出来,在一个hashmap中存在了一组extractor,比 mswordextractor,pdfextractor,那么当需要提取文本时,只要把inputstream和filetype交给处理器,那么,处理器就会为该filetype选择合适的extractor并执行extractText方法,这些逻辑都在 CompositeTextExtractor.java类中,根据filetype委派给对应的extractor,其代码也只有90行,大家可以自行查看。 总结,在本文中,ahuaxuan主要描述了jackrabbit文本提取的两种方式,同步和异步,而且在异步的方式中,触发提取操作的有两个点,一个是消费者检查哪些document已经提取完成的时候,还有一个是lucene调用field的stringValue时候,即从document生成二进制index数据的时候。这样通过阅读源代码,我们详细的知道了jackrabbit中提取文件的逻辑。不过这个过程并不是无可挑剔的,比如存在二进制文件过大,那么提取文本过多,占用过多内存,而且占用很长的cpu时间,如何修改才能使之满足我们的需求呢。 从本文中我们得到如下信息,文本提取的主要逻辑是什么,文本提取的触发点是什么。当然我们也得到了一个疑问,在文件提取的异步模型中,谁是生产者,谁是消费者,具体的流程是怎么样的。 在下文中,ahuaxuan将把本文提出的细节和整个流程串起来,形成一幅完整的流程。 我们已经明确的知道,有一个队列,它的名字叫indexingqueue,它中存放的是待提取的document,下面我们就来看看它的生产者是谁: 1. 生产者 我们在前面的文章中提到过,VolatileIndex(内存数据)的pending队列中document数量超过10(默认值)的时候,会触发一个操作,一个多线程并发生成索引的数据,而且这个数据是存在于RamDirectory中,显然,当一个binary需要做文本提取的时候,应该也是在这个时候。我们来回顾一下,那个方法: Java代码 这个方法之前已经说过,就是多线程生成document的索引数据,不过这次我们的重点并不是在多线程生成document,而是在getFinishedDocument()方法上,首先让我们来看看它的注释: Returns a document that is finished with text extraction and is ready to be added to the index 也就是说只有提取完成的document才会被返回,那么如果是一个新的document,还没有执行提取操作呢,只能深入其中才能窥探它的奥秘了。 Java代码 从上面的逻辑,我们可以看出,一旦一个二进制文本的提取超过100毫秒(默认值,可以修改)之后,那么这个document就被加入了消费队列,意味着,有消费者回来收拾它。 2. 消费者 去哪里找消费者呢,只要看indexingQueue被用在了什么地方就可以了,经过几个ctrl+shift+G,我们终于发现,在MultiIndex的构造方法里,有以下逻辑。 Java代码 从上面的方法可以看出,主体逻辑在checkIndexingQueue中,那么接着,让我们到checkIndexingQueue的方法中走走。 Java代码 由此可见,一个document很有可能因为提取操作过长而二进宫,第二次进宫的时候对于一个document来说会有两个操作,一个delete,一个 add,delete的原因是因为之前已经放进去一个copy对象,这个对象的fulltext的field是””,所以必须先删除掉,然后再把提取完成的document放进索引里去。 由此可见,在整体逻辑上还是比较清晰的,关键还是上文分析的TextExtractorReader类中存在一部分比较绕的逻辑,但是和本文结合起来看就非常容易理解了。 通过两篇文章的分析,我们终于对jackrabbit中文本提取这块内容有比较深入的理解了,当然很有可能它还藏着玄机,等待着我们去发现,等待着我们去挖掘。protected void addBinaryValue(Document doc,
String fieldName,
Object internalValue) {
………………………………………………..
Reader reader;
if (extractor instanceof PooledTextExtractor) {
//从类名上我们也可以看出点端倪,这里貌似有个线程池。
reader = ((PooledTextExtractor)extractor).extractText(stream, type, encoding, node.getNodeId().getUUID().toString());
} else {
//忽视这个else,假设只进入if, reader = extractor.extractText(stream, type, encoding);
}
//这个createFulltextField方法非常之相当重要。后面会讲到
doc.add(createFulltextField(reader));
……………………………………………………………..
} public Reader extractText(InputStream stream,
String type,
String encoding,
String jobId) throws IOException {
if (null == jobId || "".equals(jobId)) {
//结合上面的方法,我们得知,jobid的值是一个node的id
return extractText(stream, type, encoding);
} else {
/*在这里,创建了一个TextExtractorJob,它是Runnable的一个实现类*/
TextExtractorJob job = new TextExtractorJob(extractor, stream, type, encoding, jobId);
return new TextExtractorReader(job, executor, timout);
}
} public TextExtractorJob(final TextExtractor extractor,
final InputStream stream,
final String type,
final String encoding,
final String jobId) {
this.type = type;
this.jobId = jobId;
this.cmd = setter(new Callable() {
public Object call() throws Exception {
Reader r = extractor.extractText(stream, type, encoding);
if (r != null) {
if (discarded) {
r.close();
r = null;
} else if (timedOut) {
// spool a temp file to save memory
/*这里的逻辑非常重要,这里的逻辑表明,一旦方法执行到这里,表示上面的extractor.extractText 已经超过了我们预设的时间,所以为了避免占用过多的内存,需要把内存中的数据拷贝到磁盘上,以减少内存的消耗。*/
r = getSwappedOutReader(r);
}
}
return r;
}
});
}TextExtractorReader(TextExtractorJob job, Executor executor, long timeout) {
this.job = job;
this.executor = executor;
this.timeout = timeout;
}protected Fieldable createFulltextField(Reader value) {
if (supportHighlighting) {
return new LazyTextExtractorField(FieldNames.FULLTEXT, value, true, true);
} else {
return new LazyTextExtractorField(FieldNames.FULLTEXT, value, false, false);
}
}public String stringValue() {
if (extract == null) {
StringBuffer textExtract = new StringBuffer();
char[] buffer = new char[1024];
int len;
try {
/*这里的读取操作有一定的迷惑性,其实它是调用了,考虑到这里的reader是TextExtractorReader,所以耗无疑问,这里的方法一定是调用了TextExtractorReader#read */
while ((len = reader.read(buffer)) > -1) {
textExtract.append(buffer, 0, len);
}
} catch (IOException e) {
log.warn("Exception reading value for field: "
+ e.getMessage());
log.debug("Dump:", e);
} finally {
try {
reader.close();
} catch (IOException e) {
// ignore
}
}
extract = textExtract.toString();
}
return extract;
}public int read(char cbuf[]) throws IOException {
return read(cbuf, 0, cbuf.length);
}
而其实目标的方法是子类的read(cbuf, 0, cbuf.length);也就是说应该是TextExtractorReader# read(cbuf, 0, cbuf.length),那么我们来看一下这个方法TextExtractorReader# read:这个read方法才是真正为LazyTextExtractorField#stringValue返回数据的地方
public int read(char cbuf[], int off, int len) throws IOException {
/*事实上,这里的extractedText应该是不会等于空的,因为如果是异步提取,那么在lucene调用read方法之前,extract线程早就把extractedText提取出来,或者将其置为new StringReader(“”)了,除非一个document在isExtractorFinished返回false的时候,也被加入了 indexwriter。在正常流程的debug过程中,ahuaxuan并没有发现程序执行到这个if块里面*/
if (extractedText == null) {
// no reader present
// check if job is started already
if (jobStarted) {
// wait until available
/*如果任务已经开始,那么等待读取文本,那么如果任务开始,这个开始是谁开始的呢?其实就是后面要讲到的第二种触发方式,这里是无限时等待,太恐怖了,所以这个代码段是存在危险的,实际上,如果我们如果在每个单独的Extractor类中设置专门的超时时间,那么这里就没有问题了。*/
extractedText = job.getReader(Long.MAX_VALUE);
} else {
// execute with current thread
/*如果提取任务还没有被触发,则把该线程实例放到DIRECT_EXECUTOR这个线程池中运行,注意,其实这个类很诡异,它并不是一个线程池,它拿到线程对象之后会调用run方法,而不是start,这也意味着这个操作是非异步的,这里不会有多线程的问题 */
try {
DIRECT_EXECUTOR.execute(job);
} catch (InterruptedException e) {
// current thread is in interrupted state
// -> ignore (job will not return a reader, which is fine)
}
/*同步执行,立即取结果,取不到就返回null*/
extractedText = job.getReader(0);
}
if (extractedText == null) {
// exception occurred
extractedText = new StringReader("");
}
}
return extractedText.read(cbuf, off, len);
}public boolean isExtractorFinished() {
if (!jobStarted) {
try {
/*如果任务还没有开始,那么则把job放到线程池中执行,并且将jobStarted设置为true, */
executor.execute(job);
jobStarted = true;
} catch (InterruptedException e) {
// this thread is in interrupted state
return false;
}
/*然后开始提取的操作,同时,提取的时候有一个超时时间,代表超过多长时间就直接返回,默认时间是100毫秒,在repository.xml中的 SearchIndex节点中可以配置,一旦超时之后,reader就会被写入到临时文件中,下次读取就从临时文件中读取*/
extractedText = job.getReader(timeout);
} else {
// job is already running, check for immediate result
/*如果任务已经在上次检查中被触发过了,那么就直接获取提取之后的结果,如果还没有提取完,0表示没有提取完则抛出TimeoutException,但是getReader会捕获这个异常,并返回一个null*/
extractedText = job.getReader(0);
}
/*如果extractedText并且提取的时候出现异常,超时等,那么就置其为空*/
if (extractedText == null && job.getException() != null) {
// exception occurred
extractedText = new StringReader("");
}
return extractedText != null;
}void addDocuments(Document[] docs) throws IOException {
final IndexWriter writer = getIndexWriter();
DynamicPooledExecutor.Command commands[] =
new DynamicPooledExecutor.Command[docs.length];
for (int i = 0; i < docs.length; i++) {
// check if text extractor completed its work
/*尤其是要注意这个方法,这个方法预示着什么,到底是什么呢?这个方法预示着一个document在进入这个方法之前已经触发了文本提取的操作,奇怪哦,其实不奇怪,需要文本提取的document是会二进宫的,这个由消费者逻辑来控制的,不过还是让我们先来看看生产者的逻辑吧。*/
final Document doc = getFinishedDocument(docs[i]);
// create a command for inverting the document
commands[i] = new DynamicPooledExecutor.Command() {
public Object call() throws Exception {
long time = System.currentTimeMillis();
writer.addDocument(doc);
return new Long(System.currentTimeMillis() - time);
}
};
}
}private Document getFinishedDocument(Document doc) throws IOException {
/* Util.isDocumentReady(doc)方法非常之十分重要,如果一眼带过(新成语)我们就会错过精彩的细节,正是在这个方法中,我们的提取工作开始了,还记得上一篇文章中的TextExtractorReader#isExtractorFinished方法吗,这个方法会判断,如果开始就等100毫秒,等待返回,否则就返回false,那么返回的flase就是用在了下面的if方法中。代表还没有提取完成。如果没有提取完成,就进入了if 的代码块*/
if (!Util.isDocumentReady(doc)) {
/*从这里可以看出,超过100毫秒,那么就创建另外一个document对象,然后把这个原始的document的值拷贝给这个新对象,需要注意的是如果field是LazyTextExtractorField 的话,那么就先把这个field置空*/
Document copy = new Document();
for (Iterator fields = doc.getFields().iterator(); fields.hasNext(); ) {
Fieldable f = (Fieldable) fields.next();
Fieldable field = null;
Field.TermVector tv = getTermVectorParameter(f);
Field.Store stored = getStoreParameter(f);
Field.Index indexed = getIndexParameter(f);
if (f instanceof LazyTextExtractorField || f.readerValue() != null) {
// replace all readers with empty string reader
field = new Field(f.name(), new StringReader(""), tv);
} else if (f.stringValue() != null) {
field = new Field(f.name(), f.stringValue(),
stored, indexed, tv);
} else if (f.isBinary()) {
field = new Field(f.name(), f.binaryValue(), stored);
}
if (field != null) {
field.setOmitNorms(f.getOmitNorms());
copy.add(field);
}
}
// schedule the original document for later indexing
/*在这里,生产者终于把原始的document对象加入了indexingQueue队列。*/
Document existing = indexingQueue.addDocument(doc);
if (existing != null) {
/*如果之前这个nodeId在做索引的时候由于异常原因,jvm退出,那么在redolog和indexingqueuelog中都存在这个nodeid,那么在这个地方,可能就返回一个indexingqueue中已经存在的document了 */
// the queue already contained a pending document for this
// node. -> dispose the document
Util.disposeDocument(existing);
}
// use the stripped down copy for now
doc = copy;
}
return doc;
} Public MultiIndex() {
flushTask = new Timer();
flushTask.schedule(new TimerTask() {
public void run() {
// check if there are any indexing jobs finished
/*英语注释写得还是比较清楚的,就是用来检查是否有提取的任务完成了,很显然这个timer背后的线程就是一个消费者,专门用来处理indexingQueue中的数据。接着,让我们到checkIndexingQueue的方法中走走*/
checkIndexingQueue();
// check if volatile index should be flushed
checkFlush();
}
}, 0, 1000);
}private synchronized void checkIndexingQueue() {
/*找到所有提取完成的document的列表,那么如果提出还没有完成,咋办呢,不等待,直接返回new StringReader(""),这个逻辑在TextExtractorReader#isExtractorFinished*/
Document[] docs = indexingQueue.getFinishedDocuments();
Map finished = new HashMap();
for (int i = 0; i < docs.length; i++) {
String uuid = docs[i].get(FieldNames.UUID);
finished.put(UUID.fromString(uuid), docs[i]);
}
// now update index with the remaining ones if there are any
if (!finished.isEmpty()) {
log.debug("updating index with {} nodes from indexing queue.",
new Long(finished.size()));
// remove documents from the queue
for (Iterator it = finished.keySet().iterator(); it.hasNext(); ) {
try {
indexingQueue.removeDocument(it.next().toString());
} catch (IOException e) {
log.error("Failed to remove node from indexing queue", e);
}
}
/*这里又是调用update方法,在前面的文章中,我们已经详细的分析过了update方法会执行哪些重要的操作,他们分别是deleteNode,addNode,flush*/
try {
update(finished.keySet().iterator(),
finished.values().iterator());
} catch (IOException e) {
// update failed
log.warn("Failed to update index with deferred text extraction", e);
}
}
}