lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jamie Band <ja...@stimulussoft.com>
Subject Re: Index.close() infinite TIME_WAITING
Date Fri, 09 Oct 2009 08:47:53 GMT
Hi Michael

Thanks for your help. Here are the stacks:

index processor [TIME_WAITING] CPU time: 33:01
java.lang.Object.wait(long)
org.apache.lucene.index.IndexWriter.doWait()
org.apache.lucene.index.IndexWriter.shouldClose()
org.apache.lucene.index.IndexWriter.close(boolean)
org.apache.lucene.index.IndexWriter.close()
com.stimulus.archiva.index.VolumeIndex.closeIndex()
com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()

The source code to our indexer is attached. As you can see, documents 
are added to a blocking queue. The index processor thread takes it out 
of the queue and processes it. After about 60k documents IndexWriter's 
close method enters TIME_WAITING indefinitely. It there any workaround 
to this problem?


package com.stimulus.archiva.index;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import javax.mail.MessagingException;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.domain.Email;
import com.stimulus.archiva.domain.EmailID;
import com.stimulus.archiva.domain.Indexer;
import com.stimulus.archiva.domain.Volume;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class VolumeIndex extends Thread {
   
         protected ArrayBlockingQueue<IndexInfo> queue;
         protected static final Log logger = 
LogFactory.getLog(VolumeIndex.class.getName());
            IndexWriter writer = null;
            Volume volume;
            protected static ScheduledExecutorService scheduler;
         protected static ScheduledFuture<?> scheduledTask;
         protected static IndexInfo EXIT_REQ = new IndexInfo(null);
         ReentrantLock indexLock = new ReentrantLock();
         ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
         Indexer indexer = null;
         File indexLogFile;
         PrintStream indexLogOut;
         IndexProcessor indexProcessor;
       
         
            public VolumeIndex(Indexer indexer, Volume volume) {
                logger.debug("creating new volume index {"+volume+"}");
                this.volume = volume;
                this.indexer = indexer;
                this.queue = new 
ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
             
                try {
                    indexLogFile = getIndexLogFile(volume);
                    if (indexLogFile!=null) {
                        if (indexLogFile.length()>10485760)
                            indexLogFile.delete();
                        indexLogOut = new PrintStream(indexLogFile);
                    }
                    logger.debug("set index log file path 
{path='"+indexLogFile.getCanonicalPath()+"'}");
                } catch (Exception e) {
                    logger.error("failed to open index log 
file:"+e.getMessage(),e);
                }
                startup();
            }
       
          protected File getIndexLogFile(Volume volume) {
               try {
                    String indexpath = volume.getIndexPath();
                    int lio = indexpath.lastIndexOf(File.separator)+1;
                    String logfilepath = 
indexpath.substring(lio,indexpath.length()-1);
                    logfilepath += ".log";
                    logfilepath = "index_"+logfilepath;
                    logfilepath = 
Config.getFileSystem().getLogPath()+File.separator+logfilepath;
                    return new File(logfilepath);
                } catch (Exception e) {
                    logger.error("failed to open index log 
file:"+e.getMessage(),e);
                    return null;
                }
          }
         
         public void deleteMessages(List<String> ids) throws 
MessageSearchException {
              if (ids == null)
                    throw new MessageSearchException("assertion failure: 
null ids",logger);
             
              Term[] terms = new Term[ids.size()];
              int c = 0;
              StringBuffer deleteInfo = new StringBuffer();
              for (String id : ids) {
                  terms[c++] = new Term("uid",id);
                  deleteInfo.append(id);
                  deleteInfo.append(",");
              }
             
              String deleteStr = deleteInfo.toString();
              if (deleteStr.length()>0 && 
deleteStr.charAt(deleteStr.length()-1)==',')
                  deleteStr = deleteStr.substring(0,deleteStr.length()-1);
             
              logger.debug("delete messages {'"+deleteInfo+"'}");
              try {
                  indexLock.lock();
                  openIndex();
                  try {
                      writer.deleteDocuments(terms);
                      writer.expungeDeletes();
                  } catch (Exception e) {
                      throw new MessageSearchException("failed to delete 
email from index.",e,logger);
                  } finally {
                     
                  }
              } finally {
                  closeIndex();
                  indexLock.unlock();
              }
        }
       
          protected void openIndex() throws MessageSearchException {
             Exception lastError = null;
           
            if (writer==null) {
                logger.debug("openIndex() index will be opened. it is 
currently closed.");
            } else {
                logger.debug("openIndex() did not bother opening index. 
it is already open.");
                return;
            }
            logger.debug("opening index for write {"+volume+"}");
            indexer.prepareIndex(volume);
            logger.debug("opening search index for write 
{indexpath='"+volume.getIndexPath()+"'}");
            boolean writelock;
            int attempt = 0;
            int maxattempt = 10;
           
            if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                maxattempt = 10000;
             } else {
                maxattempt = 10;
             }
           
            do {
                writelock = false;
                try {
                        FSDirectory fsDirectory = 
FSDirectory.getDirectory(volume.getIndexPath());
                        int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new 
IndexWriter(fsDirectory,analyzer,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                        if (logger.isDebugEnabled() && indexLogOut!=null) {
                            writer.setInfoStream(indexLogOut);
                        }
                } catch (LockObtainFailedException lobfe) {
                        logger.debug("write lock on index. will reopen 
in 50ms.");
                        try { Thread.sleep(50); } catch (Exception e) {}
                        attempt++;
                        writelock = true;
                } catch (CorruptIndexException cie) {
                    throw new MessageSearchException("index appears to 
be corrupt. please reindex the active volume."+cie.getMessage(),logger);
                } catch (IOException io) {
                    throw new MessageSearchException("failed to write 
document to index:"+io.getMessage(),logger);
                }
           } while (writelock && attempt<maxattempt);
           if (attempt>=10000)
             throw new MessageSearchException("failed to open index 
writer {location='"+volume.getIndexPath()+"'}",lastError,logger);
        }
       
        public void indexMessage(Email message) throws 
MessageSearchException  {
            logger.debug("index message {"+message+"}");
            long s = (new Date()).getTime();
            if (message == null)
                throw new MessageSearchException("assertion failure: 
null message",logger);
            Document doc = new Document();
            IndexInfo indexInfo = new IndexInfo(doc);
            try {
               DocumentIndex docIndex = new DocumentIndex(indexer);
               String language = doc.get("lang");
               if (language==null)
                   language = indexer.getIndexLanguage();
               docIndex.write(message,doc,indexInfo);
               queue.put(indexInfo);
               logger.debug("message indexed successfully 
{"+message+",language='"+language+"'}");
            } catch (MessagingException me) {
               throw new MessageSearchException("failed to decode 
message during indexing",me,logger, ChainedException.Level.DEBUG);
            } catch (IOException me) {
                throw new MessageSearchException("failed to index 
message"+me.getMessage()+" {"+message+"}",me,logger, 
ChainedException.Level.DEBUG);
            } catch (ExtractionException ee)
            {
                // we will want to continue indexing
               //throw new MessageSearchException("failed to decode 
attachments in message {"+message+"}",ee,logger, 
ChainedException.Level.DEBUG);
            } catch (AlreadyClosedException ace) {
                indexMessage(message);
            } catch (Throwable e) {
                throw new MessageSearchException("failed to index 
message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
            }
            logger.debug("indexing message end {"+message+"}");
            long e = (new Date()).getTime();
            logger.debug("indexing time {time='"+(e-s)+"'}");
        }
   
        public class IndexProcessor extends Thread {
           
            public IndexProcessor() {
                setName("index processor");
            }
     
            public void run() {
                boolean exit = false;
                //ExecutorService documentPool;
                // we abandoned pool as it does not seem to offer any 
major performance benefit
                IndexInfo indexInfo = null;   
                LinkedList<IndexInfo> pushbacks = new 
LinkedList<IndexInfo>();
               
                while (!exit) {
                   
                    try {
                        int maxIndexDocs = 
Config.getConfig().getIndex().getMaxSimultaneousDocs();
                        //documentPool = 
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
                        indexInfo = null;   
                        indexInfo = (IndexInfo) queue.take();
                        if (indexInfo==EXIT_REQ) {
                            logger.debug("index exit req received. 
exiting");
                            exit = true;
                            continue;
                        }
                   
                        indexLock.lock();
                        try {
                             openIndex();
                        } catch (Exception e) {
                             logger.error("failed to open 
index:"+e.getMessage(),e);
                             return;
                        }
                        if (indexInfo==null) {
                            logger.debug("index info is null");
                        }
                        int i = 0;
                        while(indexInfo!=null && i<maxIndexDocs) {
                            try {
                                writer.addDocument(indexInfo.getDocument());
                            } catch (IOException io) {
                                logger.error("failed to add document to 
index:"+io.getMessage(),io);
                            } catch (AlreadyClosedException e) {
                                pushbacks.add(indexInfo);
                            } finally {
                                indexInfo.cleanup();
                            }
                           
                              //documentPool.execute(new 
IndexDocument(indexInfo,pushbacks));
                           
                             i++;
                             
                             if (i<maxIndexDocs) {
                                 indexInfo = (IndexInfo) queue.poll();
                                 
                                 if (indexInfo==null) {
                                        logger.debug("index info is null");
                                 }
                                 
                                 if (indexInfo==EXIT_REQ) {
                                         logger.debug("index exit req 
received. exiting (2)");
                                        exit = true;
                                        break;
                                  }
                             }
                             
                        }
                       
                        for (IndexInfo pushback : pushbacks) {
                            try {
                                writer.addDocument(pushback.getDocument());
                            } catch (IOException io) {
                                logger.error("failed to add document to 
index:"+io.getMessage(),io);
                            } catch (AlreadyClosedException e) {
                                pushbacks.add(indexInfo);
                            } finally {
                                indexInfo.cleanup();
                            }
                            //documentPool.execute(new 
IndexDocument(pushback,pushbacks));
                            i++;
                        }
                        //documentPool.shutdown();
                        
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
                       
                     } catch (Throwable ie) {
                         logger.error("index write 
interrupted:"+ie.getMessage());
                     } finally {
                           closeIndex();
                          indexLock.unlock();
                    }
                }   
               }
           
            public class IndexDocument extends Thread {
               
                    IndexInfo indexInfo = null;
                    List<IndexInfo> pushbacks = null;
                   
                    public IndexDocument(IndexInfo 
indexInfo,List<IndexInfo> pushbacks) {
                        this.indexInfo = indexInfo;
                        this.pushbacks = pushbacks;
                        setName("index document");
                    }
               
                    public void run() {
                        try {
                            writer.addDocument(indexInfo.getDocument());
                        } catch (IOException io) {
                            logger.error("failed to add document to 
index:"+io.getMessage(),io);
                        } catch (AlreadyClosedException e) {
                            pushbacks.add(indexInfo);
                        }
                    }};
            }
     
        protected void closeIndex() {
             try {
                     if (writer!=null) {
                        writer.close();
                        logger.debug("writer closed");
                        writer = null;
                     }
                 } catch (Exception io) {
                    logger.error("failed to close index 
writer:"+io.getMessage(),io);
                 }
        }
   
          public void deleteIndex() throws MessageSearchException {
                   logger.debug("delete index 
{indexpath='"+volume.getIndexPath()+"'}");
                   try {
                      indexLock.lock();
                     try {
                        int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new 
IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                     } catch (Exception cie) {
                         logger.error("failed to delete index 
{index='"+volume.getIndexPath()+"'}",cie);
                         return;
                     }
                     MessageIndex.volumeIndexes.remove(this);
                  } finally {
                        closeIndex();
                      indexLock.unlock();
                }
          }
       
          public void startup() {
            logger.debug("volumeindex is starting up");
            File lockFile = new 
File(volume.getIndexPath()+File.separatorChar + "write.lock");
            if (lockFile.exists()) {
                logger.warn("The server lock file already exists. Either 
another indexer is running or the server was not shutdown correctly.");
                logger.warn("If it is the latter, the lock file must be 
manually deleted at "+lockFile.getAbsolutePath());
                if (indexer.getMultipleIndexProcesses()) {
                    logger.debug("index lock file detected on 
volumeindex startup.");
                } else {
                    logger.warn("index lock file detected. the server 
was shutdown incorrectly. automatically deleting lock file.");
                    logger.warn("indexer is configured to deal with only 
one indexer process.");
                    logger.warn("if you are running more than one 
indexer, your index could be subject to corruption.");
                    lockFile.delete();
                }
            }
            indexProcessor = new IndexProcessor();
            indexProcessor.start();
            Runtime.getRuntime().addShutdownHook(this);
           
          }
         
         
         
          public void shutdown() {
              logger.debug("volumeindex is shutting down");
              queue.add(EXIT_REQ);
              scheduler.shutdownNow();
            
          }
         
          @Override
          public void run() {
              queue.add(EXIT_REQ);
          }
         
     
}

   



Is it possible a large merge is running?  By default IW.close waits
for outstanding merges to complete.  Can you post the stacktrace?

Mike

On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <jamie@stimulussoft.com> wrote:
> Hi All
>
> I have a long running situation where our indexing thread is getting stuck
> indefinitely in IndexWriter's close method. Yourkit shows the thread to be
> stuck in TIME_WAITING. Any idea's on what could be causing this?
> Could it be one of the streams or readers we passed to the document?
>
> I am running Lucene 2.9.0.
>
> Many thanks in advance
>
> Jamie
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org




---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message