Return-Path: Delivered-To: apmail-lucene-java-user-archive@www.apache.org Received: (qmail 53010 invoked from network); 29 Sep 2010 17:49:09 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 29 Sep 2010 17:49:09 -0000 Received: (qmail 94299 invoked by uid 500); 29 Sep 2010 17:49:07 -0000 Delivered-To: apmail-lucene-java-user-archive@lucene.apache.org Received: (qmail 94180 invoked by uid 500); 29 Sep 2010 17:49:06 -0000 Mailing-List: contact java-user-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: java-user@lucene.apache.org Delivered-To: mailing list java-user@lucene.apache.org Received: (qmail 94172 invoked by uid 99); 29 Sep 2010 17:49:06 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Sep 2010 17:49:06 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=10.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: local policy) Received: from [71.6.200.51] (HELO serve.stimulussoft.com) (71.6.200.51) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Sep 2010 17:48:56 +0000 Received: from serve.stimulussoft.com (localhost.localdomain [127.0.0.1]) by serve.stimulussoft.com (Postfix) with ESMTP id 23BDD10739 for ; Wed, 29 Sep 2010 10:48:35 -0700 (PDT) Received: by serve.stimulussoft.com (Postfix, from userid 5001) id 0B9941078B; Wed, 29 Sep 2010 10:48:35 -0700 (PDT) X-Spam-Checker-Version: SpamAssassin 3.2.4 (2008-01-01) on serve.stimulussoft.com X-Spam-Level: Received: from Jamie-Bands-MacBook-Pro.local (196-210-169-220.dynamic.isadsl.co.za [196.210.169.220]) by serve.stimulussoft.com (Postfix) with ESMTPA id 65EE410739 for ; Wed, 29 Sep 2010 10:48:30 -0700 (PDT) Message-ID: <4CA37BD8.8050608@stimulussoft.com> Date: Wed, 29 Sep 2010 19:48:08 +0200 From: Jamie User-Agent: Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.6; en-US; rv:1.9.2.9) Gecko/20100915 
Thunderbird/3.1.4 MIME-Version: 1.0 To: java-user@lucene.apache.org Subject: File Handle Leaks During Lucene 3.0.2 Merge Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit X-Virus-Scanned: ClamAV using ClamSMTP X-Virus-Checked: Checked by ClamAV on apache.org X-Old-Spam-Status: No, score=-4.3 required=5.0 tests=ALL_TRUSTED,AWL,BAYES_00 autolearn=ham version=3.2.4 Hi there, I am noticing file handle leaks appearing on index files. I think the leaks occur during the Lucene merge operation. Lsof reports the following: java 28604 root 213r REG 8,33 1098681 57409621 /var/index/vol201009/_a4w.cfs (deleted) java 28604 root 214r REG 8,33 35164 57409699 /var/index/vol201009/_a4x.cfs (deleted) java 28604 root 215r REG 8,33 46139 57409691 /var/index/vol201009/_a4y.cfs (deleted) java 28604 root 216r REG 8,33 40342 57409673 /var/index/vol201009/_a4z.cfs (deleted) java 28604 root 217r REG 8,33 44204 57409675 /var/index/vol201009/_a50.cfs (deleted) We are using Lucene's realtime search feature, so we have handles open on the index. Could it be that we are not handling the merge situation correctly? Your ideas are most appreciated. 
The source code to our index file as follows: package com.stimulus.archiva.index; import com.stimulus.util.*; import java.io.File; import java.io.IOException; import java.io.PrintStream; import org.apache.commons.logging.*; import org.apache.lucene.document.Document; import org.apache.lucene.index.*; import org.apache.lucene.store.FSDirectory; import com.stimulus.archiva.domain.Config; import com.stimulus.archiva.exception.*; import com.stimulus.archiva.language.AnalyzerFactory; import com.stimulus.archiva.search.*; import java.util.*; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.AlreadyClosedException; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.*; public class LuceneIndex extends Thread { protected ArrayBlockingQueue queue; protected static final Log logger = LogFactory.getLog(LuceneIndex.class.getName()); protected static final Log indexLog = LogFactory.getLog("indexlog"); IndexWriter writer = null; protected static ScheduledExecutorService scheduler; protected static ScheduledFuture scheduledTask; protected LuceneDocument EXIT_REQ = null; ReentrantLock indexLock = new ReentrantLock(); ArchivaAnalyzer analyzer = new ArchivaAnalyzer(); File indexLogFile; PrintStream indexLogOut; IndexProcessor indexProcessor; String friendlyName; String indexPath; int maxSimultaneousDocs; int indexThreads; IndexReader reader = null; volatile boolean reopen = false; FSDirectory fsDirectory; ReentrantLock readerLock = new ReentrantLock(); enum Status { READY, SHUTDOWN }; Status status = Status.SHUTDOWN; public LuceneIndex(int queueSize, LuceneDocument exitReq, String friendlyName, String indexPath, int maxSimultaneousDocs, int indexThreads) { this.queue = new ArrayBlockingQueue(queueSize); this.EXIT_REQ = exitReq; this.friendlyName = friendlyName; this.indexPath = indexPath; this.maxSimultaneousDocs = maxSimultaneousDocs; this.indexThreads = indexThreads; this.status = Status.SHUTDOWN; // if 
(indexLog.isDebugEnabled()) { //setLog(friendlyName); //} } public int getMaxSimultaneousDocs() { return maxSimultaneousDocs; } public void setMaxSimultaneousDocs(int maxSimultaneousDocs) { this.maxSimultaneousDocs = maxSimultaneousDocs; } public ReentrantLock getIndexLock() { return indexLock; } protected void setLog(String logName) { try { indexLogFile = getIndexLogFile(logName); if (indexLogFile!=null) { if (indexLogFile.length()>10485760) indexLogFile.delete(); indexLogOut = new PrintStream(indexLogFile); } logger.debug("set index log file path {path='"+indexLogFile.getCanonicalPath()+"'}"); } catch (Exception e) { logger.error("failed to open index log file:"+e.getMessage(),e); } } protected File getIndexLogFile(String logName) { try { String logfilepath = Config.getFileSystem().getLogPath()+File.separator+"indexdebug_"+logName+".log"; return new File(logfilepath); } catch (Exception e) { logger.error("failed to open index log file:"+e.getMessage(),e); return null; } } protected void openIndex() throws MessageSearchException { Exception lastError = null; if (writer==null) { logger.debug("openIndex() index "+friendlyName+" will be opened. it is currently closed."); } else { logger.debug("openIndex() did not bother opening index "+friendlyName+". 
it is already open."); return; } logger.debug("opening index "+friendlyName+" for write"); logger.debug("opening search index "+friendlyName+" for write {indexpath='"+indexPath+"'}"); boolean writelock; int attempt = 0; int maxattempt = 10; if (Config.getConfig().getIndex().getMultipleIndexProcesses()) { maxattempt = 10000; } else { maxattempt = 10; } do { writelock = false; try { fsDirectory = FSDirectory.open(new File(indexPath)); int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars(); writer = new IndexWriter(fsDirectory,analyzer,new IndexWriter.MaxFieldLength(maxIndexChars)); if (indexLog.isDebugEnabled() && indexLogOut!=null) { writer.setInfoStream(indexLogOut); } } catch (LockObtainFailedException lobfe) { logger.debug("write lock on index "+friendlyName+". will reopen in 50ms."); try { Thread.sleep(50); } catch (Exception e) {} attempt++; writelock = true; } catch (CorruptIndexException cie) { throw new MessageSearchException("index "+friendlyName+" appears to be corrupt. 
please reindex the active volume."+cie.getMessage(),logger); } catch (Throwable io) { throw new MessageSearchException("failed to write document to index "+friendlyName+":"+io.getMessage(),logger); } } while (writelock && attempt=10000) throw new MessageSearchException("failed to open index "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger); } public void indexDocument(LuceneDocument luceneDocument) throws MessageSearchException { logger.debug("index document {"+luceneDocument+"}"); if (status==Status.SHUTDOWN) { throw new MessageSearchException("index is shutdown.",logger); } long s = (new Date()).getTime(); if (luceneDocument == null) throw new MessageSearchException("assertion failure: null document",logger); try { queue.put(luceneDocument); } catch (InterruptedException ie) { throw new MessageSearchException("failed to add document to queue:"+ie.getMessage(),ie,logger); } logger.debug("document indexed successfully {"+luceneDocument+"}"); logger.debug("indexing message end {"+luceneDocument+"}"); long e = (new Date()).getTime(); logger.debug("indexing time {time='"+(e-s)+"'}"); } public class DocWriter implements Runnable { LuceneDocument doc; String language; LinkedList pushbacks; ReentrantLock pushbackLock; public DocWriter(LuceneDocument doc,String language,LinkedList pushbacks, ReentrantLock pushbackLock) { this.doc = doc; this.language = language; this.pushbacks = pushbacks; } public void run() { try { writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX)); } catch (IOException io) { logger.error("failed to add document to index:"+io.getMessage(),io); } catch (AlreadyClosedException e) { try { pushbackLock.lock(); pushbacks.add(doc); } finally { pushbackLock.unlock(); } } } } public class IndexProcessor extends Thread { public IndexProcessor() { setName("index processor"); } public void run() { boolean exit = false; LuceneDocument luceneDocument = null; LinkedList pushbacks = new 
LinkedList(); ReentrantLock pushbackLock = new ReentrantLock(); while (!exit) { //documentPool = Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads()); luceneDocument = null; try { luceneDocument = (LuceneDocument) queue.take(); } catch (InterruptedException e) { logger.debug("index exit req received. exiting"); exit = true; continue; } if (luceneDocument==EXIT_REQ) { logger.debug("index exit req received. exiting"); exit = true; continue; } try { indexLock.lock(); if (luceneDocument==null) { logger.debug("index info is null"); } int i = 0; ExecutorService threadPool = Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFactory("indexwritepool",Thread.NORM_PRIORITY,true)); while(luceneDocument!=null && i0) { for (LuceneDocument pushback : pushbacks) { try { writer.addDocument(pushback.getDocument()); } catch (IOException io) { logger.error("failed to add document to index:"+io.getMessage(),io); } catch (AlreadyClosedException e) { pushbacks.add(pushback); } i++; } } } finally { pushbackLock.unlock(); } logger.debug("index commit"); try { if (writer!=null) { writer.commit(); } } catch (Exception e) { logger.error("failed to commit index:"+e.getMessage(),e); try { readerLock.lock(); closeIndex(); openIndex(); } finally { readerLock.unlock(); } } } catch (Throwable ie) { logger.error("index write interrupted:"+ie.getMessage(),ie); } finally { indexLock.unlock(); } } logger.debug("exit indexer"); } public class IndexDocument extends Thread { LuceneDocument luceneDocument = null; List pushbacks = null; public IndexDocument(LuceneDocument luceneDocument,List pushbacks) { this.luceneDocument = luceneDocument; this.pushbacks = pushbacks; setName("index document"); } public void run() { try { writer.addDocument(luceneDocument.getDocument()); } catch (IOException io) { logger.error("failed to add document to index:"+io.getMessage(),io); } catch (AlreadyClosedException e) { pushbacks.add(luceneDocument); } catch (Throwable t) { 
logger.error("failed to add document to index:"+t.getMessage(),t); } }}; } protected void closeIndex() { try { indexLock.lock(); if (writer!=null) { writer.close(); } if (fsDirectory!=null) { fsDirectory.close(); } } catch (Throwable io) { logger.error("failed to close index writer:"+io.getMessage(),io); } finally { writer = null; indexLock.unlock(); } } public void optimize() throws MessageSearchException { logger.debug("optimize volume"); try { indexLock.lock(); try { writer.optimize(false); } catch (Exception io) { throw new MessageSearchException("failed to optimize the index:"+io.getMessage(),io,logger); } } catch (Throwable t) { // diskspace problems could arise logger.error("failed to optimize index:"+t.getMessage(),t); } finally { indexLock.unlock(); } } public void deleteDocs(Term[] terms) throws MessageSearchException { logger.debug("delete docs"); if (status==Status.SHUTDOWN) { throw new MessageSearchException("index is shutdown.",logger); } try { indexLock.lock(); openIndex(); try { writer.deleteDocuments(terms); } catch (Exception e) { throw new MessageSearchException("failed to delete doc from index:"+e.getMessage(),e,logger); } finally { try { writer.commit(); writer.expungeDeletes(false); } catch (Exception io) { throw new MessageSearchException("failed to expunge docs from index:"+io.getMessage(),io,logger); } } } catch (Throwable t) { logger.error("failed to delete docs from index."+t.getMessage(),t); } finally { indexLock.unlock(); } } public void deleteIndex() throws MessageSearchException { logger.debug("delete index {indexpath='"+indexPath+"'}"); try { indexLock.lock(); closeIndex(); File indexFile = new File(indexPath); //deleteDirContents(indexFile); try { int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars(); writer = new IndexWriter(FSDirectory.open(indexFile),analyzer,true,new IndexWriter.MaxFieldLength(maxIndexChars)); } catch (Throwable cie) { logger.error("failed to delete index {index='"+indexPath+"'}",cie); 
return; } finally { try { writer.close(); } catch (Exception e) { logger.debug("failed to close writer:"+e.getMessage()); } writer = null; } } finally { openIndex(); indexLock.unlock(); } } public void startup() throws MessageSearchException { logger.debug("luceneindex is starting up"); File lockFile = new File(indexPath+File.separatorChar + "write.lock"); if (lockFile.exists()) { if (Config.getConfig().getIndex().getMultipleIndexProcesses()) { logger.debug("index lock file detected on volumeindex startup."); } else { logger.warn("index lock file detected. the server was shutdown incorrectly. automatically deleting lock file. {lockFile='"+lockFile.getPath()+"'}"); lockFile.delete(); } } openIndex(); scheduler = Executors.newScheduledThreadPool(1,ThreadUtil.getFlexibleThreadFactory("index reopen",Thread.NORM_PRIORITY-1,true)); scheduledTask = scheduler.scheduleWithFixedDelay(new Runnable() { public void run() { reopen=true; }},1,1,TimeUnit.SECONDS); indexProcessor = new IndexProcessor(); indexProcessor.start(); Runtime.getRuntime().addShutdownHook(this); status = Status.READY; } public IndexReader getReader() throws MessageSearchException { if (status==Status.SHUTDOWN) { throw new MessageSearchException("index is shutdown.",logger); } readerLock.lock(); try { if (writer==null) { throw new MessageSearchException("cannot retrieve reader. writer is closed (or null)",logger); } if (reader == null) { reader = new VolumeIndexReader(writer.getReader(5)); } else { try { if (reopen) { reader = new VolumeIndexReader(writer.getReader(5)); reopen = false; } } catch (AlreadyClosedException ace) { logger.debug("reader was found closed. 
reopening"); reader = new VolumeIndexReader(writer.getReader(5)); } } } catch (IOException io) { throw new MessageSearchException("failed to retrieve reader from writer:"+io.getMessage(),io,logger); } finally { readerLock.unlock(); } return reader; } public void shutdown() { status = Status.SHUTDOWN; try { queue.put(EXIT_REQ); } catch (InterruptedException e) {} if (reader!=null) { try { reader.close(); } catch (Exception e) { logger.error("failed to close index reader:"+e.getMessage()); } } reader = null; if (scheduler!=null) { scheduler.shutdown(); } closeIndex(); indexProcessor.interrupt(); if (scheduler!=null) { scheduler.shutdownNow(); } } @Override public void run() { shutdown(); } public interface LuceneDocument { public String toString(); public Document getDocument(); public void finalize(); } public static void deleteDirContents(File path) { if( path.exists() ) { File[] files = path.listFiles(); for(int i=0; i