lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jamie Band <ja...@stimulussoft.com>
Subject Lucene index write performance optimization
Date Tue, 10 Nov 2009 16:43:30 GMT
Hi There

Our app spends a lot of time waiting for Lucene to finish writing to the 
index, and I'd like to minimize this. If you have a moment to spare, please 
let me know whether the LuceneIndex class presented below can be improved.

It is used in the following way:

 // Create the indexer for this volume: queue capacity, exit sentinel, friendly name
 // (also used to name the index log file), index directory, and the maximum number of
 // documents written per batch.
 luceneIndex = new LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
         exitReq, volume.getID() + " indexer", volume.getIndexPath(),
         Config.getConfig().getIndex().getMaxSimultaneousDocs());
 // Start the background processor thread. Without it nothing drains the queue, and
 // indexDocument() blocks once the queue is full.
 luceneIndex.startup();
 Document doc = new Document();
 IndexInfo indexInfo = new IndexInfo(doc);
 // Only enqueues the document; the processor thread writes it to the index.
 luceneIndex.indexDocument(indexInfo);

As an aside, is there any way for Lucene to support simultaneous 
writes to an index? For example, each writer thread could write to a 
separate shard, and after a period the shards could be merged into a single 
index. Or is this overkill? I am interested to hear the opinions of the 
Lucene experts.

Thanks in advance

Jamie

package com.stimulus.archiva.index;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockObtainFailedException;

public class LuceneIndex extends Thread {
   
         protected ArrayBlockingQueue<LuceneDocument> queue;
         protected static final Log logger = 
LogFactory.getLog(LuceneIndex.class.getName());
         protected static final Log indexLog = 
LogFactory.getLog("indexlog");
            IndexWriter writer = null;
            protected static ScheduledExecutorService scheduler;
         protected static ScheduledFuture<?> scheduledTask;
         protected LuceneDocument EXIT_REQ = null;
         ReentrantLock indexLock = new ReentrantLock();
         ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
         File indexLogFile;
         PrintStream indexLogOut;
         IndexProcessor indexProcessor;
         String friendlyName;
         String indexPath;
         int maxSimultaneousDocs;
         
            public LuceneIndex(int queueSize, LuceneDocument exitReq,
                                 String friendlyName, String indexPath, 
int  maxSimultaneousDocs) {
                this.queue = new 
ArrayBlockingQueue<LuceneDocument>(queueSize);
                this.EXIT_REQ = exitReq;
                this.friendlyName = friendlyName;
                this.indexPath = indexPath;
                this.maxSimultaneousDocs = maxSimultaneousDocs;
                setLog(friendlyName);
            }
           
           
          public int getMaxSimultaneousDocs() {
              return maxSimultaneousDocs;
          }
         
