lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jamie Band <ja...@stimulussoft.com>
Subject Re: Index.close() infinite TIME_WAITING (repost)
Date Fri, 09 Oct 2009 09:04:12 GMT
Hi Michael / Uwe / others

Sorry for the repost... it does not look like the earlier message I 
sent went through.
FYI: there are no large Lucene merges taking place.

Jamie Band wrote:
> Hi Michael
>
> Thanks for your help. Here are the stacks:
>
> index processor [TIME_WAITING] CPU time: 33:01
> java.lang.Object.wait(long)
> org.apache.lucene.index.IndexWriter.doWait()
> org.apache.lucene.index.IndexWriter.shouldClose()
> org.apache.lucene.index.IndexWriter.close(boolean)
> org.apache.lucene.index.IndexWriter.close()
> com.stimulus.archiva.index.VolumeIndex.closeIndex()
> com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
>
> The source code to our indexer is attached. As you can see, documents 
> are added to a blocking queue. The index processor thread takes it out 
> of the queue and processes it. After about 60k documents IndexWriter's 
> close method enters TIME_WAITING indefinitely. Is there any workaround 
> to this problem?
>
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import javax.mail.MessagingException;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import com.stimulus.archiva.domain.Config;
> import com.stimulus.archiva.domain.Email;
> import com.stimulus.archiva.domain.EmailID;
> import com.stimulus.archiva.domain.Indexer;
> import com.stimulus.archiva.domain.Volume;
> import com.stimulus.archiva.exception.*;
> import com.stimulus.archiva.language.AnalyzerFactory;
> import com.stimulus.archiva.search.*;
> import java.util.*;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ScheduledFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class VolumeIndex extends Thread {
>           protected ArrayBlockingQueue<IndexInfo> queue;
>         protected static final Log logger = 
> LogFactory.getLog(VolumeIndex.class.getName());
>            IndexWriter writer = null;
>            Volume volume;
>            protected static ScheduledExecutorService scheduler;
>         protected static ScheduledFuture<?> scheduledTask;
>         protected static IndexInfo EXIT_REQ = new IndexInfo(null);
>         ReentrantLock indexLock = new ReentrantLock();
>         ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>         Indexer indexer = null;
>         File indexLogFile;
>         PrintStream indexLogOut;
>         IndexProcessor indexProcessor;
>     /**
>      * Creates the index for a single volume, opens its Lucene info-stream log
>      * file (best effort) and starts the background index processor.
>      *
>      * @param indexer indexer used to prepare the index and supply defaults
>      * @param volume  volume whose index this instance manages
>      */
>     public VolumeIndex(Indexer indexer, Volume volume) {
>         logger.debug("creating new volume index {"+volume+"}");
>         this.volume = volume;
>         this.indexer = indexer;
>         this.queue = new ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
>
>         try {
>             indexLogFile = getIndexLogFile(volume);
>             if (indexLogFile != null) {
>                 // drop the old log once it grows beyond 10 MB
>                 if (indexLogFile.length() > 10485760)
>                     indexLogFile.delete();
>                 // NOTE(review): PrintStream(File) truncates an existing file, so the
>                 // size check above only matters if appending was intended - confirm.
>                 indexLogOut = new PrintStream(indexLogFile);
>                 // Logged inside the null check: getIndexLogFile() returns null on
>                 // failure, and dereferencing it here used to raise a
>                 // NullPointerException that was reported as a second, misleading error.
>                 logger.debug("set index log file path {path='"+indexLogFile.getCanonicalPath()+"'}");
>             }
>         } catch (Exception e) {
>             // the log file is optional; indexing continues without it
>             logger.error("failed to open index log file:"+e.getMessage(),e);
>         }
>         startup();
>     }
>                protected File getIndexLogFile(Volume volume) {
>               try {
>                    String indexpath = volume.getIndexPath();
>                    int lio = indexpath.lastIndexOf(File.separator)+1;
>                    String logfilepath = 
> indexpath.substring(lio,indexpath.length()-1);
>                    logfilepath += ".log";
>                    logfilepath = "index_"+logfilepath;
>                    logfilepath = 
> Config.getFileSystem().getLogPath()+File.separator+logfilepath;
>                    return new File(logfilepath);
>                } catch (Exception e) {
>                    logger.error("failed to open index log 
> file:"+e.getMessage(),e);
>                    return null;
>                }
>          }
>                 public void deleteMessages(List<String> ids) throws 
> MessageSearchException {
>              if (ids == null)
>                    throw new MessageSearchException("assertion 
> failure: null ids",logger);
>                          Term[] terms = new Term[ids.size()];
>              int c = 0;
>              StringBuffer deleteInfo = new StringBuffer();
>              for (String id : ids) {
>                  terms[c++] = new Term("uid",id);
>                  deleteInfo.append(id);
>                  deleteInfo.append(",");
>              }
>                          String deleteStr = deleteInfo.toString();
>              if (deleteStr.length()>0 && 
> deleteStr.charAt(deleteStr.length()-1)==',')
>                  deleteStr = deleteStr.substring(0,deleteStr.length()-1);
>                          logger.debug("delete messages 
> {'"+deleteInfo+"'}");
>              try {
>                  indexLock.lock();
>                  openIndex();
>                  try {
>                      writer.deleteDocuments(terms);
>                      writer.expungeDeletes();
>                  } catch (Exception e) {
>                      throw new MessageSearchException("failed to 
> delete email from index.",e,logger);
>                  } finally {
>                                      }
>              } finally {
>                  closeIndex();
>                  indexLock.unlock();
>              }
>        }
>     /**
>      * Opens the shared IndexWriter for this volume if it is not already open.
>      * When the index write lock is held elsewhere the open is retried every
>      * 50ms: up to 10000 times when multiple index processes are configured,
>      * otherwise up to 10 times.
>      *
>      * @throws MessageSearchException if the index is corrupt, an I/O error
>      *         occurs, or the write lock could not be obtained within the
>      *         permitted number of attempts
>      */
>     protected void openIndex() throws MessageSearchException {
>         if (writer != null) {
>             logger.debug("openIndex() did not bother opening index. it is already open.");
>             return;
>         }
>         logger.debug("openIndex() index will be opened. it is currently closed.");
>         logger.debug("opening index for write {"+volume+"}");
>         indexer.prepareIndex(volume);
>         logger.debug("opening search index for write {indexpath='"+volume.getIndexPath()+"'}");
>
>         int maxattempt = Config.getConfig().getIndex().getMultipleIndexProcesses() ? 10000 : 10;
>         Exception lastError = null;
>         for (int attempt = 0; attempt < maxattempt; attempt++) {
>             try {
>                 FSDirectory fsDirectory = FSDirectory.getDirectory(volume.getIndexPath());
>                 int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                 writer = new IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars));
>                 if (logger.isDebugEnabled() && indexLogOut != null) {
>                     writer.setInfoStream(indexLogOut);
>                 }
>                 return;
>             } catch (LockObtainFailedException lobfe) {
>                 // remembered so the final failure carries its cause
>                 lastError = lobfe;
>                 logger.debug("write lock on index. will reopen in 50ms.");
>                 try {
>                     Thread.sleep(50);
>                 } catch (InterruptedException ie) {
>                     // keep the interrupt visible to the caller and stop retrying
>                     Thread.currentThread().interrupt();
>                     break;
>                 }
>             } catch (CorruptIndexException cie) {
>                 throw new MessageSearchException("index appears to be corrupt. please reindex the active volume."+cie.getMessage(),cie,logger);
>             } catch (IOException io) {
>                 throw new MessageSearchException("failed to write document to index:"+io.getMessage(),io,logger);
>             }
>         }
>         // Reached only when no writer could be opened. The old check compared the
>         // attempt count against a hard-coded 10000, so with the 10-attempt limit
>         // the method returned normally and left writer == null for its callers.
>         throw new MessageSearchException("failed to open index writer {location='"+volume.getIndexPath()+"'}",lastError,logger);
>     }
>              public void indexMessage(Email message) throws 
> MessageSearchException  {
>            logger.debug("index message {"+message+"}");
>            long s = (new Date()).getTime();
>            if (message == null)
>                throw new MessageSearchException("assertion failure: 
> null message",logger);
>            Document doc = new Document();
>            IndexInfo indexInfo = new IndexInfo(doc);
>            try {
>               DocumentIndex docIndex = new DocumentIndex(indexer);
>               String language = doc.get("lang");
>               if (language==null)
>                   language = indexer.getIndexLanguage();
>               docIndex.write(message,doc,indexInfo);
>               queue.put(indexInfo);
>               logger.debug("message indexed successfully 
> {"+message+",language='"+language+"'}");
>            } catch (MessagingException me) {
>               throw new MessageSearchException("failed to decode 
> message during indexing",me,logger, ChainedException.Level.DEBUG);
>            } catch (IOException me) {
>                throw new MessageSearchException("failed to index 
> message"+me.getMessage()+" {"+message+"}",me,logger, 
> ChainedException.Level.DEBUG);
>            } catch (ExtractionException ee)
>            {
>                // we will want to continue indexing
>               //throw new MessageSearchException("failed to decode 
> attachments in message {"+message+"}",ee,logger, 
> ChainedException.Level.DEBUG);
>            } catch (AlreadyClosedException ace) {
>                indexMessage(message);
>            } catch (Throwable e) {
>                throw new MessageSearchException("failed to index 
> message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
>            }
>            logger.debug("indexing message end {"+message+"}");
>            long e = (new Date()).getTime();
>            logger.debug("indexing time {time='"+(e-s)+"'}");
>        }
>          public class IndexProcessor extends Thread {
>                      public IndexProcessor() {
>                setName("index processor");
>            }
>                public void run() {
>                boolean exit = false;
>                //ExecutorService documentPool;
>                // we abandoned pool as it does not seem to offer any 
> major performance benefit
>                IndexInfo indexInfo = null;                  
> LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
>                              while (!exit) {
>                                      try {
>                        int maxIndexDocs = 
> Config.getConfig().getIndex().getMaxSimultaneousDocs();
>                        //documentPool = 
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads()); 
>
>                        indexInfo = null;                          
> indexInfo = (IndexInfo) queue.take();
>                        if (indexInfo==EXIT_REQ) {
>                            logger.debug("index exit req received. 
> exiting");
>                            exit = true;
>                            continue;
>                        }
>                                          indexLock.lock();
>                        try {
>                             openIndex();
>                        } catch (Exception e) {
>                             logger.error("failed to open 
> index:"+e.getMessage(),e);
>                             return;
>                        }
>                        if (indexInfo==null) {
>                            logger.debug("index info is null");
>                        }
>                        int i = 0;
>                        while(indexInfo!=null && i<maxIndexDocs) {
>                            try {
>                                
> writer.addDocument(indexInfo.getDocument());
>                            } catch (IOException io) {
>                                logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                            } catch (AlreadyClosedException e) {
>                                pushbacks.add(indexInfo);
>                            } finally {
>                                indexInfo.cleanup();
>                            }
>                                                        
> //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
>                                                       i++;
>                                                         if 
> (i<maxIndexDocs) {
>                                 indexInfo = (IndexInfo) queue.poll();
>                                                                 if 
> (indexInfo==null) {
>                                        logger.debug("index info is 
> null");
>                                 }
>                                                                 if 
> (indexInfo==EXIT_REQ) {
>                                         logger.debug("index exit req 
> received. exiting (2)");
>                                        exit = true;
>                                        break;
>                                  }
>                             }
>                                                    }
>                                              for (IndexInfo pushback : 
> pushbacks) {
>                            try {
>                                
> writer.addDocument(pushback.getDocument());
>                            } catch (IOException io) {
>                                logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                            } catch (AlreadyClosedException e) {
>                                pushbacks.add(indexInfo);
>                            } finally {
>                                indexInfo.cleanup();
>                            }
>                            //documentPool.execute(new 
> IndexDocument(pushback,pushbacks));
>                            i++;
>                        }
>                        //documentPool.shutdown();
>                        
> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>                                           } catch (Throwable ie) {
>                         logger.error("index write 
> interrupted:"+ie.getMessage());
>                     } finally {
>                           closeIndex();
>                          indexLock.unlock();
>                    }
>                }                 }
>                      public class IndexDocument extends Thread {
>                                  IndexInfo indexInfo = null;
>                    List<IndexInfo> pushbacks = null;
>                                      public IndexDocument(IndexInfo 
> indexInfo,List<IndexInfo> pushbacks) {
>                        this.indexInfo = indexInfo;
>                        this.pushbacks = pushbacks;
>                        setName("index document");
>                    }
>                                  public void run() {
>                        try {
>                            writer.addDocument(indexInfo.getDocument());
>                        } catch (IOException io) {
>                            logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                        } catch (AlreadyClosedException e) {
>                            pushbacks.add(indexInfo);
>                        }
>                    }};
>            }
>     /**
>      * Closes the shared IndexWriter if one is open and clears the field.
>      * Failures are logged rather than thrown; in that case the field is left
>      * set, so a later call attempts the close again.
>      */
>     protected void closeIndex() {
>         if (writer == null) {
>             return;
>         }
>         try {
>             writer.close();
>             logger.debug("writer closed");
>             writer = null;
>         } catch (Exception io) {
>             logger.error("failed to close index writer:"+io.getMessage(),io);
>         }
>     }
>            public void deleteIndex() throws MessageSearchException {
>                   logger.debug("delete index 
> {indexpath='"+volume.getIndexPath()+"'}");
>                   try {
>                      indexLock.lock();
>                     try {
>                        int maxIndexChars = 
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                        writer = new 
> IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new 
> IndexWriter.MaxFieldLength(maxIndexChars));
>                     } catch (Exception cie) {
>                         logger.error("failed to delete index 
> {index='"+volume.getIndexPath()+"'}",cie);
>                         return;
>                     }
>                     MessageIndex.volumeIndexes.remove(this);
>                  } finally {
>                        closeIndex();
>                      indexLock.unlock();
>                }
>          }
>     /**
>      * Starts the index processor thread and registers this object as a JVM
>      * shutdown hook. If a write.lock file is found in the index directory a
>      * warning is logged, and the file is deleted unless the indexer is
>      * configured for multiple index processes.
>      */
>     public void startup() {
>         logger.debug("volumeindex is starting up");
>         final String lockPath = volume.getIndexPath()+File.separatorChar + "write.lock";
>         final File lockFile = new File(lockPath);
>         if (lockFile.exists()) {
>             logger.warn("The server lock file already exists. Either another indexer is running or the server was not shutdown correctly.");
>             logger.warn("If it is the latter, the lock file must be manually deleted at "+lockFile.getAbsolutePath());
>             if (!indexer.getMultipleIndexProcesses()) {
>                 // single-process setup: treat the lock as stale and remove it
>                 logger.warn("index lock file detected. the server was shutdown incorrectly. automatically deleting lock file.");
>                 logger.warn("indexer is configured to deal with only one indexer process.");
>                 logger.warn("if you are running more than one indexer, your index could be subject to corruption.");
>                 lockFile.delete();
>             } else {
>                 logger.debug("index lock file detected on volumeindex startup.");
>             }
>         }
>         indexProcessor = new IndexProcessor();
>         indexProcessor.start();
>         Runtime.getRuntime().addShutdownHook(this);
>     }
>     /**
>      * Asks the index processor to stop by queueing EXIT_REQ and stops the
>      * shared scheduler.
>      *
>      * @throws IllegalStateException if the queue is full and the exit request
>      *         cannot be added
>      */
>     public void shutdown() {
>         logger.debug("volumeindex is shutting down");
>         try {
>             queue.add(EXIT_REQ);
>         } finally {
>             // Runs even when the queue is full and add() throws. The scheduler is
>             // not assigned anywhere in this class, so it may legitimately be null.
>             if (scheduler != null) {
>                 scheduler.shutdownNow();
>             }
>         }
>     }
>     /**
>      * Shutdown-hook entry point (this object is registered via
>      * Runtime.addShutdownHook in startup()): asks the index processor to stop.
>      */
>     @Override
>     public void run() {
>         // offer() instead of add(): a full queue must not raise an uncaught
>         // IllegalStateException inside the shutdown hook.
>         if (!queue.offer(EXIT_REQ)) {
>             logger.warn("index queue is full. exit request could not be queued during shutdown.");
>         }
>     }
>             }
>
>  
>
>
> Is it possible a large merge is running?  By default IW.close waits
> for outstanding merges to complete.  Can you post the stacktrace?
>
> Mike
>
> On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <jamie@stimulussoft.com> 
> wrote:
>> Hi All
>>
>> I have a long running situation where our indexing thread is getting 
>> stuck
>> indefinitely in IndexWriter's close method. Yourkit shows the thread 
>> to be
>> stuck in TIME_WAITING. Any ideas on what could be causing this?
>> Could it be one of the streams or readers we passed to the document?
>>
>> I am running Lucene 2.9.0.
>>
>> Many thanks in advance
>>
>> Jamie


---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message