You might try re-implementing it using ThreadPoolExecutor:
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html
glen
2009/11/10 Jamie Band <jamie@stimulussoft.com>:
> Hi There
>
> Our app spends a lot of time waiting for Lucene to finish writing to the
> index. I'd like to minimize this. If you have a moment to spare, please let
> me know if my LuceneIndex class presented below can be improved upon.
>
> It is used in the following way:
>
> luceneIndex = new
> LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
> exitReq,volume.getID()+"
> indexer",volume.getIndexPath(),
>
> Config.getConfig().getIndex().getMaxSimultaneousDocs());
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> luceneIndex.indexDocument(indexInfo);
>
> As an aside, is there any way for Lucene to support simultaneous writes
> to an index? For example, each write thread could write to a separate
> shard, and after a period the shards could be merged into a single index. Or is
> this overkill? I am interested to hear the opinion of the Lucene experts.
>
> Thanks in advance
>
> Jamie
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class LuceneIndex extends Thread {
> // Bounded hand-off queue: indexDocument() puts documents here and the
> // IndexProcessor thread takes them off.
> protected ArrayBlockingQueue<LuceneDocument> queue;
> // General-purpose logger for this class.
> protected static final Log logger =
> LogFactory.getLog(LuceneIndex.class.getName());
> // When this log has debug enabled (and indexLogOut is set), openIndex()
> // routes the IndexWriter's info stream to indexLogOut.
> protected static final Log indexLog = LogFactory.getLog("indexlog");
> // Currently open writer; null whenever the index is closed (closeIndex()
> // resets it to null).
> IndexWriter writer = null;
> // NOTE(review): neither scheduler nor scheduledTask is assigned anywhere in
> // this class as shown, yet shutdown() calls scheduler.shutdownNow() - confirm
> // where they are initialised.
> protected static ScheduledExecutorService scheduler;
> protected static ScheduledFuture<?> scheduledTask;
> // Sentinel document; the IndexProcessor compares queue entries against it by
> // identity (==) and stops when it sees it.
> protected LuceneDocument EXIT_REQ = null;
> // Held by the IndexProcessor while it writes a batch, and by closeIndex()
> // and deleteIndex().
> ReentrantLock indexLock = new ReentrantLock();
> // Analyzer handed to the IndexWriter constructor.
> ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
> // Diagnostic log file and the stream opened on it; both are set by setLog().
> File indexLogFile;
> PrintStream indexLogOut;
> // Consumer thread created and started by startup().
> IndexProcessor indexProcessor;
> // Name used in log messages and as the prefix of the index log file name.
> String friendlyName;
> // Path passed to FSDirectory.getDirectory() when the index is opened.
> String indexPath;
> // Upper bound on the number of documents the IndexProcessor adds before it
> // closes the writer again.
> int maxSimultaneousDocs;
> /**
>  * Creates an index wrapper; no index is opened and no thread is started
>  * until {@link #startup()} is called.
>  *
>  * @param queueSize           capacity of the bounded document queue
>  * @param exitReq             sentinel document that tells the processor to stop
>  * @param friendlyName        name used in log messages and for the index log file
>  * @param indexPath           file system path of the index directory
>  * @param maxSimultaneousDocs maximum number of documents written per batch
>  */
> public LuceneIndex(int queueSize, LuceneDocument exitReq, String friendlyName,
>         String indexPath, int maxSimultaneousDocs) {
>     this.friendlyName = friendlyName;
>     this.indexPath = indexPath;
>     this.maxSimultaneousDocs = maxSimultaneousDocs;
>     this.EXIT_REQ = exitReq;
>     this.queue = new ArrayBlockingQueue<LuceneDocument>(queueSize);
>     setLog(friendlyName);
> }
> /** @return the maximum number of documents written to the index per batch */
> public int getMaxSimultaneousDocs() {
>     return this.maxSimultaneousDocs;
> }
> /** @param maxSimultaneousDocs the maximum number of documents written to the index per batch */
> public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
>     this.maxSimultaneousDocs = maxSimultaneousDocs;
> }
> /** @return the lock that guards opening, writing to and closing the index */
> public ReentrantLock getIndexLock() {
>     return this.indexLock;
> }
> /**
>  * Opens the diagnostic log stream for this index and stores it in
>  * {@code indexLogOut}. Failures are logged and otherwise ignored, in which
>  * case {@code indexLogOut} keeps its previous value.
>  *
>  * @param logName prefix of the log file name, see {@link #getIndexLogFile(String)}
>  */
> protected void setLog(String logName) {
>     try {
>         indexLogFile = getIndexLogFile(logName);
>         if (indexLogFile == null) {
>             // getIndexLogFile() has already logged why no file is available;
>             // do not dereference the null file just to log its path.
>             return;
>         }
>         // Files over 10 MB are removed first. (PrintStream(File) truncates an
>         // existing file anyway, so this mainly documents the intent.)
>         if (indexLogFile.length() > 10485760) {
>             indexLogFile.delete();
>         }
>         indexLogOut = new PrintStream(indexLogFile);
>         logger.debug("set index log file path {path='" + indexLogFile.getCanonicalPath() + "'}");
>     } catch (Exception e) {
>         logger.error("failed to open index log file:" + e.getMessage(), e);
>     }
> }
> protected File getIndexLogFile(String logName) {
> try {
> String logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
> return new File(logfilepath);
> } catch (Exception e) {
> logger.error("failed to open index log
> file:"+e.getMessage(),e);
> return null;
> }
> }
> /**
>  * Opens the index writer if it is not already open.
>  *
>  * If the write lock is held elsewhere, the open is retried every 50ms: up to
>  * 10000 times when multiple index processes are configured, otherwise up to
>  * 10 times.
>  *
>  * @throws MessageSearchException if the index is corrupt, if opening fails
>  *         for any other reason, or if the write lock could not be obtained
>  *         within the allowed number of attempts
>  */
> protected void openIndex() throws MessageSearchException {
>     if (writer != null) {
>         logger.debug("openIndex() did not bother opening index " + friendlyName + ". it is already open.");
>         return;
>     }
>     logger.debug("openIndex() index " + friendlyName + " will be opened. it is currently closed.");
>     logger.debug("opening index " + friendlyName + " for write");
>     logger.debug("opening search index " + friendlyName + " for write {indexpath='" + indexPath + "'}");
>     int maxattempt = Config.getConfig().getIndex().getMultipleIndexProcesses() ? 10000 : 10;
>     Exception lastError = null;
>     boolean writelock;
>     int attempt = 0;
>     do {
>         writelock = false;
>         try {
>             FSDirectory fsDirectory = FSDirectory.getDirectory(indexPath);
>             int maxIndexChars = Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>             writer = new IndexWriter(fsDirectory, analyzer, new IndexWriter.MaxFieldLength(maxIndexChars));
>             if (indexLog.isDebugEnabled() && indexLogOut != null) {
>                 writer.setInfoStream(indexLogOut);
>             }
>         } catch (LockObtainFailedException lobfe) {
>             logger.debug("write lock on index " + friendlyName + ". will reopen in 50ms.");
>             // remember the cause so the final exception can report it
>             lastError = lobfe;
>             attempt++;
>             writelock = true;
>             try {
>                 Thread.sleep(50);
>             } catch (InterruptedException ie) {
>                 // Keep the interrupt visible to the caller and stop retrying;
>                 // further sleeps would return immediately anyway.
>                 Thread.currentThread().interrupt();
>                 break;
>             }
>         } catch (CorruptIndexException cie) {
>             throw new MessageSearchException("index " + friendlyName + " appears to be corrupt. please reindex the active volume." + cie.getMessage(), cie, logger);
>         } catch (Throwable io) {
>             throw new MessageSearchException("failed to write document to index " + friendlyName + ":" + io.getMessage(), logger);
>         }
>     } while (writelock && attempt < maxattempt);
>     // writelock is still true only when the last attempt failed to get the
>     // lock, whatever the configured attempt limit was.
>     if (writelock) {
>         throw new MessageSearchException("failed to open index " + friendlyName + " writer {indexPath='" + indexPath + "'}", lastError, logger);
>     }
> }
> /**
>  * Queues a document for indexing. The call blocks while the queue is full.
>  * The document is written to the index later by the index processor thread,
>  * so a normal return only means the document was queued.
>  *
>  * @param luceneDocument the document to index; must not be null
>  * @throws MessageSearchException if the document is null or the calling
>  *         thread is interrupted while waiting for space in the queue
>  */
> public void indexDocument(LuceneDocument luceneDocument) throws MessageSearchException {
>     if (luceneDocument == null) {
>         throw new MessageSearchException("assertion failure: null document", logger);
>     }
>     logger.debug("index document {" + luceneDocument + "}");
>     long s = System.currentTimeMillis();
>     try {
>         queue.put(luceneDocument);
>     } catch (InterruptedException ie) {
>         // restore the flag so callers further up can still see the interrupt
>         Thread.currentThread().interrupt();
>         throw new MessageSearchException("failed to add document to queue:" + ie.getMessage(), ie, logger);
>     }
>     logger.debug("document indexed successfully {" + luceneDocument + "}");
>     logger.debug("indexing message end {" + luceneDocument + "}");
>     long e = System.currentTimeMillis();
>     logger.debug("indexing time {time='" + (e - s) + "'}");
> }
> public class IndexProcessor extends Thread {
> public IndexProcessor() {
> setName("index processor");
> }
> public void run() {
> boolean exit = false;
> //ExecutorService documentPool;
> // we abandoned pool as it does not seem to offer any major
> performance benefit
> LuceneDocument luceneDocument = null;
> LinkedList<LuceneDocument> pushbacks = new LinkedList<LuceneDocument>();
> while (!exit) {
> try {
> //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
> luceneDocument = null;
> luceneDocument = (LuceneDocument) queue.take();
> indexLock.lock();
> if (luceneDocument==EXIT_REQ)
{
> logger.debug("index exit req received. exiting");
> exit = true;
> continue;
> }
> try
{
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> if (luceneDocument==null) {
> logger.debug("index info is null");
> }
> int i = 0;
> while(luceneDocument!=null && i<maxSimultaneousDocs)
{
> try {
> Document doc = luceneDocument.getDocument();
> String language = doc.get("lang");
> if (language==null) {
> language =
> Config.getConfig().getIndex().getIndexLanguage();
> }
>
> writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(luceneDocument);
> break;
> }
> //documentPool.execute(new
> IndexDocument(luceneDocument,pushbacks));
> i++;
> if (i<maxSimultaneousDocs) {
> luceneDocument = (LuceneDocument)
> queue.poll();
>
if
> (luceneDocument==null) {
> logger.debug("index info is
null");
> }
>
if
> (luceneDocument==EXIT_REQ) {
> logger.debug("index exit req
> received. exiting (2)");
> exit = true;
> break;
> }
> }
> }
> if (pushbacks.size()>0) {
> closeIndex();
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> for (LuceneDocument pushback : pushbacks) {
> try {
>
> writer.addDocument(pushback.getDocument());
> } catch (IOException io) {
> logger.error("failed to add
document
> to index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e)
{
> pushbacks.add(pushback);
> }
> //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
> i++;
> }
> }
> //documentPool.shutdown();
> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
> } catch (Throwable ie)
{
> logger.error("index write
> interrupted:"+ie.getMessage());
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> } }
> public class IndexDocument extends Thread {
> LuceneDocument luceneDocument = null;
> List<LuceneDocument> pushbacks = null;
> public IndexDocument(LuceneDocument
> luceneDocument,List<LuceneDocument> pushbacks) {
> this.luceneDocument = luceneDocument;
> this.pushbacks = pushbacks;
> setName("index document");
> }
> public void run() {
> try {
> writer.addDocument(luceneDocument.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(luceneDocument);
> } catch (Throwable t) {
> logger.error("failed to add document to
> index:"+t.getMessage(),t);
> }
> }};
> }
> /**
>  * Closes the index writer, if one is open, and resets the writer field to
>  * null. Errors while closing are logged and not propagated; the writer
>  * field is reset even then.
>  */
> protected void closeIndex() {
>     // Acquire the lock before entering try, so that the finally block only
>     // ever releases a lock this thread actually holds.
>     indexLock.lock();
>     try {
>         if (writer != null) {
>             writer.close();
>         }
>     } catch (Throwable io) {
>         logger.error("failed to close index writer:" + io.getMessage(), io);
>     } finally {
>         logger.debug("writer closed");
>         writer = null;
>         indexLock.unlock();
>     }
> }
> public void deleteIndex() throws MessageSearchException {
> logger.debug("delete index {indexpath='"+indexPath+"'}");
> try {
> indexLock.lock();
> try {
> int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
> writer = new
> IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
> } catch (Throwable cie) {
> logger.error("failed to delete index
> {index='"+indexPath+"'}",cie);
> return;
> }
> MessageIndex.volumeIndexes.remove(this);
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> }
> public void startup() {
> logger.debug("volumeindex is starting up");
> File lockFile = new File(indexPath+File.separatorChar +
> "write.lock");
> if (lockFile.exists()) {
> logger.warn("The server lock file already exists. Either
> another indexer is running or the server was not shutdown correctly.");
> logger.warn("If it is the latter, the lock file must be
> manually deleted at "+lockFile.getAbsolutePath());
> if (Config.getConfig().getIndex().getMultipleIndexProcesses())
> {
> logger.debug("index lock file detected on volumeindex
> startup.");
> } else {
> logger.warn("index lock file detected. the server was
> shutdown incorrectly. automatically deleting lock file.");
> logger.warn("indexer is configured to deal with only one
> indexer process.");
> logger.warn("if you are running more than one indexer,
> your index could be subject to corruption.");
> lockFile.delete();
> }
> }
> indexProcessor = new IndexProcessor();
> indexProcessor.start();
> Runtime.getRuntime().addShutdownHook(this);
> }
> /** How long an exit request may wait for space in a full queue, in seconds. */
> private static final long EXIT_REQUEST_TIMEOUT_SECONDS = 30;
> /**
>  * Asks the index processor to stop by queueing the EXIT_REQ sentinel, then
>  * shuts down the scheduler if one has been set.
>  */
> public void shutdown() {
>     logger.debug("volumeindex is shutting down");
>     requestExit();
>     // scheduler is a static field that this class never assigns itself, so
>     // it may still be null here.
>     if (scheduler != null) {
>         scheduler.shutdownNow();
>     }
> }
> /**
>  * Shutdown-hook entry point (registered by startup()): asks the index
>  * processor to stop.
>  */
> @Override
> public void run() {
>     requestExit();
> }
> /**
>  * Queues the EXIT_REQ sentinel, waiting a bounded time for space. The queue
>  * is bounded and producers block on it, so it can be full at shutdown; a
>  * plain add() would throw IllegalStateException in that case.
>  */
> private void requestExit() {
>     try {
>         if (!queue.offer(EXIT_REQ, EXIT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
>             logger.warn("index queue is still full after " + EXIT_REQUEST_TIMEOUT_SECONDS + "s. exit request was not delivered.");
>         }
>     } catch (InterruptedException ie) {
>         Thread.currentThread().interrupt();
>         logger.warn("interrupted while queueing index exit request");
>     }
> }
> /**
>  * A unit of work for the indexer: something that can supply the Lucene
>  * document to be written.
>  */
> public interface LuceneDocument {
>     /** @return a description of this document; it is used in log messages */
>     String toString();
>     /** @return the Lucene document to add to the index */
>     Document getDocument();
>     void finalize();
> }
> }
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>
--
-
|