lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jamie <ja...@stimulussoft.com>
Subject File Handle Leaks During Lucene 3.0.2 Merge
Date Wed, 29 Sep 2010 17:48:08 GMT
  Hi There

I am noticing file handle leaks on our index files. I think the 
leaks occur during Lucene's merge operation.
lsof reports the following:

java      28604   root  213r      REG               8,33  1098681   
57409621 /var/index/vol201009/_a4w.cfs (deleted)
java      28604   root  214r      REG               8,33    35164   
57409699 /var/index/vol201009/_a4x.cfs (deleted)
java      28604   root  215r      REG               8,33    46139   
57409691 /var/index/vol201009/_a4y.cfs (deleted)
java      28604   root  216r      REG               8,33    40342   
57409673 /var/index/vol201009/_a4z.cfs (deleted)
java      28604   root  217r      REG               8,33    44204   
57409675 /var/index/vol201009/_a50.cfs (deleted)

We are using Lucene's near-real-time search feature, so we have reader 
handles open on the index. Could it be that we are not handling the 
merge situation correctly? Any ideas would be most appreciated.

The source code of our index class is as follows:

package com.stimulus.archiva.index;
import com.stimulus.util.*;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class LuceneIndex extends Thread {

          // Documents handed to indexDocument() wait here until the IndexProcessor thread takes them.
          protected ArrayBlockingQueue<LuceneDocument> queue;
          protected static final Log logger = 
LogFactory.getLog(LuceneIndex.class.getName());
          // When this log has debug enabled (and indexLogOut is set) openIndex() attaches indexLogOut
          // to the writer as its info stream.
          protected static final Log indexLog = 
LogFactory.getLog("indexlog");
             // The shared writer; null while the index is closed (see openIndex()/closeIndex()).
             IndexWriter writer = null;
             // Static, so shared by every LuceneIndex instance; (re)assigned by startup().
             protected static ScheduledExecutorService scheduler;
          // Handle of the periodic task that startup() schedules to set the reopen flag.
          protected static ScheduledFuture<?> scheduledTask;
          // Sentinel document: queued by shutdown() and compared by reference in IndexProcessor.
          protected LuceneDocument EXIT_REQ = null;
          // Held while a batch is written/committed and by closeIndex(), optimize(),
          // deleteDocs() and deleteIndex().
          ReentrantLock indexLock = new ReentrantLock();
          // Analyzer passed to the IndexWriter constructor.
          ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
          // Debug log file and stream; only assigned by setLog().
          File indexLogFile;
          PrintStream indexLogOut;
          // Background consumer of the queue; created and started by startup().
          IndexProcessor indexProcessor;
          // Name used in log and error messages.
          String friendlyName;
          // File system path of the index directory.
          String indexPath;
          // Upper bound on the number of documents written per batch before a commit.
          int maxSimultaneousDocs;
          // Size of the per-batch thread pool that runs DocWriter tasks.
          int indexThreads;
          // Reader most recently created by getReader(); set to null by shutdown().
          IndexReader reader = null;
          // Set to true once a second by the scheduled task; read and cleared in getReader().
          volatile boolean reopen = false;
          // Directory backing the current writer; assigned in openIndex().
          FSDirectory fsDirectory;
          // Held by getReader() and by the commit-failure recovery in IndexProcessor.run().
          ReentrantLock readerLock = new ReentrantLock();
          enum Status { READY, SHUTDOWN };
          // READY only after startup() has completed; shutdown() sets it back to SHUTDOWN.
          Status status = Status.SHUTDOWN;

             /**
              * Creates an index handle in the {@code SHUTDOWN} state. Nothing is opened
              * here; call {@link #startup()} before indexing or searching.
              *
              * @param queueSize           capacity of the pending-document queue
              * @param exitReq             sentinel document used to ask the processor thread to exit
              * @param friendlyName        name used in log and error messages
              * @param indexPath           file system path of the index directory
              * @param maxSimultaneousDocs maximum number of documents written per batch
              * @param indexThreads        number of threads used to write one batch
              */
             public LuceneIndex(int queueSize, LuceneDocument exitReq,
                                String friendlyName, String indexPath,
                                int maxSimultaneousDocs, int indexThreads) {
                 // Debug index logging via setLog(friendlyName) is currently disabled.
                 this.status = Status.SHUTDOWN;
                 this.friendlyName = friendlyName;
                 this.indexPath = indexPath;
                 this.indexThreads = indexThreads;
                 this.maxSimultaneousDocs = maxSimultaneousDocs;
                 this.EXIT_REQ = exitReq;
                 this.queue = new ArrayBlockingQueue<LuceneDocument>(queueSize);
             }


           /**
            * Returns the maximum number of documents the index processor writes
            * in one batch before committing.
            */
           public int getMaxSimultaneousDocs() {
               return maxSimultaneousDocs;
           }

           /**
            * Sets the maximum number of documents the index processor writes
            * in one batch before committing.
            *
            * @param maxSimultaneousDocs new batch size limit
            */
           public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
               this.maxSimultaneousDocs = maxSimultaneousDocs;
           }


           /**
            * Returns the lock this class holds while writing a batch, closing,
            * optimizing or deleting from the index.
            */
           public ReentrantLock getIndexLock() {
               return indexLock;
           }

           protected void setLog(String logName) {

                 try {
                     indexLogFile = getIndexLogFile(logName);
                     if (indexLogFile!=null) {
                         if (indexLogFile.length()>10485760)
                             indexLogFile.delete();
                         indexLogOut = new PrintStream(indexLogFile);
                     }
                     logger.debug("set index log file path 
{path='"+indexLogFile.getCanonicalPath()+"'}");
                 } catch (Exception e) {
                     logger.error("failed to open index log 
file:"+e.getMessage(),e);
                 }

           }

           protected File getIndexLogFile(String logName) {
                try {
                     String logfilepath = 
Config.getFileSystem().getLogPath()+File.separator+"indexdebug_"+logName+".log";
                     return new File(logfilepath);
                 } catch (Exception e) {
                     logger.error("failed to open index log 
file:"+e.getMessage(),e);
                     return null;
                 }
           }



           protected void openIndex() throws MessageSearchException {
             Exception lastError = null;

             if (writer==null) {
                 logger.debug("openIndex() index "+friendlyName+" will 
be opened. it is currently closed.");
             } else {
                 logger.debug("openIndex() did not bother opening index 
"+friendlyName+". it is already open.");
                 return;
             }
             logger.debug("opening index "+friendlyName+" for write");
             logger.debug("opening search index "+friendlyName+" for 
write {indexpath='"+indexPath+"'}");
             boolean writelock;
             int attempt = 0;
             int maxattempt = 10;

             if 
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                 maxattempt = 10000;
              } else {
                 maxattempt = 10;
              }

             do {
                 writelock = false;
                 try {
                         fsDirectory = FSDirectory.open(new 
File(indexPath));
                         int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                         writer = new 
IndexWriter(fsDirectory,analyzer,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                         if (indexLog.isDebugEnabled() && 
indexLogOut!=null) {
                             writer.setInfoStream(indexLogOut);
                         }
                 } catch (LockObtainFailedException lobfe) {
                         logger.debug("write lock on index 
"+friendlyName+". will reopen in 50ms.");
                         try { Thread.sleep(50); } catch (Exception e) {}
                         attempt++;
                         writelock = true;
                 } catch (CorruptIndexException cie) {
                     throw new MessageSearchException("index 
"+friendlyName+" appears to be corrupt. please reindex the active 
volume."+cie.getMessage(),logger);
                 } catch (Throwable io) {
                     throw new MessageSearchException("failed to write 
document to index "+friendlyName+":"+io.getMessage(),logger);
                 }
            } while (writelock && attempt<maxattempt);
            if (attempt>=10000)
              throw new MessageSearchException("failed to open index 
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
         }

         public void indexDocument(LuceneDocument luceneDocument) throws 
MessageSearchException {
             logger.debug("index document {"+luceneDocument+"}");
             if (status==Status.SHUTDOWN) {
                   throw new MessageSearchException("index is 
shutdown.",logger);
             }
             long s = (new Date()).getTime();
             if (luceneDocument == null)
                 throw new MessageSearchException("assertion failure: 
null document",logger);
             try {
                 queue.put(luceneDocument);
             } catch (InterruptedException ie) {
                 throw new MessageSearchException("failed to add 
document to queue:"+ie.getMessage(),ie,logger);
             }
             logger.debug("document indexed successfully 
{"+luceneDocument+"}");

             logger.debug("indexing message end {"+luceneDocument+"}");
             long e = (new Date()).getTime();
             logger.debug("indexing time {time='"+(e-s)+"'}");
         }

         public class DocWriter implements Runnable {

             LuceneDocument doc;
             String language;
             LinkedList<LuceneDocument> pushbacks;
             ReentrantLock pushbackLock;

             public DocWriter(LuceneDocument doc,String 
language,LinkedList<LuceneDocument> pushbacks, ReentrantLock pushbackLock) {
                 this.doc = doc;
                 this.language = language;
                 this.pushbacks = pushbacks;
             }

             public void run() {
                 try {
                     
writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
                 } catch (IOException io) {
                     logger.error("failed to add document to 
index:"+io.getMessage(),io);
                 } catch (AlreadyClosedException e) {
                     try {
                         pushbackLock.lock();
                         pushbacks.add(doc);
                     } finally {
                         pushbackLock.unlock();
                     }
                 }
             }

         }



         public class IndexProcessor extends Thread {

             /** Creates the processor thread and names it "index processor". */
             public IndexProcessor() {
                 setName("index processor");
             }

             public void run() {
                 boolean exit = false;
                 LuceneDocument luceneDocument = null;
                 LinkedList<LuceneDocument> pushbacks = new 
LinkedList<LuceneDocument>();
                 ReentrantLock pushbackLock = new ReentrantLock();


                 while (!exit) {


                     //documentPool = 
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
                     luceneDocument = null;
                     try {
                         luceneDocument = (LuceneDocument) queue.take();
                     } catch (InterruptedException e) {
                         logger.debug("index exit req received. exiting");
                         exit = true;
                         continue;
                     }
                     if (luceneDocument==EXIT_REQ) {
                         logger.debug("index exit req received. exiting");
                         exit = true;
                         continue;
                     }
                      try {

                         indexLock.lock();

                         if (luceneDocument==null) {
                             logger.debug("index info is null");
                         }
                         int i = 0;
                         ExecutorService threadPool = 
Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFactory("indexwritepool",Thread.NORM_PRIORITY,true));

                         while(luceneDocument!=null && 
i<maxSimultaneousDocs) {
                                 Document doc = 
luceneDocument.getDocument();
                                 String language = doc.get("lang");
                                 if (language==null) {
                                     language = 
Config.getConfig().getIndex().getIndexLanguage();
                                 }
                                 DocWriter docWriter = new 
DocWriter(luceneDocument,language,pushbacks,pushbackLock);
                                 threadPool.submit(docWriter);

                                  i++;
                                  if (i<maxSimultaneousDocs) {
                                      luceneDocument = (LuceneDocument) 
queue.poll();

                                      if (luceneDocument==null) {
                                             logger.debug("index info is 
null");
                                      }

                                      if (luceneDocument==EXIT_REQ) {
                                              logger.debug("index exit 
req received. exiting (2)");
                                             exit = true;
                                             break;
                                       }
                                  }

                         }
                         threadPool.shutdown();
                         threadPool.awaitTermination(30,TimeUnit.MINUTES);
                         try {
                             pushbackLock.lock();
                             if (pushbacks.size()>0) {
                                  for (LuceneDocument pushback : 
pushbacks) {
                                         try {
                                             
writer.addDocument(pushback.getDocument());
                                         } catch (IOException io) {
                                             logger.error("failed to add 
document to index:"+io.getMessage(),io);
                                         } catch (AlreadyClosedException 
e) {
                                             pushbacks.add(pushback);
                                         }
                                         i++;
                                   }
                             }
                         } finally {
                             pushbackLock.unlock();
                         }
                         logger.debug("index commit");
                         try {
                             if (writer!=null) {
                                 writer.commit();
                             }
                         } catch (Exception e) {
                             logger.error("failed to commit 
index:"+e.getMessage(),e);
                             try {
                                 readerLock.lock();
                                 closeIndex();
                                 openIndex();
                             } finally {
                                 readerLock.unlock();
                             }
                         }

                      } catch (Throwable ie) {
                          logger.error("index write 
interrupted:"+ie.getMessage(),ie);
                      } finally {

                           indexLock.unlock();
                     }
                 }
                 logger.debug("exit indexer");
                }

             public class IndexDocument extends Thread {

                     LuceneDocument luceneDocument = null;
                     List<LuceneDocument> pushbacks = null;

                     public IndexDocument(LuceneDocument 
luceneDocument,List<LuceneDocument> pushbacks) {
                         this.luceneDocument = luceneDocument;
                         this.pushbacks = pushbacks;
                         setName("index document");
                     }

                     public void run() {
                         try {
                             
writer.addDocument(luceneDocument.getDocument());
                         } catch (IOException io) {
                             logger.error("failed to add document to 
index:"+io.getMessage(),io);
                         } catch (AlreadyClosedException e) {
                             pushbacks.add(luceneDocument);
                         } catch (Throwable t) {
                             logger.error("failed to add document to 
index:"+t.getMessage(),t);
                         }
                     }};
             }

         protected void closeIndex() {
              try {
                  indexLock.lock();


                  if (writer!=null) {
                     writer.close();
                  }

                  if (fsDirectory!=null) {
                      fsDirectory.close();
                  }
              } catch (Throwable io) {
                 logger.error("failed to close index 
writer:"+io.getMessage(),io);
              } finally {
                  writer = null;
                  indexLock.unlock();
              }
         }



         /**
          * Asks the writer to optimize the index while holding {@code indexLock}.
          *
          * NOTE(review): although the method declares {@code MessageSearchException},
          * the exception created in the inner catch is itself caught by the outer
          * {@code catch (Throwable)} and only logged, so nothing thrown inside the
          * try block reaches the caller. Confirm whether that is intended.
          * NOTE(review): unlike deleteDocs(), this method neither checks the status
          * nor calls openIndex(), so a null writer ends up in the same logged path.
          */
         public void optimize() throws MessageSearchException {
               logger.debug("optimize volume");
               try {
                   indexLock.lock();
                   try {
                       // presumably 'false' means "do not wait for the merges to finish" - confirm against the Lucene API
                       writer.optimize(false);
                   } catch (Exception io) {
                       throw new MessageSearchException("failed to 
optimize the index:"+io.getMessage(),io,logger);
                   }
               } catch (Throwable t) { // diskspace problems could arise
                   logger.error("failed to optimize 
index:"+t.getMessage(),t);
               } finally {
                   indexLock.unlock();
               }

         }
         /**
          * Deletes all documents matching the given terms, then commits and
          * expunges the deletes, all while holding {@code indexLock}. The writer is
          * opened first if necessary.
          *
          * NOTE(review): only the "index is shutdown." check can actually throw to
          * the caller. Every exception raised inside the try block, including the
          * MessageSearchExceptions created there, is caught by the outer
          * {@code catch (Throwable)} and only logged.
          *
          * @param terms terms identifying the documents to delete
          * @throws MessageSearchException if the index is shut down
          */
         public void deleteDocs(Term[] terms) throws 
MessageSearchException {
               logger.debug("delete docs");
               if (status==Status.SHUTDOWN) {
                    throw new MessageSearchException("index is 
shutdown.",logger);
               }
               try {
                   indexLock.lock();
                   openIndex();
                   try {
                       writer.deleteDocuments(terms);
                   } catch (Exception e) {
                       throw new MessageSearchException("failed to 
delete doc from index:"+e.getMessage(),e,logger);
                   } finally {
                       // Runs even when the delete above failed; an exception thrown
                       // here replaces the one from the delete.
                       try {
                           writer.commit();
                           writer.expungeDeletes(false);
                       } catch (Exception io) {
                           throw new MessageSearchException("failed to 
expunge docs from index:"+io.getMessage(),io,logger);
                       }
                   }
               } catch (Throwable t) {
                   logger.error("failed to delete docs from 
index."+t.getMessage(),t);
               } finally {
                   indexLock.unlock();
               }
         }


           public void deleteIndex() throws MessageSearchException {
                    logger.debug("delete index 
{indexpath='"+indexPath+"'}");
                    try {
                       indexLock.lock();
                      closeIndex();
                      File indexFile = new File(indexPath);
                      //deleteDirContents(indexFile);
                      try {
                         int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                         writer = new 
IndexWriter(FSDirectory.open(indexFile),analyzer,true,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                      } catch (Throwable cie) {
                          logger.error("failed to delete index 
{index='"+indexPath+"'}",cie);
                          return;
                      } finally {
                          try { writer.close(); } catch (Exception e) { 
logger.debug("failed to close writer:"+e.getMessage()); }
                          writer = null;
                      }
                   } finally {
                         openIndex();
                       indexLock.unlock();
                 }
           }

           public void startup() throws MessageSearchException {
             logger.debug("luceneindex is starting up");


             File lockFile = new File(indexPath+File.separatorChar + 
"write.lock");
             if (lockFile.exists()) {
                 if 
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                     logger.debug("index lock file detected on 
volumeindex startup.");
                 } else {
                     logger.warn("index lock file detected. the server 
was shutdown incorrectly. automatically deleting lock file. 
{lockFile='"+lockFile.getPath()+"'}");
                     lockFile.delete();
                 }
             }
             openIndex();
             scheduler = 
Executors.newScheduledThreadPool(1,ThreadUtil.getFlexibleThreadFactory("index 
reopen",Thread.NORM_PRIORITY-1,true));
             scheduledTask = scheduler.scheduleWithFixedDelay(new 
Runnable() { public void run() { reopen=true; }},1,1,TimeUnit.SECONDS);

             indexProcessor = new IndexProcessor();
             indexProcessor.start();


             Runtime.getRuntime().addShutdownHook(this);
             status = Status.READY;
           }

           public IndexReader getReader() throws MessageSearchException {
               if (status==Status.SHUTDOWN) {
                    throw new MessageSearchException("index is 
shutdown.",logger);
               }
               readerLock.lock();
               try {
                   if (writer==null) {
                       throw new MessageSearchException("cannot retrieve 
reader. writer is closed (or null)",logger);
                   }
                   if (reader == null) {
                       reader = new VolumeIndexReader(writer.getReader(5));
                   } else {
                       try {
                           if (reopen) {
                               reader = new 
VolumeIndexReader(writer.getReader(5));
                               reopen = false;
                           }
                       } catch (AlreadyClosedException ace) {
                           logger.debug("reader was found closed. 
reopening");
                           reader = new 
VolumeIndexReader(writer.getReader(5));
                       }
                   }
               } catch (IOException io) {
                   throw new MessageSearchException("failed to retrieve 
reader from writer:"+io.getMessage(),io,logger);
               } finally {
                   readerLock.unlock();
               }
               return reader;
           }


           public void shutdown() {
               status = Status.SHUTDOWN;
               try { queue.put(EXIT_REQ); } catch (InterruptedException 
e) {}

               if (reader!=null) {
                   try {
                       reader.close();
                   } catch (Exception e) {
                       logger.error("failed to close index 
reader:"+e.getMessage());
                   }
               }
               reader = null;
               if (scheduler!=null) {
                   scheduler.shutdown();
               }
               closeIndex();

               indexProcessor.interrupt();

               if (scheduler!=null) {
                   scheduler.shutdownNow();
               }

            }

           /**
            * Thread body of this object. startup() registers this instance as a JVM
            * shutdown hook, so this runs at JVM exit and shuts the index down.
            */
           @Override
           public void run() {
               shutdown();
           }


           /**
            * A unit of work for the index queue: something that can produce the
            * Lucene {@link Document} to be added to the index.
            */
           public interface LuceneDocument {

               /** Text used when the document appears in log messages. */
               public String toString();
               /** Returns the Lucene document to add to the index. */
               public Document getDocument();
               // NOTE(review): declaring finalize() here forces implementations to
               // expose it publicly; its intended use is not visible in this file.
               public void finalize();

           }

           /**
            * Deletes every regular file directly inside the given directory.
            * Sub-directories and their contents are left untouched. Does nothing if
            * {@code path} is {@code null}, does not exist, is not a directory, or
            * cannot be listed.
            *
            * @param path directory whose files should be deleted
            */
           public static void deleteDirContents(File path) {
                 if (path == null) {
                     return;
                 }
                 // listFiles() returns null (not an empty array) when the path is not a
                 // directory, does not exist, or an I/O error occurs.
                 File[] files = path.listFiles();
                 if (files == null) {
                     return;
                 }
                 for (File file : files) {
                     if (file.isFile()) {
                         file.delete();
                     }
                 }
            }



}





---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message