lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Glen Newton <glen.new...@gmail.com>
Subject Re: Lucene index write performance optimization
Date Tue, 10 Nov 2009 16:54:40 GMT
You might try re-implementing, using ThreadPoolExecutor
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

glen

2009/11/10 Jamie Band <jamie@stimulussoft.com>:
> Hi There
>
> Our app spends alot of time waiting for Lucene to finish writing to the
> index. I'd like to minimize this. If you have a moment to spare, please let
> me know if my LuceneIndex class presented below can be improved upon.
>
> It is used in the following way:
>
> luceneIndex = new
> LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
>                                           exitReq,volume.getID()+"
> indexer",volume.getIndexPath(),
>
> Config.getConfig().getIndex().getMaxSimultaneousDocs());
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> luceneIndex.indexDocument(indexInfo);
>
> As an aside note, is there any way for Lucene to support simultaneous writes
> to an index? For example, each write threads could write to a separate
> shard, after a period the shared could be merged into a single index? Or is
> this overkill? I am interested hear the opinion of the Lucene experts.
>
> Thanks in advance
>
> Jamie
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class LuceneIndex extends Thread {
>          protected ArrayBlockingQueue<LuceneDocument> queue;
>        protected static final Log logger =
> LogFactory.getLog(LuceneIndex.class.getName());
>        protected static final Log indexLog = LogFactory.getLog("indexlog");
>           IndexWriter writer = null;
>           protected static ScheduledExecutorService scheduler;
>        protected static ScheduledFuture<?> scheduledTask;
>        protected LuceneDocument EXIT_REQ = null;
>        ReentrantLock indexLock = new ReentrantLock();
>        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>        File indexLogFile;
>        PrintStream indexLogOut;
>        IndexProcessor indexProcessor;
>        String friendlyName;
>        String indexPath;
>        int maxSimultaneousDocs;
>                   public LuceneIndex(int queueSize, LuceneDocument exitReq,
>                                String friendlyName, String indexPath,
int
>  maxSimultaneousDocs) {
>               this.queue = new
> ArrayBlockingQueue<LuceneDocument>(queueSize);
>               this.EXIT_REQ = exitReq;
>               this.friendlyName = friendlyName;
>               this.indexPath = indexPath;
>               this.maxSimultaneousDocs = maxSimultaneousDocs;
>               setLog(friendlyName);
>           }
>                             public int getMaxSimultaneousDocs() {
>             return maxSimultaneousDocs;
>         }
>                 public void setMaxSimultaneousDocs(int maxSimultaneousDocs)
> {
>             this.maxSimultaneousDocs = maxSimultaneousDocs;
>         }
>                             public ReentrantLock getIndexLock() {
>             return indexLock;
>         }
>             protected void setLog(String logName) {
>
>               try {
>                   indexLogFile = getIndexLogFile(logName);
>                   if (indexLogFile!=null) {
>                       if (indexLogFile.length()>10485760)
>                           indexLogFile.delete();
>                       indexLogOut = new PrintStream(indexLogFile);
>                   }
>                   logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>               }
>         }
>               protected File getIndexLogFile(String logName) {
>              try {
>                   String logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
>                   return new File(logfilepath);
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>                   return null;
>               }
>         }
>                             protected void openIndex() throws
> MessageSearchException {
>           Exception lastError = null;
>                     if (writer==null) {
>               logger.debug("openIndex() index "+friendlyName+" will be
> opened. it is currently closed.");
>           } else {
>               logger.debug("openIndex() did not bother opening index
> "+friendlyName+". it is already open.");
>               return;
>           }
>           logger.debug("opening index "+friendlyName+" for write");
>           logger.debug("opening search index "+friendlyName+" for write
> {indexpath='"+indexPath+"'}");
>           boolean writelock;
>           int attempt = 0;
>           int maxattempt = 10;
>                     if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>               maxattempt = 10000;
>            } else {
>               maxattempt = 10;
>            }
>                     do {
>               writelock = false;
>               try {
>                       FSDirectory fsDirectory =
> FSDirectory.getDirectory(indexPath);
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                       if (indexLog.isDebugEnabled() && indexLogOut!=null)
{
>                           writer.setInfoStream(indexLogOut);
>                       }
>               } catch (LockObtainFailedException lobfe) {
>                       logger.debug("write lock on index "+friendlyName+".
> will reopen in 50ms.");
>                       try { Thread.sleep(50); } catch (Exception e) {}
>                       attempt++;
>                       writelock = true;
>               } catch (CorruptIndexException cie) {
>                   throw new MessageSearchException("index "+friendlyName+"
> appears to be corrupt. please reindex the active
> volume."+cie.getMessage(),logger);
>               } catch (Throwable io) {
>                   throw new MessageSearchException("failed to write document
> to index "+friendlyName+":"+io.getMessage(),logger);
>               }
>          } while (writelock && attempt<maxattempt);
>          if (attempt>=10000)
>            throw new MessageSearchException("failed to open index
> "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
>       }
>             public void indexDocument(LuceneDocument luceneDocument) throws
> MessageSearchException {
>           logger.debug("index document {"+luceneDocument+"}");
>           long s = (new Date()).getTime();
>           if (luceneDocument == null)
>               throw new MessageSearchException("assertion failure: null
> document",logger);
>           try {
>               queue.put(luceneDocument);
>           } catch (InterruptedException ie) {
>               throw new MessageSearchException("failed to add document to
> queue:"+ie.getMessage(),ie,logger);
>           }
>           logger.debug("document indexed successfully
> {"+luceneDocument+"}");
>                     logger.debug("indexing message end
> {"+luceneDocument+"}");
>           long e = (new Date()).getTime();
>           logger.debug("indexing time {time='"+(e-s)+"'}");
>       }
>         public class IndexProcessor extends Thread {
>                     public IndexProcessor() {
>               setName("index processor");
>           }
>               public void run() {
>               boolean exit = false;
>               //ExecutorService documentPool;
>               // we abandoned pool as it does not seem to offer any major
> performance benefit
>               LuceneDocument luceneDocument = null;
> LinkedList<LuceneDocument> pushbacks = new LinkedList<LuceneDocument>();
>                             while (!exit) {
>                                     try {
>                       //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>                       luceneDocument = null;
> luceneDocument = (LuceneDocument) queue.take();
>                                             indexLock.lock();
>                                             if (luceneDocument==EXIT_REQ)
{
>                           logger.debug("index exit req received. exiting");
>                           exit = true;
>                           continue;
>                       }
>                                                       try
{
>                            openIndex();
>                       } catch (Exception e) {
>                            logger.error("failed to open
> index:"+e.getMessage(),e);
>                            return;
>                       }
>                       if (luceneDocument==null) {
>                           logger.debug("index info is null");
>                       }
>                       int i = 0;
>                       while(luceneDocument!=null && i<maxSimultaneousDocs)
{
>                           try {
>                               Document doc = luceneDocument.getDocument();
>                               String language = doc.get("lang");
>                               if (language==null) {
>                                   language =
> Config.getConfig().getIndex().getIndexLanguage();
>                               }
>
> writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
>                           } catch (IOException io) {
>                               logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                           } catch (AlreadyClosedException e) {
>                               pushbacks.add(luceneDocument);
>                               break;
>                           }
>                             //documentPool.execute(new
> IndexDocument(luceneDocument,pushbacks));
>                                                      i++;
>                            if (i<maxSimultaneousDocs) {
>                                luceneDocument = (LuceneDocument)
> queue.poll();
>                                                          
     if
> (luceneDocument==null) {
>                                       logger.debug("index info is
null");
>                                }
>                                                          
     if
> (luceneDocument==EXIT_REQ) {
>                                        logger.debug("index exit req
> received. exiting (2)");
>                                       exit = true;
>                                       break;
>                                 }
>                            }
>                                                   }
>                       if (pushbacks.size()>0) {
>                             closeIndex();
>                             try {
>                                    openIndex();
>                             } catch (Exception e) {
>                                logger.error("failed to open
> index:"+e.getMessage(),e);
>                                return;
>                             }
>                             for (LuceneDocument pushback : pushbacks) {
>                                   try {
>
> writer.addDocument(pushback.getDocument());
>                                   } catch (IOException io) {
>                                       logger.error("failed to add
document
> to index:"+io.getMessage(),io);
>                                   } catch (AlreadyClosedException e)
{
>                                       pushbacks.add(pushback);
>                                   }
>                                   //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
>                                   i++;
>                             }
>                       }
>                                             //documentPool.shutdown();
>                       //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>                                          } catch (Throwable ie)
{
>                        logger.error("index write
> interrupted:"+ie.getMessage());
>                    } finally {
>                          closeIndex();
>                         indexLock.unlock();
>                   }
>               }                }
>                     public class IndexDocument extends Thread {
>                                 LuceneDocument luceneDocument = null;
>                   List<LuceneDocument> pushbacks = null;
>                                     public IndexDocument(LuceneDocument
> luceneDocument,List<LuceneDocument> pushbacks) {
>                       this.luceneDocument = luceneDocument;
>                       this.pushbacks = pushbacks;
>                       setName("index document");
>                   }
>                                 public void run() {
>                       try {
>                           writer.addDocument(luceneDocument.getDocument());
>                       } catch (IOException io) {
>                           logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                       } catch (AlreadyClosedException e) {
>                           pushbacks.add(luceneDocument);
>                       } catch (Throwable t) {
>                           logger.error("failed to add document to
> index:"+t.getMessage(),t);
>                       }
>                   }};
>           }
>           protected void closeIndex() {
>            try {
>                indexLock.lock();
>                if (writer!=null) {
>                   writer.close();
>                }
>            } catch (Throwable io) {
>               logger.error("failed to close index
> writer:"+io.getMessage(),io);
>            } finally {
>                logger.debug("writer closed");
>                writer = null;
>                indexLock.unlock();
>            }
>       }
>           public void deleteIndex() throws MessageSearchException {
>                  logger.debug("delete index {indexpath='"+indexPath+"'}");
>                  try {
>                     indexLock.lock();
>                    try {
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new
> IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                    } catch (Throwable cie) {
>                        logger.error("failed to delete index
> {index='"+indexPath+"'}",cie);
>                        return;
>                    }
>                    MessageIndex.volumeIndexes.remove(this);
>                 } finally {
>                       closeIndex();
>                     indexLock.unlock();
>               }
>         }
>               public void startup() {
>           logger.debug("volumeindex is starting up");
>           File lockFile = new File(indexPath+File.separatorChar +
> "write.lock");
>           if (lockFile.exists()) {
>               logger.warn("The server lock file already exists. Either
> another indexer is running or the server was not shutdown correctly.");
>               logger.warn("If it is the latter, the lock file must be
> manually deleted at "+lockFile.getAbsolutePath());
>               if (Config.getConfig().getIndex().getMultipleIndexProcesses())
> {
>                   logger.debug("index lock file detected on volumeindex
> startup.");
>               } else {
>                   logger.warn("index lock file detected. the server was
> shutdown incorrectly. automatically deleting lock file.");
>                   logger.warn("indexer is configured to deal with only one
> indexer process.");
>                   logger.warn("if you are running more than one indexer,
> your index could be subject to corruption.");
>                   lockFile.delete();
>               }
>           }
>           indexProcessor = new IndexProcessor();
>           indexProcessor.start();
>           Runtime.getRuntime().addShutdownHook(this);
>                   }
>                 public void shutdown() {
>             logger.debug("volumeindex is shutting down");
>             queue.add(EXIT_REQ);
>             scheduler.shutdownNow();
>                   }
>                 @Override
>         public void run() {
>             queue.add(EXIT_REQ);
>         }
>                         public interface LuceneDocument {
>                         public String toString();
>             public Document getDocument();
>             public void finalize();
>                     }
>    }
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>



-- 

-
Mime
View raw message