          public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
              this.maxSimultaneousDocs = maxSimultaneousDocs;
          }
           
           
          public ReentrantLock getIndexLock() {
              return indexLock;
          }
     
          protected void setLog(String logName) {

                try {
                    indexLogFile = getIndexLogFile(logName);
                    if (indexLogFile!=null) {
                        if (indexLogFile.length()>10485760)
                            indexLogFile.delete();
                        indexLogOut = new PrintStream(indexLogFile);
                    }
                    logger.debug("set index log file path 
{path='"+indexLogFile.getCanonicalPath()+"'}");
                } catch (Exception e) {
                    logger.error("failed to open index log 
file:"+e.getMessage(),e);
                }
          }
       
          protected File getIndexLogFile(String logName) {
               try {
                    String logfilepath = 
Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
                    return new File(logfilepath);
                } catch (Exception e) {
                    logger.error("failed to open index log 
file:"+e.getMessage(),e);
                    return null;
                }
          }
         
        
       
          protected void openIndex() throws MessageSearchException {
            Exception lastError = null;
           
            if (writer==null) {
                logger.debug("openIndex() index "+friendlyName+" will be 
opened. it is currently closed.");
            } else {
                logger.debug("openIndex() did not bother opening index 
"+friendlyName+". it is already open.");
                return;
            }
            logger.debug("opening index "+friendlyName+" for write");
            logger.debug("opening search index "+friendlyName+" for 
write {indexpath='"+indexPath+"'}");
            boolean writelock;
            int attempt = 0;
            int maxattempt = 10;
           
            if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                maxattempt = 10000;
             } else {
                maxattempt = 10;
             }
           
            do {
                writelock = false;
                try {
                        FSDirectory fsDirectory = 
FSDirectory.getDirectory(indexPath);
                        int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new 
IndexWriter(fsDirectory,analyzer,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                        if (indexLog.isDebugEnabled() && 
indexLogOut!=null) {
                            writer.setInfoStream(indexLogOut);
                        }
                } catch (LockObtainFailedException lobfe) {
                        logger.debug("write lock on index 
"+friendlyName+". will reopen in 50ms.");
                        try { Thread.sleep(50); } catch (Exception e) {}
                        attempt++;
                        writelock = true;
                } catch (CorruptIndexException cie) {
                    throw new MessageSearchException("index 
"+friendlyName+" appears to be corrupt. please reindex the active 
volume."+cie.getMessage(),logger);
                } catch (Throwable io) {
                    throw new MessageSearchException("failed to write 
document to index "+friendlyName+":"+io.getMessage(),logger);
                }
           } while (writelock && attempt<maxattempt);
           if (attempt>=10000)
             throw new MessageSearchException("failed to open index 
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
        }
       
        public void indexDocument(LuceneDocument luceneDocument) throws 
MessageSearchException {
            logger.debug("index document {"+luceneDocument+"}");
            long s = (new Date()).getTime();
            if (luceneDocument == null)
                throw new MessageSearchException("assertion failure: 
null document",logger);
            try {
                queue.put(luceneDocument);
            } catch (InterruptedException ie) {
                throw new MessageSearchException("failed to add document 
to queue:"+ie.getMessage(),ie,logger);
            }
            logger.debug("document indexed successfully 
{"+luceneDocument+"}");
           
            logger.debug("indexing message end {"+luceneDocument+"}");
            long e = (new Date()).getTime();
            logger.debug("indexing time {time='"+(e-s)+"'}");
        }
   
        public class IndexProcessor extends Thread {
           
            public IndexProcessor() {
                setName("index processor");
            }
     
            public void run() {
                boolean exit = false;
                //ExecutorService documentPool;
                // we abandoned pool as it does not seem to offer any 
major performance benefit
                LuceneDocument luceneDocument = null;   
                LinkedList<LuceneDocument> pushbacks = new 
LinkedList<LuceneDocument>();
               
                while (!exit) {
                   
                    try {
                        //documentPool = 
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
                        luceneDocument = null;   
                        luceneDocument = (LuceneDocument) queue.take();
                       
                        indexLock.lock();
                       
                        if (luceneDocument==EXIT_REQ) {
                            logger.debug("index exit req received. 
exiting");
                            exit = true;
                            continue;
                        }
                   
               
                        try {
                             openIndex();
                        } catch (Exception e) {
                             logger.error("failed to open 
index:"+e.getMessage(),e);
                             return;
                        }
                        if (luceneDocument==null) {
                            logger.debug("index info is null");
                        }
                        int i = 0;
                        while(luceneDocument!=null && 
i<maxSimultaneousDocs) {
                            try {
                                Document doc = luceneDocument.getDocument();
                                String language = doc.get("lang");
                                if (language==null) {
                                    language = 
Config.getConfig().getIndex().getIndexLanguage();
                                }
                                
writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
                            } catch (IOException io) {
                                logger.error("failed to add document to 
index:"+io.getMessage(),io);
                            } catch (AlreadyClosedException e) {
                                pushbacks.add(luceneDocument);
                                break;
                            }
                              //documentPool.execute(new 
IndexDocument(luceneDocument,pushbacks));
                           
                             i++;
                             if (i<maxSimultaneousDocs) {
                                 luceneDocument = (LuceneDocument) 
queue.poll();
                                 
                                 if (luceneDocument==null) {
                                        logger.debug("index info is null");
                                 }
                                 
                                 if (luceneDocument==EXIT_REQ) {
                                         logger.debug("index exit req 
received. exiting (2)");
                                        exit = true;
                                        break;
                                  }
                             }
                             
                        }
                        if (pushbacks.size()>0) {
                              closeIndex();
                              try {
                                     openIndex();
                              } catch (Exception e) {
                                 logger.error("failed to open 
index:"+e.getMessage(),e);
                                 return;
                              }
                              for (LuceneDocument pushback : pushbacks) {
                                    try {
                                        
writer.addDocument(pushback.getDocument());
                                    } catch (IOException io) {
                                        logger.error("failed to add 
document to index:"+io.getMessage(),io);
                                    } catch (AlreadyClosedException e) {
                                        pushbacks.add(pushback);
                                    }
                                    //documentPool.execute(new 
IndexDocument(pushback,pushbacks));
                                    i++;
                              }
                        }
                       
                        //documentPool.shutdown();
                        
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
                       
                     } catch (Throwable ie) {
                         logger.error("index write 
interrupted:"+ie.getMessage());
                     } finally {
                           closeIndex();
                          indexLock.unlock();
                    }
                }   
               }
           
            public class IndexDocument extends Thread {
               
                    LuceneDocument luceneDocument = null;
                    List<LuceneDocument> pushbacks = null;
                   
                    public IndexDocument(LuceneDocument 
luceneDocument,List<LuceneDocument> pushbacks) {
                        this.luceneDocument = luceneDocument;
                        this.pushbacks = pushbacks;
                        setName("index document");
                    }
               
                    public void run() {
                        try {
                            
writer.addDocument(luceneDocument.getDocument());
                        } catch (IOException io) {
                            logger.error("failed to add document to 
index:"+io.getMessage(),io);
                        } catch (AlreadyClosedException e) {
                            pushbacks.add(luceneDocument);
                        } catch (Throwable t) {
                            logger.error("failed to add document to 
index:"+t.getMessage(),t);
                        }
                    }};
            }
     
        protected void closeIndex() {
             try {
                 indexLock.lock();
                 if (writer!=null) {
                    writer.close();
                 }
             } catch (Throwable io) {
                logger.error("failed to close index 
writer:"+io.getMessage(),io);
             } finally {
                 logger.debug("writer closed");
                 writer = null;
                 indexLock.unlock();
             }
        }
   
          public void deleteIndex() throws MessageSearchException {
                   logger.debug("delete index {indexpath='"+indexPath+"'}");
                   try {
                      indexLock.lock();
                     try {
                        int maxIndexChars = 
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
                        writer = new 
IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new 
IndexWriter.MaxFieldLength(maxIndexChars));
                     } catch (Throwable cie) {
                         logger.error("failed to delete index 
{index='"+indexPath+"'}",cie);
                         return;
                     }
                     MessageIndex.volumeIndexes.remove(this);
                  } finally {
                        closeIndex();
                      indexLock.unlock();
                }
          }
       
          public void startup() {
            logger.debug("volumeindex is starting up");
            File lockFile = new File(indexPath+File.separatorChar + 
"write.lock");
            if (lockFile.exists()) {
                logger.warn("The server lock file already exists. Either 
another indexer is running or the server was not shutdown correctly.");
                logger.warn("If it is the latter, the lock file must be 
manually deleted at "+lockFile.getAbsolutePath());
                if 
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
                    logger.debug("index lock file detected on 
volumeindex startup.");
                } else {
                    logger.warn("index lock file detected. the server 
was shutdown incorrectly. automatically deleting lock file.");
                    logger.warn("indexer is configured to deal with only 
one indexer process.");
                    logger.warn("if you are running more than one 
indexer, your index could be subject to corruption.");
                    lockFile.delete();
                }
            }
            indexProcessor = new IndexProcessor();
            indexProcessor.start();
            Runtime.getRuntime().addShutdownHook(this);
           
          }
         
          public void shutdown() {
              logger.debug("volumeindex is shutting down");
              queue.add(EXIT_REQ);
              scheduler.shutdownNow();
            
          }
         
          @Override
          public void run() {
              queue.add(EXIT_REQ);
          }
         
         
          public interface LuceneDocument {
             
              public String toString();
              public Document getDocument();
              public void finalize();
             
          }
     
}

   




---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message