lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael McCandless <luc...@mikemccandless.com>
Subject Re: Index.close() infinite TIME_WAITING
Date Sun, 18 Oct 2009 13:10:38 GMT
Jamie did you ever get to the bottom of this?

Can you reduce this code down to a smaller example that shows the hang?

Also, can you post a thread stack dump when you hit the hang?

Is it possible you are adding documents from one thread while calling
IndexWriter.close in another?  I see you have "catch
AlreadyClosedException" when adding docs -- do you ever actually hit
that?

Mike

On Fri, Oct 9, 2009 at 4:47 AM, Jamie Band <jamie@stimulussoft.com> wrote:
> Hi Michael
>
> Thanks for your help. Here are the stacks:
>
> index processor [TIME_WAITING] CPU time: 33:01
> java.lang.Object.wait(long)
> org.apache.lucene.index.IndexWriter.doWait()
> org.apache.lucene.index.IndexWriter.shouldClose()
> org.apache.lucene.index.IndexWriter.close(boolean)
> org.apache.lucene.index.IndexWriter.close()
> com.stimulus.archiva.index.VolumeIndex.closeIndex()
> com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
>
> The source code to our indexer is attached. As you can see, documents are
> added to a blocking queue. The index processor thread takes it out of the
> queue and processes it. After about 60k documents IndexWriter's close method
> enters TIME_WAITING indefinitely. It there any workaround to this problem?
>
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import javax.mail.MessagingException;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import com.stimulus.archiva.domain.Config;
> import com.stimulus.archiva.domain.Email;
> import com.stimulus.archiva.domain.EmailID;
> import com.stimulus.archiva.domain.Indexer;
> import com.stimulus.archiva.domain.Volume;
> import com.stimulus.archiva.exception.*;
> import com.stimulus.archiva.language.AnalyzerFactory;
> import com.stimulus.archiva.search.*;
> import java.util.*;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ScheduledFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class VolumeIndex extends Thread {
>          protected ArrayBlockingQueue<IndexInfo> queue;
>        protected static final Log logger =
> LogFactory.getLog(VolumeIndex.class.getName());
>           IndexWriter writer = null;
>           Volume volume;
>           protected static ScheduledExecutorService scheduler;
>        protected static ScheduledFuture<?> scheduledTask;
>        protected static IndexInfo EXIT_REQ = new IndexInfo(null);
>        ReentrantLock indexLock = new ReentrantLock();
>        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>        Indexer indexer = null;
>        File indexLogFile;
>        PrintStream indexLogOut;
>        IndexProcessor indexProcessor;
>                         public VolumeIndex(Indexer indexer, Volume volume)
{
>               logger.debug("creating new volume index {"+volume+"}");
>               this.volume = volume;
>               this.indexer = indexer;
>               this.queue = new
> ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
>                           try {
>                   indexLogFile = getIndexLogFile(volume);
>                   if (indexLogFile!=null) {
>                       if (indexLogFile.length()>10485760)
>                           indexLogFile.delete();
>                       indexLogOut = new PrintStream(indexLogFile);
>                   }
>                   logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>               }
>               startup();
>           }
>               protected File getIndexLogFile(Volume volume) {
>              try {
>                   String indexpath = volume.getIndexPath();
>                   int lio = indexpath.lastIndexOf(File.separator)+1;
>                   String logfilepath =
> indexpath.substring(lio,indexpath.length()-1);
>                   logfilepath += ".log";
>                   logfilepath = "index_"+logfilepath;
>                   logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logfilepath;
>                   return new File(logfilepath);
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>                   return null;
>               }
>         }
>                public void deleteMessages(List<String> ids) throws
> MessageSearchException {
>             if (ids == null)
>                   throw new MessageSearchException("assertion failure: null
> ids",logger);
>                         Term[] terms = new Term[ids.size()];
>             int c = 0;
>             StringBuffer deleteInfo = new StringBuffer();
>             for (String id : ids) {
>                 terms[c++] = new Term("uid",id);
>                 deleteInfo.append(id);
>                 deleteInfo.append(",");
>             }
>                         String deleteStr = deleteInfo.toString();
>             if (deleteStr.length()>0 &&
> deleteStr.charAt(deleteStr.length()-1)==',')
>                 deleteStr = deleteStr.substring(0,deleteStr.length()-1);
>                         logger.debug("delete messages {'"+deleteInfo+"'}");
>             try {
>                 indexLock.lock();
>                 openIndex();
>                 try {
>                     writer.deleteDocuments(terms);
>                     writer.expungeDeletes();
>                 } catch (Exception e) {
>                     throw new MessageSearchException("failed to delete email
> from index.",e,logger);
>                 } finally {
>                                     }
>             } finally {
>                 closeIndex();
>                 indexLock.unlock();
>             }
>       }
>               protected void openIndex() throws MessageSearchException {
>            Exception lastError = null;
>                     if (writer==null) {
>               logger.debug("openIndex() index will be opened. it is
> currently closed.");
>           } else {
>               logger.debug("openIndex() did not bother opening index. it is
> already open.");
>               return;
>           }
>           logger.debug("opening index for write {"+volume+"}");
>           indexer.prepareIndex(volume);
>           logger.debug("opening search index for write
> {indexpath='"+volume.getIndexPath()+"'}");
>           boolean writelock;
>           int attempt = 0;
>           int maxattempt = 10;
>                     if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>               maxattempt = 10000;
>            } else {
>               maxattempt = 10;
>            }
>                     do {
>               writelock = false;
>               try {
>                       FSDirectory fsDirectory =
> FSDirectory.getDirectory(volume.getIndexPath());
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                       if (logger.isDebugEnabled() && indexLogOut!=null)
{
>                           writer.setInfoStream(indexLogOut);
>                       }
>               } catch (LockObtainFailedException lobfe) {
>                       logger.debug("write lock on index. will reopen in
> 50ms.");
>                       try { Thread.sleep(50); } catch (Exception e) {}
>                       attempt++;
>                       writelock = true;
>               } catch (CorruptIndexException cie) {
>                   throw new MessageSearchException("index appears to be
> corrupt. please reindex the active volume."+cie.getMessage(),logger);
>               } catch (IOException io) {
>                   throw new MessageSearchException("failed to write document
> to index:"+io.getMessage(),logger);
>               }
>          } while (writelock && attempt<maxattempt);
>          if (attempt>=10000)
>            throw new MessageSearchException("failed to open index writer
> {location='"+volume.getIndexPath()+"'}",lastError,logger);
>       }
>             public void indexMessage(Email message) throws
> MessageSearchException  {
>           logger.debug("index message {"+message+"}");
>           long s = (new Date()).getTime();
>           if (message == null)
>               throw new MessageSearchException("assertion failure: null
> message",logger);
>           Document doc = new Document();
>           IndexInfo indexInfo = new IndexInfo(doc);
>           try {
>              DocumentIndex docIndex = new DocumentIndex(indexer);
>              String language = doc.get("lang");
>              if (language==null)
>                  language = indexer.getIndexLanguage();
>              docIndex.write(message,doc,indexInfo);
>              queue.put(indexInfo);
>              logger.debug("message indexed successfully
> {"+message+",language='"+language+"'}");
>           } catch (MessagingException me) {
>              throw new MessageSearchException("failed to decode message
> during indexing",me,logger, ChainedException.Level.DEBUG);
>           } catch (IOException me) {
>               throw new MessageSearchException("failed to index
> message"+me.getMessage()+" {"+message+"}",me,logger,
> ChainedException.Level.DEBUG);
>           } catch (ExtractionException ee)
>           {
>               // we will want to continue indexing
>              //throw new MessageSearchException("failed to decode
> attachments in message {"+message+"}",ee,logger,
> ChainedException.Level.DEBUG);
>           } catch (AlreadyClosedException ace) {
>               indexMessage(message);
>           } catch (Throwable e) {
>               throw new MessageSearchException("failed to index
> message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
>           }
>           logger.debug("indexing message end {"+message+"}");
>           long e = (new Date()).getTime();
>           logger.debug("indexing time {time='"+(e-s)+"'}");
>       }
>         public class IndexProcessor extends Thread {
>                     public IndexProcessor() {
>               setName("index processor");
>           }
>               public void run() {
>               boolean exit = false;
>               //ExecutorService documentPool;
>               // we abandoned pool as it does not seem to offer any major
> performance benefit
>               IndexInfo indexInfo = null;
> LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
>                             while (!exit) {
>                                     try {
>                       int maxIndexDocs =
> Config.getConfig().getIndex().getMaxSimultaneousDocs();
>                       //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>                       indexInfo = null;                        
indexInfo =
> (IndexInfo) queue.take();
>                       if (indexInfo==EXIT_REQ) {
>                           logger.debug("index exit req received. exiting");
>                           exit = true;
>                           continue;
>                       }
>                                         indexLock.lock();
>                       try {
>                            openIndex();
>                       } catch (Exception e) {
>                            logger.error("failed to open
> index:"+e.getMessage(),e);
>                            return;
>                       }
>                       if (indexInfo==null) {
>                           logger.debug("index info is null");
>                       }
>                       int i = 0;
>                       while(indexInfo!=null && i<maxIndexDocs)
{
>                           try {
>                               writer.addDocument(indexInfo.getDocument());
>                           } catch (IOException io) {
>                               logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                           } catch (AlreadyClosedException e) {
>                               pushbacks.add(indexInfo);
>                           } finally {
>                               indexInfo.cleanup();
>                           }
>
> //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
>                                                      i++;
>                                                        if
(i<maxIndexDocs) {
>                                indexInfo = (IndexInfo) queue.poll();
>                                                          
     if
> (indexInfo==null) {
>                                       logger.debug("index info is
null");
>                                }
>                                                          
     if
> (indexInfo==EXIT_REQ) {
>                                        logger.debug("index exit req
> received. exiting (2)");
>                                       exit = true;
>                                       break;
>                                 }
>                            }
>                                                   }
>                                             for (IndexInfo pushback
:
> pushbacks) {
>                           try {
>                               writer.addDocument(pushback.getDocument());
>                           } catch (IOException io) {
>                               logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                           } catch (AlreadyClosedException e) {
>                               pushbacks.add(indexInfo);
>                           } finally {
>                               indexInfo.cleanup();
>                           }
>                           //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
>                           i++;
>                       }
>                       //documentPool.shutdown();
>                       //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>                                          } catch (Throwable ie)
{
>                        logger.error("index write
> interrupted:"+ie.getMessage());
>                    } finally {
>                          closeIndex();
>                         indexLock.unlock();
>                   }
>               }                }
>                     public class IndexDocument extends Thread {
>                                 IndexInfo indexInfo = null;
>                   List<IndexInfo> pushbacks = null;
>                                     public IndexDocument(IndexInfo
> indexInfo,List<IndexInfo> pushbacks) {
>                       this.indexInfo = indexInfo;
>                       this.pushbacks = pushbacks;
>                       setName("index document");
>                   }
>                                 public void run() {
>                       try {
>                           writer.addDocument(indexInfo.getDocument());
>                       } catch (IOException io) {
>                           logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                       } catch (AlreadyClosedException e) {
>                           pushbacks.add(indexInfo);
>                       }
>                   }};
>           }
>           protected void closeIndex() {
>            try {
>                    if (writer!=null) {
>                       writer.close();
>                       logger.debug("writer closed");
>                       writer = null;
>                    }
>                } catch (Exception io) {
>                   logger.error("failed to close index
> writer:"+io.getMessage(),io);
>                }
>       }
>           public void deleteIndex() throws MessageSearchException {
>                  logger.debug("delete index
> {indexpath='"+volume.getIndexPath()+"'}");
>                  try {
>                     indexLock.lock();
>                    try {
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new
> IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                    } catch (Exception cie) {
>                        logger.error("failed to delete index
> {index='"+volume.getIndexPath()+"'}",cie);
>                        return;
>                    }
>                    MessageIndex.volumeIndexes.remove(this);
>                 } finally {
>                       closeIndex();
>                     indexLock.unlock();
>               }
>         }
>               public void startup() {
>           logger.debug("volumeindex is starting up");
>           File lockFile = new File(volume.getIndexPath()+File.separatorChar
> + "write.lock");
>           if (lockFile.exists()) {
>               logger.warn("The server lock file already exists. Either
> another indexer is running or the server was not shutdown correctly.");
>               logger.warn("If it is the latter, the lock file must be
> manually deleted at "+lockFile.getAbsolutePath());
>               if (indexer.getMultipleIndexProcesses()) {
>                   logger.debug("index lock file detected on volumeindex
> startup.");
>               } else {
>                   logger.warn("index lock file detected. the server was
> shutdown incorrectly. automatically deleting lock file.");
>                   logger.warn("indexer is configured to deal with only one
> indexer process.");
>                   logger.warn("if you are running more than one indexer,
> your index could be subject to corruption.");
>                   lockFile.delete();
>               }
>           }
>           indexProcessor = new IndexProcessor();
>           indexProcessor.start();
>           Runtime.getRuntime().addShutdownHook(this);
>                   }
>                                 public void shutdown() {
>             logger.debug("volumeindex is shutting down");
>             queue.add(EXIT_REQ);
>             scheduler.shutdownNow();
>                   }
>                 @Override
>         public void run() {
>             queue.add(EXIT_REQ);
>         }
>            }
>
>
>
>
> Is it possible a large merge is running?  By default IW.close waits
> for outstanding merges to complete.  Can you post the stacktrace?
>
> Mike
>
> On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <jamie@stimulussoft.com> wrote:
>>
>> Hi All
>>
>> I have a long running situation where our indexing thread is getting stuck
>> indefinitely in IndexWriter's close method. Yourkit shows the thread to be
>> stuck in TIME_WAITING. Any idea's on what could be causing this?
>> Could it be one of the streams or readers we passed to the document?
>>
>> I am running Lucene 2.9.0.
>>
>> Many thanks in advance
>>
>> Jamie
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
>> For additional commands, e-mail: java-user-help@lucene.apache.org
>>
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message