From: mikemccand@apache.org
To: java-commits@lucene.apache.org
Reply-To: java-dev@lucene.apache.org
Subject: svn commit: r574260 - in /lucene/java/trunk: ./ src/java/org/apache/lucene/index/ src/test/org/apache/lucene/index/
Date: Mon, 10 Sep 2007 14:33:52 -0000
Message-Id: <20070910143353.348BA1A9832@eris.apache.org>

Author: mikemccand
Date: Mon Sep 10 07:33:51 2007
New Revision: 574260

URL: http://svn.apache.org/viewvc?rev=574260&view=rev
Log:
LUCENE-992: move buffered deletes into DocumentsWriter so IndexWriter.updateDocument is atomic

Added:
    lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java   (with props)
Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=574260&r1=574259&r2=574260&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Mon Sep 10 07:33:51 2007
@@ -83,6 +83,9 @@
 13. LUCENE-991: The explain() method of BoostingTermQuery had errors
     when no payloads were present on a document. (Peter Keegan via Grant Ingersoll)
 
+14. LUCENE-992: Fixed IndexWriter.updateDocument to be atomic again
+    (this was broken by LUCENE-843). (Ning Li via Mike McCandless)
+
 New features
 
  1. LUCENE-906: Elision filter for French.
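
For readers following the diffs below: the heart of the change is that the delete term and the newly added document are now recorded under DocumentsWriter's own lock, so updateDocument behaves as one atomic operation. The standalone sketch that follows only illustrates that bookkeeping idea; the class and member names (BufferedDeletesSketch, update, the numDocsInRAM bound) are illustrative assumptions, not the committed code.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch (not the committed code): record, under one lock, both a
// newly added document and the delete term that replaces its old version, so
// a flush can never separate the two.
class BufferedDeletesSketch {

  // Maps a delete term to the number of buffered documents it applies to,
  // mirroring the bufferedDeleteTerms/Num bookkeeping added to DocumentsWriter.
  private final Map<String, Integer> bufferedDeleteTerms = new HashMap<String, Integer>();
  private int numDocsInRAM = 0;

  // Analogous to updateDocument(term, doc): add the new document and buffer
  // the delete as a single synchronized step.
  synchronized void update(String deleteTerm, String newDoc) {
    addDocument(newDoc);
    // The delete applies only to documents buffered before the one just added.
    int bound = numDocsInRAM - 1;
    Integer prev = bufferedDeleteTerms.get(deleteTerm);
    if (prev == null || bound > prev.intValue()) {
      bufferedDeleteTerms.put(deleteTerm, Integer.valueOf(bound));
    }
  }

  private void addDocument(String doc) {
    numDocsInRAM++;
  }
}

The committed DocumentsWriter keeps the same shape: a HashMap from Term to a Num whose setNum only ever records a larger document count, so concurrent replacements of the same document cannot regress the bound.
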
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=574260&r1=574259&r2=574260&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Sep 10 07:33:51 2007
@@ -129,6 +129,16 @@
   private PrintStream infoStream;
 
+  // This Hashmap buffers delete terms in ram before they
+  // are applied.  The key is delete term; the value is
+  // number of buffered documents the term applies to.
+  private HashMap bufferedDeleteTerms = new HashMap();
+  private int numBufferedDeleteTerms = 0;
+
+  // The max number of delete terms that can be buffered before
+  // they must be flushed to disk.
+  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
+
   // How much RAM we can use before flushing.  This is 0 if
   // we are flushing by doc count instead.
   private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);

@@ -265,8 +275,8 @@
   /** Called if we hit an exception when adding docs,
    *  flushing, etc.  This resets our state, discarding any
-   *  docs added since last flush. */
-  void abort() throws IOException {
+   *  docs added since last flush. */
+  synchronized void abort() throws IOException {
     // Forcefully remove waiting ThreadStates from line
     for(int i=0;i= maxBufferedDeleteTerms && setFlushPending();
+  }
+
+  void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+    if (maxBufferedDeleteTerms < 1)
+      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
+    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+  }
+
+  int getMaxBufferedDeleteTerms() {
+    return maxBufferedDeleteTerms;
+  }
+
+  synchronized boolean hasDeletes() {
+    return bufferedDeleteTerms.size() > 0;
+  }
+
+  // Number of documents a delete term applies to.
+  static class Num {
+    private int num;
+
+    Num(int num) {
+      this.num = num;
+    }
+
+    int getNum() {
+      return num;
+    }
+
+    void setNum(int num) {
+      // Only record the new number if it's greater than the
+      // current one.  This is important because if multiple
+      // threads are replacing the same doc at nearly the
+      // same time, it's possible that one thread that got a
+      // higher docID is scheduled before the other
+      // threads.
+      if (num > this.num)
+        this.num = num;
+    }
+  }
+
+  // Buffer a term in bufferedDeleteTerms, which records the
+  // current number of documents buffered in ram so that the
+  // delete term will be applied to those documents as well
+  // as the disk segments.
+  synchronized private void addDeleteTerm(Term term, int docCount) {
+    Num num = (Num) bufferedDeleteTerms.get(term);
+    if (num == null) {
+      bufferedDeleteTerms.put(term, new Num(docCount));
+    } else {
+      num.setNum(docCount);
+    }
+    numBufferedDeleteTerms++;
   }
 
   /** Does the synchronized work to finish/flush the

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=574260&r1=574259&r2=574260&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Mon Sep 10 07:33:51 2007
@@ -247,16 +247,6 @@
 
   private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
 
-  // The max number of delete terms that can be buffered before
-  // they must be flushed to disk.
-  private int maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
-
-  // This Hashmap buffers delete terms in ram before they are applied.
-  // The key is delete term; the value is number of ram
-  // segments the term applies to.
-  private HashMap bufferedDeleteTerms = new HashMap();
-  private int numBufferedDeleteTerms = 0;
-
   /** Use compound file setting. Defaults to true, minimizing the number of
    * files used.  Setting this to false may improve indexing performance, but
    * may also cause file handle problems.

@@ -773,9 +763,7 @@
    */
   public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
     ensureOpen();
-    if (maxBufferedDeleteTerms < 1)
-      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
-    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+    docWriter.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
   }
 
   /**
@@ -785,7 +773,7 @@
    */
   public int getMaxBufferedDeleteTerms() {
     ensureOpen();
-    return maxBufferedDeleteTerms;
+    return docWriter.getMaxBufferedDeleteTerms();
   }
 
   /** Determines how often segment indices are merged by addDocument().  With
@@ -1134,10 +1122,11 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
-    bufferDeleteTerm(term);
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerm(term);
+    if (doFlush)
+      flush(true, false);
   }
 
   /**
@@ -1148,12 +1137,11 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
     ensureOpen();
-    for (int i = 0; i < terms.length; i++) {
-      bufferDeleteTerm(terms[i]);
-    }
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerms(terms);
+    if (doFlush)
+      flush(true, false);
   }
 
   /**
@@ -1189,20 +1177,15 @@
   public void updateDocument(Term term, Document doc, Analyzer analyzer)
       throws CorruptIndexException, IOException {
     ensureOpen();
-    synchronized (this) {
-      bufferDeleteTerm(term);
-    }
-    boolean success = false;
+    boolean doFlush = false;
     try {
-      success = docWriter.addDocument(doc, analyzer);
+      doFlush = docWriter.updateDocument(term, doc, analyzer);
     } catch (IOException ioe) {
       deleter.refresh();
       throw ioe;
     }
-    if (success)
+    if (doFlush)
       flush(true, false);
-    else
-      maybeFlush();
   }
 
   // for test purpose
@@ -1357,7 +1340,7 @@
    */
   private void startTransaction() throws IOException {
 
-    assert numBufferedDeleteTerms == 0 :
+    assert docWriter.getNumBufferedDeleteTerms() == 0 :
            "calling startTransaction with buffered delete terms not supported";
     assert docWriter.getNumDocsInRAM() == 0 :
            "calling startTransaction with buffered documents not supported";
@@ -1462,9 +1445,6 @@
       deleter.checkpoint(segmentInfos, false);
       deleter.refresh();
 
-      bufferedDeleteTerms.clear();
-      numBufferedDeleteTerms = 0;
-
       commitPending = false;
       docWriter.abort();
       close();
@@ -1846,20 +1826,6 @@
   }
 
   /**
-   * Used internally to trigger a flush if the number of
-   * buffered added documents or buffered deleted terms are
-   * large enough.
-   */
-  protected final synchronized void maybeFlush() throws CorruptIndexException, IOException {
-    // We only check for flush due to number of buffered
-    // delete terms, because triggering of a flush due to
-    // too many added documents is handled by
-    // DocumentsWriter
-    if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending())
-      flush(true, false);
-  }
-
-  /**
    * Flush all in-memory buffered updates (adds and deletes)
    * to the Directory.
    *

   * Note: if autoCommit=false, flushed data would still
@@ -1908,7 +1874,7 @@
     // when they are full or writer is being closed.  We
     // have to fix the "applyDeletesSelectively" logic to
     // apply to more than just the last flushed segment
-    boolean flushDeletes = bufferedDeleteTerms.size() > 0;
+    boolean flushDeletes = docWriter.hasDeletes();
 
     if (infoStream != null)
       infoStream.println("  flush: flushDocs=" + flushDocs +
@@ -1938,9 +1904,6 @@
 
       SegmentInfos rollback = null;
 
-      HashMap saveBufferedDeleteTerms = null;
-      int saveNumBufferedDeleteTerms = 0;
-
       if (flushDeletes)
         rollback = (SegmentInfos) segmentInfos.clone();
 
@@ -1975,9 +1938,9 @@
           // buffer deletes longer and then flush them to
           // multiple flushed segments, when
           // autoCommit=false
-          saveBufferedDeleteTerms = bufferedDeleteTerms;
-          saveNumBufferedDeleteTerms = numBufferedDeleteTerms;
-          applyDeletes(flushDocs);
+          int delCount = applyDeletes(flushDocs);
+          if (infoStream != null)
+            infoStream.println("flushed " + delCount + " deleted documents");
           doAfterFlush();
         }
 
@@ -1991,11 +1954,6 @@
         // SegmentInfo instances:
         segmentInfos.clear();
         segmentInfos.addAll(rollback);
-
-        if (saveBufferedDeleteTerms != null) {
-          numBufferedDeleteTerms = saveNumBufferedDeleteTerms;
-          bufferedDeleteTerms = saveBufferedDeleteTerms;
-        }
       } else {
         // Remove segment we added, if any:
@@ -2319,11 +2277,14 @@
   // flushedNewSegment is true then a new segment was just
   // created and flushed from the ram segments, so we will
   // selectively apply the deletes to that new segment.
-  private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+  private final int applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+
+    final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
+    int delCount = 0;
 
     if (bufferedDeleteTerms.size() > 0) {
       if (infoStream != null)
-        infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on "
+        infoStream.println("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
                            + segmentInfos.size() + " segments.");
 
       if (flushedNewSegment) {
@@ -2337,7 +2298,7 @@
           // Apply delete terms to the segment just flushed from ram
           // apply appropriately so that a delete term is only applied to
           // the documents buffered before it, not those buffered after it.
-          applyDeletesSelectively(bufferedDeleteTerms, reader);
+          delCount += applyDeletesSelectively(bufferedDeleteTerms, reader);
         } finally {
           if (reader != null) {
             try {
@@ -2361,7 +2322,7 @@
           // Apply delete terms to disk segments
           // except the one just flushed from ram.
-          applyDeletes(bufferedDeleteTerms, reader);
+          delCount += applyDeletes(bufferedDeleteTerms, reader);
         } finally {
           if (reader != null) {
             try {
@@ -2374,15 +2335,10 @@
       }
 
       // Clean up bufferedDeleteTerms.
-
-      // Rollbacks of buffered deletes are based on restoring the old
-      // map, so don't modify this one.  Rare enough that the gc
-      // overhead is almost certainly lower than the alternate, which
-      // would be clone to support rollback.
-
-      bufferedDeleteTerms = new HashMap();
-      numBufferedDeleteTerms = 0;
+      docWriter.clearBufferedDeleteTerms();
     }
+
+    return delCount;
   }
 
   private final boolean checkNonDecreasingLevels(int start) {
@@ -2410,59 +2366,28 @@
 
   // For test purposes.
   final synchronized int getBufferedDeleteTermsSize() {
-    return bufferedDeleteTerms.size();
+    return docWriter.getBufferedDeleteTerms().size();
   }
 
   // For test purposes.
   final synchronized int getNumBufferedDeleteTerms() {
-    return numBufferedDeleteTerms;
-  }
-
-  // Number of ram segments a delete term applies to.
-  private static class Num {
-    private int num;
-
-    Num(int num) {
-      this.num = num;
-    }
-
-    int getNum() {
-      return num;
-    }
-
-    void setNum(int num) {
-      this.num = num;
-    }
-  }
-
-  // Buffer a term in bufferedDeleteTerms, which records the
-  // current number of documents buffered in ram so that the
-  // delete term will be applied to those ram segments as
-  // well as the disk segments.
-  private void bufferDeleteTerm(Term term) {
-    Num num = (Num) bufferedDeleteTerms.get(term);
-    int numDoc = docWriter.getNumDocsInRAM();
-    if (num == null) {
-      bufferedDeleteTerms.put(term, new Num(numDoc));
-    } else {
-      num.setNum(numDoc);
-    }
-    numBufferedDeleteTerms++;
+    return docWriter.getNumBufferedDeleteTerms();
   }
 
   // Apply buffered delete terms to the segment just flushed from ram
   // apply appropriately so that a delete term is only applied to
   // the documents buffered before it, not those buffered after it.
-  private final void applyDeletesSelectively(HashMap deleteTerms,
+  private final int applyDeletesSelectively(HashMap deleteTerms,
       IndexReader reader) throws CorruptIndexException, IOException {
     Iterator iter = deleteTerms.entrySet().iterator();
+    int delCount = 0;
     while (iter.hasNext()) {
       Entry entry = (Entry) iter.next();
       Term term = (Term) entry.getKey();
 
       TermDocs docs = reader.termDocs(term);
       if (docs != null) {
-        int num = ((Num) entry.getValue()).getNum();
+        int num = ((DocumentsWriter.Num) entry.getValue()).getNum();
         try {
           while (docs.next()) {
             int doc = docs.doc();
@@ -2470,21 +2395,37 @@
               break;
             }
             reader.deleteDocument(doc);
+            delCount++;
           }
         } finally {
           docs.close();
         }
       }
     }
+    return delCount;
   }
 
   // Apply buffered delete terms to this reader.
-  private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
+  private final int applyDeletes(HashMap deleteTerms, IndexReader reader)
     throws CorruptIndexException, IOException {
     Iterator iter = deleteTerms.entrySet().iterator();
+    int delCount = 0;
     while (iter.hasNext()) {
       Entry entry = (Entry) iter.next();
-      reader.deleteDocuments((Term) entry.getKey());
+      delCount += reader.deleteDocuments((Term) entry.getKey());
     }
+    return delCount;
+  }
+
+  public synchronized String segString() {
+    StringBuffer buffer = new StringBuffer();
+    for(int i = 0; i < segmentInfos.size(); i++) {
+      if (i > 0) {
+        buffer.append(' ');
+      }
+      buffer.append(segmentInfos.info(i).name + ":" + segmentInfos.info(i).docCount);
+    }
+
+    return buffer.toString();
   }
 }

Added: lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java?rev=574260&view=auto
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java (added)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java Mon Sep 10 07:33:51 2007
@@ -0,0 +1,184 @@
+package org.apache.lucene.index;
+
+/**
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.*;
+import org.apache.lucene.store.*;
+import org.apache.lucene.document.*;
+import org.apache.lucene.analysis.*;
+import org.apache.lucene.index.*;
+import org.apache.lucene.search.*;
+import org.apache.lucene.queryParser.*;
+import org.apache.lucene.util._TestUtil;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.io.File;
+
+public class TestAtomicUpdate extends TestCase {
+  private static final Analyzer ANALYZER = new SimpleAnalyzer();
+  private static final Random RANDOM = new Random();
+
+  private static abstract class TimedThread extends Thread {
+    boolean failed;
+    int count;
+    private static int RUN_TIME_SEC = 3;
+    private TimedThread[] allThreads;
+
+    abstract public void doWork() throws Throwable;
+
+    TimedThread(TimedThread[] threads) {
+      this.allThreads = threads;
+    }
+
+    public void run() {
+      final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
+
+      count = 0;
+
+      try {
+        while(System.currentTimeMillis() < stopTime && !anyErrors()) {
+          doWork();
+          count++;
+        }
+      } catch (Throwable e) {
+        e.printStackTrace(System.out);
+        failed = true;
+      }
+    }
+
+    private boolean anyErrors() {
+      for(int i=0;i
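
The remainder of the added test is cut off above. For reference, a minimal single-threaded sketch of the invariant that TestAtomicUpdate exercises is shown below; the committed test instead drives several TimedThreads concurrently, and the class name AtomicUpdateSketch, the field names, and the constants here are illustrative assumptions, not part of the commit.

import org.apache.lucene.analysis.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class AtomicUpdateSketch {

  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();

    // Seed 100 documents, one per id, and commit them.
    IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), true);
    for (int i = 0; i < 100; i++)
      writer.addDocument(makeDoc(i));
    writer.close();

    // Replace documents in place.  Because updateDocument buffers the delete
    // term and the new document as one atomic operation, a reader opened at
    // any point should see exactly 100 live documents -- never 99 (delete
    // without the add) and never 101 (add without the delete).
    writer = new IndexWriter(dir, new SimpleAnalyzer(), false);
    for (int iter = 0; iter < 500; iter++) {
      int id = iter % 100;
      writer.updateDocument(new Term("id", Integer.toString(id)), makeDoc(id));

      IndexReader reader = IndexReader.open(dir);
      if (reader.numDocs() != 100)
        throw new RuntimeException("expected 100 docs, saw " + reader.numDocs());
      reader.close();
    }
    writer.close();
  }

  private static Document makeDoc(int id) {
    Document doc = new Document();
    doc.add(new Field("id", Integer.toString(id), Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("contents", "some content for doc " + id, Field.Store.YES, Field.Index.TOKENIZED));
    return doc;
  }
}

Before this commit, the delete term was buffered in IndexWriter while the added document was buffered in DocumentsWriter, so a flush could land between the two and break the invariant above; moving both into DocumentsWriter under a single lock is what restores atomicity.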