lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Uwe Schindler" <...@thetaphi.de>
Subject RE: File Handle Leaks During Lucene 3.0.2 Merge
Date Wed, 29 Sep 2010 18:21:47 GMT
The "deleted" files are only freed by OS kernel if no longer an IndexReader
accesses them. Did you get a new realtime reader after merging and *closed*
the old one?

-----
Uwe Schindler
H.-H.-Meier-Allee 63, D-28213 Bremen
http://www.thetaphi.de
eMail: uwe@thetaphi.de


> -----Original Message-----
> From: Jamie [mailto:jamie@stimulussoft.com]
> Sent: Wednesday, September 29, 2010 10:48 AM
> To: java-user@lucene.apache.org
> Subject: File Handle Leaks During Lucene 3.0.2 Merge
> 
>   Hi There
> 
> I am noticing file handle leaks appearing on Index files. I think the
leaks occur
> during the Lucene merge operation.
> Lsof reports the following:
> 
> java      28604   root  213r      REG               8,33  1098681
> 57409621 /var/index/vol201009/_a4w.cfs (deleted)
> java      28604   root  214r      REG               8,33    35164
> 57409699 /var/index/vol201009/_a4x.cfs (deleted)
> java      28604   root  215r      REG               8,33    46139
> 57409691 /var/index/vol201009/_a4y.cfs (deleted)
> java      28604   root  216r      REG               8,33    40342
> 57409673 /var/index/vol201009/_a4z.cfs (deleted)
> java      28604   root  217r      REG               8,33    44204
> 57409675 /var/index/vol201009/_a50.cfs (deleted)
> 
> We are using Lucene's realtime search feature so have handles open on the
> index. Could it be that we are not handling the merge situation correctly
or
> something? Your ideas are most appreciated.
> 
> The source code to our index file as follows:
> 
> package com.stimulus.archiva.index;
> import com.stimulus.util.*;
> 
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import com.stimulus.archiva.domain.Config;
> import com.stimulus.archiva.exception.*; import
> com.stimulus.archiva.language.AnalyzerFactory;
> import com.stimulus.archiva.search.*;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
> 
> public class LuceneIndex extends Thread {
> 
>           protected ArrayBlockingQueue<LuceneDocument> queue;
>           protected static final Log logger =
> LogFactory.getLog(LuceneIndex.class.getName());
>           protected static final Log indexLog =
LogFactory.getLog("indexlog");
>              IndexWriter writer = null;
>              protected static ScheduledExecutorService scheduler;
>           protected static ScheduledFuture<?> scheduledTask;
>           protected LuceneDocument EXIT_REQ = null;
>           ReentrantLock indexLock = new ReentrantLock();
>           ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>           File indexLogFile;
>           PrintStream indexLogOut;
>           IndexProcessor indexProcessor;
>           String friendlyName;
>           String indexPath;
>           int maxSimultaneousDocs;
>           int indexThreads;
>           IndexReader reader = null;
>           volatile boolean reopen = false;
>           FSDirectory fsDirectory;
>           ReentrantLock readerLock = new ReentrantLock();
>           enum Status { READY, SHUTDOWN };
>           Status status = Status.SHUTDOWN;
> 
>              public LuceneIndex(int queueSize, LuceneDocument exitReq,
String
> friendlyName, String indexPath, int  maxSimultaneousDocs, int
> indexThreads) {
>                  this.queue = new
> ArrayBlockingQueue<LuceneDocument>(queueSize);
>                  this.EXIT_REQ = exitReq;
>                  this.friendlyName = friendlyName;
>                  this.indexPath = indexPath;
>                  this.maxSimultaneousDocs = maxSimultaneousDocs;
>                  this.indexThreads = indexThreads;
>                  this.status = Status.SHUTDOWN;
>                //  if (indexLog.isDebugEnabled()) {
>                    //setLog(friendlyName);
>                  //}
>              }
> 
> 
>            public int getMaxSimultaneousDocs() {
>                return maxSimultaneousDocs;
>            }
> 
>            public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
>                this.maxSimultaneousDocs = maxSimultaneousDocs;
>            }
> 
> 
>            public ReentrantLock getIndexLock() {
>                return indexLock;
>            }
> 
>            protected void setLog(String logName) {
> 
>                  try {
>                      indexLogFile = getIndexLogFile(logName);
>                      if (indexLogFile!=null) {
>                          if (indexLogFile.length()>10485760)
>                              indexLogFile.delete();
>                          indexLogOut = new PrintStream(indexLogFile);
>                      }
>                      logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
>                  } catch (Exception e) {
>                      logger.error("failed to open index log
file:"+e.getMessage(),e);
>                  }
> 
>            }
> 
>            protected File getIndexLogFile(String logName) {
>                 try {
>                      String logfilepath =
>
Config.getFileSystem().getLogPath()+File.separator+"indexdebug_"+logName+".
> log";
>                      return new File(logfilepath);
>                  } catch (Exception e) {
>                      logger.error("failed to open index log
file:"+e.getMessage(),e);
>                      return null;
>                  }
>            }
> 
> 
> 
>            protected void openIndex() throws MessageSearchException {
>              Exception lastError = null;
> 
>              if (writer==null) {
>                  logger.debug("openIndex() index "+friendlyName+" will be
opened. it
> is currently closed.");
>              } else {
>                  logger.debug("openIndex() did not bother opening index
> "+friendlyName+". it is already open.");
>                  return;
>              }
>              logger.debug("opening index "+friendlyName+" for write");
>              logger.debug("opening search index "+friendlyName+" for write
> {indexpath='"+indexPath+"'}");
>              boolean writelock;
>              int attempt = 0;
>              int maxattempt = 10;
> 
>              if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>                  maxattempt = 10000;
>               } else {
>                  maxattempt = 10;
>               }
> 
>              do {
>                  writelock = false;
>                  try {
>                          fsDirectory = FSDirectory.open(new
File(indexPath));
>                          int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                          writer = new
> IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                          if (indexLog.isDebugEnabled() &&
> indexLogOut!=null) {
>                              writer.setInfoStream(indexLogOut);
>                          }
>                  } catch (LockObtainFailedException lobfe) {
>                          logger.debug("write lock on index
"+friendlyName+". will
> reopen in 50ms.");
>                          try { Thread.sleep(50); } catch (Exception e) {}
>                          attempt++;
>                          writelock = true;
>                  } catch (CorruptIndexException cie) {
>                      throw new MessageSearchException("index
"+friendlyName+"
> appears to be corrupt. please reindex the active
> volume."+cie.getMessage(),logger);
>                  } catch (Throwable io) {
>                      throw new MessageSearchException("failed to write
document to
> index "+friendlyName+":"+io.getMessage(),logger);
>                  }
>             } while (writelock && attempt<maxattempt);
>             if (attempt>=10000)
>               throw new MessageSearchException("failed to open index
> "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
>          }
> 
>          public void indexDocument(LuceneDocument luceneDocument) throws
> MessageSearchException {
>              logger.debug("index document {"+luceneDocument+"}");
>              if (status==Status.SHUTDOWN) {
>                    throw new MessageSearchException("index is
shutdown.",logger);
>              }
>              long s = (new Date()).getTime();
>              if (luceneDocument == null)
>                  throw new MessageSearchException("assertion failure:
> null document",logger);
>              try {
>                  queue.put(luceneDocument);
>              } catch (InterruptedException ie) {
>                  throw new MessageSearchException("failed to add document
to
> queue:"+ie.getMessage(),ie,logger);
>              }
>              logger.debug("document indexed successfully
{"+luceneDocument+"}");
> 
>              logger.debug("indexing message end {"+luceneDocument+"}");
>              long e = (new Date()).getTime();
>              logger.debug("indexing time {time='"+(e-s)+"'}");
>          }
> 
>          public class DocWriter implements Runnable {
> 
>              LuceneDocument doc;
>              String language;
>              LinkedList<LuceneDocument> pushbacks;
>              ReentrantLock pushbackLock;
> 
>              public DocWriter(LuceneDocument doc,String
> language,LinkedList<LuceneDocument> pushbacks, ReentrantLock
> pushbackLock) {
>                  this.doc = doc;
>                  this.language = language;
>                  this.pushbacks = pushbacks;
>              }
> 
>              public void run() {
>                  try {
> 
> writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language
> ,AnalyzerFactory.Operation.INDEX));
>                  } catch (IOException io) {
>                      logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                  } catch (AlreadyClosedException e) {
>                      try {
>                          pushbackLock.lock();
>                          pushbacks.add(doc);
>                      } finally {
>                          pushbackLock.unlock();
>                      }
>                  }
>              }
> 
>          }
> 
> 
> 
>          public class IndexProcessor extends Thread {
> 
>              public IndexProcessor() {
>                  setName("index processor");
>              }
> 
>              public void run() {
>                  boolean exit = false;
>                  LuceneDocument luceneDocument = null;
>                  LinkedList<LuceneDocument> pushbacks = new
> LinkedList<LuceneDocument>();
>                  ReentrantLock pushbackLock = new ReentrantLock();
> 
> 
>                  while (!exit) {
> 
> 
>                      //documentPool =
>
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThr
> eads());
>                      luceneDocument = null;
>                      try {
>                          luceneDocument = (LuceneDocument) queue.take();
>                      } catch (InterruptedException e) {
>                          logger.debug("index exit req received. exiting");
>                          exit = true;
>                          continue;
>                      }
>                      if (luceneDocument==EXIT_REQ) {
>                          logger.debug("index exit req received. exiting");
>                          exit = true;
>                          continue;
>                      }
>                       try {
> 
>                          indexLock.lock();
> 
>                          if (luceneDocument==null) {
>                              logger.debug("index info is null");
>                          }
>                          int i = 0;
>                          ExecutorService threadPool =
> Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFact
> ory("indexwritepool",Thread.NORM_PRIORITY,true));
> 
>                          while(luceneDocument!=null &&
> i<maxSimultaneousDocs) {
>                                  Document doc =
> luceneDocument.getDocument();
>                                  String language = doc.get("lang");
>                                  if (language==null) {
>                                      language =
> Config.getConfig().getIndex().getIndexLanguage();
>                                  }
>                                  DocWriter docWriter = new
> DocWriter(luceneDocument,language,pushbacks,pushbackLock);
>                                  threadPool.submit(docWriter);
> 
>                                   i++;
>                                   if (i<maxSimultaneousDocs) {
>                                       luceneDocument = (LuceneDocument)
> queue.poll();
> 
>                                       if (luceneDocument==null) {
>                                              logger.debug("index info is
> null");
>                                       }
> 
>                                       if (luceneDocument==EXIT_REQ) {
>                                               logger.debug("index exit
> req received. exiting (2)");
>                                              exit = true;
>                                              break;
>                                        }
>                                   }
> 
>                          }
>                          threadPool.shutdown();
>                          threadPool.awaitTermination(30,TimeUnit.MINUTES);
>                          try {
>                              pushbackLock.lock();
>                              if (pushbacks.size()>0) {
>                                   for (LuceneDocument pushback :
> pushbacks) {
>                                          try {
> 
> writer.addDocument(pushback.getDocument());
>                                          } catch (IOException io) {
>                                              logger.error("failed to add
> document to index:"+io.getMessage(),io);
>                                          } catch (AlreadyClosedException
> e) {
>                                              pushbacks.add(pushback);
>                                          }
>                                          i++;
>                                    }
>                              }
>                          } finally {
>                              pushbackLock.unlock();
>                          }
>                          logger.debug("index commit");
>                          try {
>                              if (writer!=null) {
>                                  writer.commit();
>                              }
>                          } catch (Exception e) {
>                              logger.error("failed to commit
> index:"+e.getMessage(),e);
>                              try {
>                                  readerLock.lock();
>                                  closeIndex();
>                                  openIndex();
>                              } finally {
>                                  readerLock.unlock();
>                              }
>                          }
> 
>                       } catch (Throwable ie) {
>                           logger.error("index write
> interrupted:"+ie.getMessage(),ie);
>                       } finally {
> 
>                            indexLock.unlock();
>                      }
>                  }
>                  logger.debug("exit indexer");
>                 }
> 
>              public class IndexDocument extends Thread {
> 
>                      LuceneDocument luceneDocument = null;
>                      List<LuceneDocument> pushbacks = null;
> 
>                      public IndexDocument(LuceneDocument
> luceneDocument,List<LuceneDocument> pushbacks) {
>                          this.luceneDocument = luceneDocument;
>                          this.pushbacks = pushbacks;
>                          setName("index document");
>                      }
> 
>                      public void run() {
>                          try {
> 
> writer.addDocument(luceneDocument.getDocument());
>                          } catch (IOException io) {
>                              logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                          } catch (AlreadyClosedException e) {
>                              pushbacks.add(luceneDocument);
>                          } catch (Throwable t) {
>                              logger.error("failed to add document to
> index:"+t.getMessage(),t);
>                          }
>                      }};
>              }
> 
>          protected void closeIndex() {
>               try {
>                   indexLock.lock();
> 
> 
>                   if (writer!=null) {
>                      writer.close();
>                   }
> 
>                   if (fsDirectory!=null) {
>                       fsDirectory.close();
>                   }
>               } catch (Throwable io) {
>                  logger.error("failed to close index
> writer:"+io.getMessage(),io);
>               } finally {
>                   writer = null;
>                   indexLock.unlock();
>               }
>          }
> 
> 
> 
>          public void optimize() throws MessageSearchException {
>                logger.debug("optimize volume");
>                try {
>                    indexLock.lock();
>                    try {
>                        writer.optimize(false);
>                    } catch (Exception io) {
>                        throw new MessageSearchException("failed to
> optimize the index:"+io.getMessage(),io,logger);
>                    }
>                } catch (Throwable t) { // diskspace problems could arise
>                    logger.error("failed to optimize
> index:"+t.getMessage(),t);
>                } finally {
>                    indexLock.unlock();
>                }
> 
>          }
>          public void deleteDocs(Term[] terms) throws
> MessageSearchException {
>                logger.debug("delete docs");
>                if (status==Status.SHUTDOWN) {
>                     throw new MessageSearchException("index is
> shutdown.",logger);
>                }
>                try {
>                    indexLock.lock();
>                    openIndex();
>                    try {
>                        writer.deleteDocuments(terms);
>                    } catch (Exception e) {
>                        throw new MessageSearchException("failed to
> delete doc from index:"+e.getMessage(),e,logger);
>                    } finally {
>                        try {
>                            writer.commit();
>                            writer.expungeDeletes(false);
>                        } catch (Exception io) {
>                            throw new MessageSearchException("failed to
> expunge docs from index:"+io.getMessage(),io,logger);
>                        }
>                    }
>                } catch (Throwable t) {
>                    logger.error("failed to delete docs from
> index."+t.getMessage(),t);
>                } finally {
>                    indexLock.unlock();
>                }
>          }
> 
> 
>            public void deleteIndex() throws MessageSearchException {
>                     logger.debug("delete index
> {indexpath='"+indexPath+"'}");
>                     try {
>                        indexLock.lock();
>                       closeIndex();
>                       File indexFile = new File(indexPath);
>                       //deleteDirContents(indexFile);
>                       try {
>                          int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                          writer = new
> IndexWriter(FSDirectory.open(indexFile),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                       } catch (Throwable cie) {
>                           logger.error("failed to delete index
> {index='"+indexPath+"'}",cie);
>                           return;
>                       } finally {
>                           try { writer.close(); } catch (Exception e) {
> logger.debug("failed to close writer:"+e.getMessage()); }
>                           writer = null;
>                       }
>                    } finally {
>                          openIndex();
>                        indexLock.unlock();
>                  }
>            }
> 
>            public void startup() throws MessageSearchException {
>              logger.debug("luceneindex is starting up");
> 
> 
>              File lockFile = new File(indexPath+File.separatorChar +
> "write.lock");
>              if (lockFile.exists()) {
>                  if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>                      logger.debug("index lock file detected on
> volumeindex startup.");
>                  } else {
>                      logger.warn("index lock file detected. the server
> was shutdown incorrectly. automatically deleting lock file.
> {lockFile='"+lockFile.getPath()+"'}");
>                      lockFile.delete();
>                  }
>              }
>              openIndex();
>              scheduler =
> Executors.newScheduledThreadPool(1,ThreadUtil.getFlexibleThreadFactory("in
> dex
> reopen",Thread.NORM_PRIORITY-1,true));
>              scheduledTask = scheduler.scheduleWithFixedDelay(new
> Runnable() { public void run() { reopen=true; }},1,1,TimeUnit.SECONDS);
> 
>              indexProcessor = new IndexProcessor();
>              indexProcessor.start();
> 
> 
>              Runtime.getRuntime().addShutdownHook(this);
>              status = Status.READY;
>            }
> 
>            public IndexReader getReader() throws MessageSearchException {
>                if (status==Status.SHUTDOWN) {
>                     throw new MessageSearchException("index is
> shutdown.",logger);
>                }
>                readerLock.lock();
>                try {
>                    if (writer==null) {
>                        throw new MessageSearchException("cannot retrieve
> reader. writer is closed (or null)",logger);
>                    }
>                    if (reader == null) {
>                        reader = new
VolumeIndexReader(writer.getReader(5));
>                    } else {
>                        try {
>                            if (reopen) {
>                                reader = new
> VolumeIndexReader(writer.getReader(5));
>                                reopen = false;
>                            }
>                        } catch (AlreadyClosedException ace) {
>                            logger.debug("reader was found closed.
> reopening");
>                            reader = new
> VolumeIndexReader(writer.getReader(5));
>                        }
>                    }
>                } catch (IOException io) {
>                    throw new MessageSearchException("failed to retrieve
> reader from writer:"+io.getMessage(),io,logger);
>                } finally {
>                    readerLock.unlock();
>                }
>                return reader;
>            }
> 
> 
>            public void shutdown() {
>                status = Status.SHUTDOWN;
>                try { queue.put(EXIT_REQ); } catch (InterruptedException
> e) {}
> 
>                if (reader!=null) {
>                    try {
>                        reader.close();
>                    } catch (Exception e) {
>                        logger.error("failed to close index
> reader:"+e.getMessage());
>                    }
>                }
>                reader = null;
>                if (scheduler!=null) {
>                    scheduler.shutdown();
>                }
>                closeIndex();
> 
>                indexProcessor.interrupt();
> 
>                if (scheduler!=null) {
>                    scheduler.shutdownNow();
>                }
> 
>             }
> 
>            @Override
>            public void run() {
>                shutdown();
>            }
> 
> 
>            public interface LuceneDocument {
> 
>                public String toString();
>                public Document getDocument();
>                public void finalize();
> 
>            }
> 
>            public static void deleteDirContents(File path) {
>                  if( path.exists() ) {
>                    File[] files = path.listFiles();
>                    for(int i=0; i<files.length; i++) {
>                       if(files[i].isFile()) {
>                        files[i].delete();
>                       }
>                    }
>                  }
>             }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message