lucene-java-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Otis Gospodnetic <otis_gospodne...@yahoo.com>
Subject Re: Lucene index write performance optimization
Date Wed, 11 Nov 2009 04:17:23 GMT
This is what we have in Lucene in Action 2:

~/lia2$ ff \*Thread\*java
./src/lia/admin/CreateThreadedIndexTask.java
./src/lia/admin/ThreadedIndexWriter.java

 Otis
--
Sematext is hiring -- http://sematext.com/about/jobs.html?mls
Lucene, Solr, Nutch, Katta, Hadoop, HBase, UIMA, NLP, NER, IR



----- Original Message ----
> From: Jamie Band <jamie@stimulussoft.com>
> To: java-user@lucene.apache.org
> Sent: Tue, November 10, 2009 11:43:30 AM
> Subject: Lucene index write performance optimization
> 
> Hi There
> 
> Our app spends alot of time waiting for Lucene to finish writing to the index. 
> I'd like to minimize this. If you have a moment to spare, please let me know if 
> my LuceneIndex class presented below can be improved upon.
> 
> It is used in the following way:
> 
> luceneIndex = new LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
>                                            exitReq,volume.getID()+" 
> indexer",volume.getIndexPath(),
>                                           
> Config.getConfig().getIndex().getMaxSimultaneousDocs());
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> luceneIndex.indexDocument(indexInfo);
> 
> As an aside note, is there any way for Lucene to support simultaneous writes to 
> an index? For example, each write threads could write to a separate shard, after 
> a period the shared could be merged into a single index? Or is this overkill? I 
> am interested hear the opinion of the Lucene experts.
> 
> Thanks in advance
> 
> Jamie
> 
> package com.stimulus.archiva.index;
> 
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
> 
> public class LuceneIndex extends Thread {
>           protected ArrayBlockingQueuequeue;
>         protected static final Log logger = 
> LogFactory.getLog(LuceneIndex.class.getName());
>         protected static final Log indexLog = LogFactory.getLog("indexlog");
>            IndexWriter writer = null;
>            protected static ScheduledExecutorService scheduler;
>         protected static ScheduledFuture scheduledTask;
>         protected LuceneDocument EXIT_REQ = null;
>         ReentrantLock indexLock = new ReentrantLock();
>         ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>         File indexLogFile;
>         PrintStream indexLogOut;
>         IndexProcessor indexProcessor;
>         String friendlyName;
>         String indexPath;
>         int maxSimultaneousDocs;
>                    public LuceneIndex(int queueSize, LuceneDocument exitReq,
>                                 String friendlyName, String indexPath, int  
> maxSimultaneousDocs) {
>                this.queue = new ArrayBlockingQueue(queueSize);
>                this.EXIT_REQ = exitReq;
>                this.friendlyName = friendlyName;
>                this.indexPath = indexPath;
>                this.maxSimultaneousDocs = maxSimultaneousDocs;
>                setLog(friendlyName);
>            }
>                              public int getMaxSimultaneousDocs() {
>              return maxSimultaneousDocs;
>          }
>                  public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
>              this.maxSimultaneousDocs = maxSimultaneousDocs;
>          }
>                              public ReentrantLock getIndexLock() {
>              return indexLock;
>          }
>              protected void setLog(String logName) {
> 
>                try {
>                    indexLogFile = getIndexLogFile(logName);
>                    if (indexLogFile!=null) {
>                        if (indexLogFile.length()>10485760)
>                            indexLogFile.delete();
>                        indexLogOut = new PrintStream(indexLogFile);
>                    }
>                    logger.debug("set index log file path 
> {path='"+indexLogFile.getCanonicalPath()+"'}");
>                } catch (Exception e) {
>                    logger.error("failed to open index log 
> file:"+e.getMessage(),e);
>                }
>          }
>                protected File getIndexLogFile(String logName) {
>               try {
>                    String logfilepath = 
> Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
>                    return new File(logfilepath);
>                } catch (Exception e) {
>                    logger.error("failed to open index log 
> file:"+e.getMessage(),e);
>                    return null;
>                }
>          }
>                               protected void openIndex() throws 
> MessageSearchException {
>            Exception lastError = null;
>                      if (writer==null) {
>                logger.debug("openIndex() index "+friendlyName+" will be opened. 
> it is currently closed.");
>            } else {
>                logger.debug("openIndex() did not bother opening index 
> "+friendlyName+". it is already open.");
>                return;
>            }
>            logger.debug("opening index "+friendlyName+" for write");
>            logger.debug("opening search index "+friendlyName+" for write 
> {indexpath='"+indexPath+"'}");
>            boolean writelock;
>            int attempt = 0;
>            int maxattempt = 10;
>                      if 
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>                maxattempt = 10000;
>             } else {
>                maxattempt = 10;
>             }
>                      do {
>                writelock = false;
>                try {
>                        FSDirectory fsDirectory = 
> FSDirectory.getDirectory(indexPath);
>                        int maxIndexChars = 
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                        writer = new IndexWriter(fsDirectory,analyzer,new 
> IndexWriter.MaxFieldLength(maxIndexChars));
>                        if (indexLog.isDebugEnabled() && indexLogOut!=null) {
>                            writer.setInfoStream(indexLogOut);
>                        }
>                } catch (LockObtainFailedException lobfe) {
>                        logger.debug("write lock on index "+friendlyName+". will 
> reopen in 50ms.");
>                        try { Thread.sleep(50); } catch (Exception e) {}
>                        attempt++;
>                        writelock = true;
>                } catch (CorruptIndexException cie) {
>                    throw new MessageSearchException("index "+friendlyName+" 
> appears to be corrupt. please reindex the active 
> volume."+cie.getMessage(),logger);
>                } catch (Throwable io) {
>                    throw new MessageSearchException("failed to write document to 
> index "+friendlyName+":"+io.getMessage(),logger);
>                }
>           } while (writelock && attempt
>           if (attempt>=10000)
>             throw new MessageSearchException("failed to open index 
> "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
>        }
>              public void indexDocument(LuceneDocument luceneDocument) throws 
> MessageSearchException {
>            logger.debug("index document {"+luceneDocument+"}");
>            long s = (new Date()).getTime();
>            if (luceneDocument == null)
>                throw new MessageSearchException("assertion failure: null 
> document",logger);
>            try {
>                queue.put(luceneDocument);
>            } catch (InterruptedException ie) {
>                throw new MessageSearchException("failed to add document to 
> queue:"+ie.getMessage(),ie,logger);
>            }
>            logger.debug("document indexed successfully {"+luceneDocument+"}");
>                      logger.debug("indexing message end {"+luceneDocument+"}");
>            long e = (new Date()).getTime();
>            logger.debug("indexing time {time='"+(e-s)+"'}");
>        }
>          public class IndexProcessor extends Thread {
>                      public IndexProcessor() {
>                setName("index processor");
>            }
>                public void run() {
>                boolean exit = false;
>                //ExecutorService documentPool;
>                // we abandoned pool as it does not seem to offer any major 
> performance benefit
>                LuceneDocument luceneDocument = null;                  
> LinkedListpushbacks = new LinkedList();
>                              while (!exit) {
>                                      try {
>                        //documentPool = 
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>                        luceneDocument = null;                          
> luceneDocument = (LuceneDocument) queue.take();
>                                              indexLock.lock();
>                                              if (luceneDocument==EXIT_REQ) {
>                            logger.debug("index exit req received. exiting");
>                            exit = true;
>                            continue;
>                        }
>                                                        try {
>                             openIndex();
>                        } catch (Exception e) {
>                             logger.error("failed to open 
> index:"+e.getMessage(),e);
>                             return;
>                        }
>                        if (luceneDocument==null) {
>                            logger.debug("index info is null");
>                        }
>                        int i = 0;
>                        while(luceneDocument!=null && i
>                            try {
>                                Document doc = luceneDocument.getDocument();
>                                String language = doc.get("lang");
>                                if (language==null) {
>                                    language = 
> Config.getConfig().getIndex().getIndexLanguage();
>                                }
>                               
> writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
>                            } catch (IOException io) {
>                                logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                            } catch (AlreadyClosedException e) {
>                                pushbacks.add(luceneDocument);
>                                break;
>                            }
>                              //documentPool.execute(new 
> IndexDocument(luceneDocument,pushbacks));
>                                                       i++;
>                             if (i
>                                 luceneDocument = (LuceneDocument) queue.poll();
>                                                                 if 
> (luceneDocument==null) {
>                                        logger.debug("index info is null");
>                                 }
>                                                                 if 
> (luceneDocument==EXIT_REQ) {
>                                         logger.debug("index exit req received. 
> exiting (2)");
>                                        exit = true;
>                                        break;
>                                  }
>                             }
>                                                    }
>                        if (pushbacks.size()>0) {
>                              closeIndex();
>                              try {
>                                     openIndex();
>                              } catch (Exception e) {
>                                 logger.error("failed to open 
> index:"+e.getMessage(),e);
>                                 return;
>                              }
>                              for (LuceneDocument pushback : pushbacks) {
>                                    try {
>                                       
> writer.addDocument(pushback.getDocument());
>                                    } catch (IOException io) {
>                                        logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                                    } catch (AlreadyClosedException e) {
>                                        pushbacks.add(pushback);
>                                    }
>                                    //documentPool.execute(new 
> IndexDocument(pushback,pushbacks));
>                                    i++;
>                              }
>                        }
>                                              //documentPool.shutdown();
>                        //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>                                           } catch (Throwable ie) {
>                         logger.error("index write 
> interrupted:"+ie.getMessage());
>                     } finally {
>                           closeIndex();
>                          indexLock.unlock();
>                    }
>                }                 }
>                      public class IndexDocument extends Thread {
>                                  LuceneDocument luceneDocument = null;
>                    Listpushbacks = null;
>                                      public IndexDocument(LuceneDocument 
> luceneDocument,Listpushbacks) {
>                        this.luceneDocument = luceneDocument;
>                        this.pushbacks = pushbacks;
>                        setName("index document");
>                    }
>                                  public void run() {
>                        try {
>                            writer.addDocument(luceneDocument.getDocument());
>                        } catch (IOException io) {
>                            logger.error("failed to add document to 
> index:"+io.getMessage(),io);
>                        } catch (AlreadyClosedException e) {
>                            pushbacks.add(luceneDocument);
>                        } catch (Throwable t) {
>                            logger.error("failed to add document to 
> index:"+t.getMessage(),t);
>                        }
>                    }};
>            }
>            protected void closeIndex() {
>             try {
>                 indexLock.lock();
>                 if (writer!=null) {
>                    writer.close();
>                 }
>             } catch (Throwable io) {
>                logger.error("failed to close index writer:"+io.getMessage(),io);
>             } finally {
>                 logger.debug("writer closed");
>                 writer = null;
>                 indexLock.unlock();
>             }
>        }
>            public void deleteIndex() throws MessageSearchException {
>                   logger.debug("delete index {indexpath='"+indexPath+"'}");
>                   try {
>                      indexLock.lock();
>                     try {
>                        int maxIndexChars = 
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                        writer = new 
> IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new 
> IndexWriter.MaxFieldLength(maxIndexChars));
>                     } catch (Throwable cie) {
>                         logger.error("failed to delete index 
> {index='"+indexPath+"'}",cie);
>                         return;
>                     }
>                     MessageIndex.volumeIndexes.remove(this);
>                  } finally {
>                        closeIndex();
>                      indexLock.unlock();
>                }
>          }
>                public void startup() {
>            logger.debug("volumeindex is starting up");
>            File lockFile = new File(indexPath+File.separatorChar + 
> "write.lock");
>            if (lockFile.exists()) {
>                logger.warn("The server lock file already exists. Either another 
> indexer is running or the server was not shutdown correctly.");
>                logger.warn("If it is the latter, the lock file must be manually 
> deleted at "+lockFile.getAbsolutePath());
>                if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>                    logger.debug("index lock file detected on volumeindex 
> startup.");
>                } else {
>                    logger.warn("index lock file detected. the server was 
> shutdown incorrectly. automatically deleting lock file.");
>                    logger.warn("indexer is configured to deal with only one 
> indexer process.");
>                    logger.warn("if you are running more than one indexer, your 
> index could be subject to corruption.");
>                    lockFile.delete();
>                }
>            }
>            indexProcessor = new IndexProcessor();
>            indexProcessor.start();
>            Runtime.getRuntime().addShutdownHook(this);
>                    }
>                  public void shutdown() {
>              logger.debug("volumeindex is shutting down");
>              queue.add(EXIT_REQ);
>              scheduler.shutdownNow();
>                     }
>                  @Override
>          public void run() {
>              queue.add(EXIT_REQ);
>          }
>                          public interface LuceneDocument {
>                          public String toString();
>              public Document getDocument();
>              public void finalize();
>                      }
>     }
> 
>   
> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org


Mime
View raw message