Subject: svn commit: r1532670 [1/2] - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/
Date: Wed, 16 Oct 2013 07:43:18 -0000
To: commits@lucene.apache.org
From: shaie@apache.org

Author: shaie
Date: Wed Oct 16 07:43:18 2013
New Revision: 1532670

URL: http://svn.apache.org/r1532670
Log:
LUCENE-5248: improve the data structure used to hold field updates

Added:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java   (with props)
Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1532670&r1=1532669&r2=1532670&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java Wed Oct 16 07:43:18 2013
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

@@ -125,14 +127,10 @@ class BufferedDeletesStream { // TODO (D
     // If non-null, contains segments that are 100% deleted
     public final List allDeleted;

-    // True if any actual numeric docvalues updates took place
-    public
final boolean anyNumericDVUpdates; - - ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted, boolean anyNumericDVUpdates) { + ApplyDeletesResult(boolean anyDeletes, long gen, List allDeleted) { this.anyDeletes = anyDeletes; this.gen = gen; this.allDeleted = allDeleted; - this.anyNumericDVUpdates = anyNumericDVUpdates; } } @@ -151,7 +149,7 @@ class BufferedDeletesStream { // TODO (D final long t0 = System.currentTimeMillis(); if (infos.size() == 0) { - return new ApplyDeletesResult(false, nextGen++, null, false); + return new ApplyDeletesResult(false, nextGen++, null); } assert checkDeleteStats(); @@ -160,7 +158,7 @@ class BufferedDeletesStream { // TODO (D if (infoStream.isEnabled("BD")) { infoStream.message("BD", "applyDeletes: no deletes; skipping"); } - return new ApplyDeletesResult(false, nextGen++, null, false); + return new ApplyDeletesResult(false, nextGen++, null); } if (infoStream.isEnabled("BD")) { @@ -175,7 +173,6 @@ class BufferedDeletesStream { // TODO (D CoalescedDeletes coalescedDeletes = null; boolean anyNewDeletes = false; - boolean anyNewUpdates = false; int infosIDX = infos2.size()-1; int delIDX = deletes.size()-1; @@ -213,21 +210,25 @@ class BufferedDeletesStream { // TODO (D // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! + final SegmentReader reader = rld.getReader(IOContext.READ); int delCount = 0; final boolean segAllDeletes; try { + Map fieldUpdates = null; if (coalescedDeletes != null) { //System.out.println(" del coalesced"); delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); - anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); + fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, fieldUpdates); } //System.out.println(" del exact"); // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader); - anyNewUpdates |= applyNumericDocValueUpdates(Arrays.asList(packet.updates), rld, reader); + fieldUpdates = applyNumericDocValuesUpdates(Arrays.asList(packet.updates), rld, reader, fieldUpdates); + if (!fieldUpdates.isEmpty()) { + rld.writeFieldUpdates(info.info.dir, fieldUpdates); + } final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); @@ -268,17 +269,20 @@ class BufferedDeletesStream { // TODO (D // Lock order: IW -> BD -> RP assert readerPool.infoIsLive(info); final ReadersAndLiveDocs rld = readerPool.get(info, true); - final SegmentReader reader = rld.getReader(false, IOContext.READ); // don't apply deletes, as we're about to add more! 
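
As a reading aid for the hunk above: the patch drops the per-document rld.updateNumericDocValue(...) call and the anyNewUpdates flag, and instead buffers updates per field into a map that is written once per segment via rld.writeFieldUpdates(...). Below is a minimal self-contained sketch of that collect-then-write shape, using simplified stand-in types rather than the real package-private Lucene classes (FieldUpdatesSketch, apply and flush are illustrative names, not part of the patch).

import java.util.HashMap;
import java.util.Map;

public class CollectThenWriteSketch {

  // Stand-in for NumericFieldUpdates: accumulates doc -> value updates of one field.
  static class FieldUpdatesSketch {
    final Map<Integer, Long> byDoc = new HashMap<Integer, Long>();
    void add(int doc, Long value) { byDoc.put(doc, value); } // last update for a doc wins
  }

  // Stand-in for applyNumericDocValuesUpdates: buffers into the shared map, never writes.
  static Map<String, FieldUpdatesSketch> apply(Map<String, FieldUpdatesSketch> collected,
                                               String field, int doc, Long value) {
    if (collected == null) {
      collected = new HashMap<String, FieldUpdatesSketch>();
    }
    FieldUpdatesSketch updates = collected.get(field);
    if (updates == null) {
      updates = new FieldUpdatesSketch();
      collected.put(field, updates);
    }
    updates.add(doc, value);
    return collected;
  }

  // Stand-in for rld.writeFieldUpdates(dir, fieldUpdates): one write per segment.
  static void flush(Map<String, FieldUpdatesSketch> collected) {
    System.out.println("writing updates for fields " + collected.keySet());
  }

  public static void main(String[] args) {
    // The coalesced packet and the segment-private packet both feed the same map ...
    Map<String, FieldUpdatesSketch> fieldUpdates = apply(null, "price", 3, 42L);
    fieldUpdates = apply(fieldUpdates, "price", 7, 17L);
    fieldUpdates = apply(fieldUpdates, "stock", 3, 5L);
    // ... and only then is anything written, once, for the whole segment.
    if (!fieldUpdates.isEmpty()) {
      flush(fieldUpdates);
    }
  }
}
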
+ final SegmentReader reader = rld.getReader(IOContext.READ); int delCount = 0; final boolean segAllDeletes; try { delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); - anyNewUpdates |= applyNumericDocValueUpdates(coalescedDeletes.numericDVUpdates, rld, reader); + Map fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, null); + if (!fieldUpdates.isEmpty()) { + rld.writeFieldUpdates(info.info.dir, fieldUpdates); + } final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); segAllDeletes = fullDelCount == rld.info.info.getDocCount(); - } finally { + } finally { rld.release(reader); readerPool.release(rld); } @@ -292,7 +296,7 @@ class BufferedDeletesStream { // TODO (D } if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); + infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : "")); } } info.setBufferedDeletesGen(gen); @@ -307,7 +311,7 @@ class BufferedDeletesStream { // TODO (D } // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any; - return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted, anyNewUpdates); + return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted); } synchronized long getNextGen() { @@ -432,43 +436,61 @@ class BufferedDeletesStream { // TODO (D return delCount; } - // NumericDocValue Updates - private synchronized boolean applyNumericDocValueUpdates(Iterable updates, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException { + // NumericDocValues Updates + // If otherFieldUpdates != null, we need to merge the updates into them + private synchronized Map applyNumericDocValuesUpdates(Iterable updates, + ReadersAndLiveDocs rld, SegmentReader reader, Map otherFieldUpdates) throws IOException { Fields fields = reader.fields(); if (fields == null) { // This reader has no postings - return false; + return Collections.emptyMap(); } + // TODO: we can process the updates per DV field, from last to first so that + // if multiple terms affect same document for the same field, we add an update + // only once (that of the last term). To do that, we can keep a bitset which + // marks which documents have already been updated. So e.g. if term T1 + // updates doc 7, and then we process term T2 and it updates doc 7 as well, + // we don't apply the update since we know T1 came last and therefore wins + // the update. + // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so + // that these documents aren't even returned. + + String currentField = null; TermsEnum termsEnum = null; DocsEnum docs = null; - boolean any = false; + final Map result = otherFieldUpdates == null ? new HashMap() : otherFieldUpdates; //System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader); for (NumericUpdate update : updates) { Term term = update.term; int limit = update.docIDUpto; - // TODO: we rely on the map being ordered by updates order, not by terms order. - // we need that so that if two terms update the same document, the one that came - // last wins. 
- // alternatively, we could keep a map from doc->lastUpto and apply the update - // in terms order, where an update is applied only if its docIDUpto is greater - // than lastUpto. - // but, since app can send two updates, in order, which will have same upto, we - // cannot rely solely on docIDUpto, and need to have our own gen, which is - // incremented with every update. + // TODO: we traverse the terms in update order (not term order) so that we + // apply the updates in the correct order, i.e. if two terms udpate the + // same document, the last one that came in wins, irrespective of the + // terms lexical order. + // we can apply the updates in terms order if we keep an updatesGen (and + // increment it with every update) and attach it to each NumericUpdate. Note + // that we cannot rely only on docIDUpto because an app may send two updates + // which will get same docIDUpto, yet will still need to respect the order + // those updates arrived. - // Unlike applyTermDeletes, we visit terms in update order, not term order. - // Therefore we cannot assume we can only seek forwards and must ask for a - // new TermsEnum - Terms terms = fields.terms(term.field); - if (terms == null) { // no terms in that field - termsEnum = null; - continue; + if (!term.field().equals(currentField)) { + // if we change the code to process updates in terms order, enable this assert +// assert currentField == null || currentField.compareTo(term.field()) < 0; + currentField = term.field(); + Terms terms = fields.terms(currentField); + if (terms != null) { + termsEnum = terms.iterator(termsEnum); + } else { + termsEnum = null; + continue; // no terms in that field + } } - - termsEnum = terms.iterator(termsEnum); + if (termsEnum == null) { + continue; + } // System.out.println(" term=" + term); if (termsEnum.seekExact(term.bytes())) { @@ -477,18 +499,22 @@ class BufferedDeletesStream { // TODO (D //System.out.println("BDS: got docsEnum=" + docsEnum); + NumericFieldUpdates fieldUpdates = result.get(update.field); + if (fieldUpdates == null) { + fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(reader.maxDoc()); + result.put(update.field, fieldUpdates); + } int doc; while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { //System.out.println(Thread.currentThread().getName() + " numericDVUpdate term=" + term + " doc=" + docID); if (doc >= limit) { break; // no more docs that can be updated for this term } - rld.updateNumericDocValue(update.field, doc, update.value); - any = true; + fieldUpdates.add(doc, update.value); } } } - return any; + return result; } public static class QueryAndLimit { Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1532670&r1=1532669&r2=1532670&view=diff ============================================================================== --- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original) +++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Wed Oct 16 07:43:18 2013 @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,7 @@ import org.apache.lucene.index.FieldInfo import org.apache.lucene.index.IndexWriterConfig.OpenMode; import 
org.apache.lucene.index.MergePolicy.MergeTrigger; import org.apache.lucene.index.MergeState.CheckAbort; +import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.CompoundFileDirectory; @@ -466,7 +468,7 @@ public class IndexWriter implements Clos // pooling, so remove it: // System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: " + rld.info); if (rld.writeLiveDocs(directory)) { - // Make sure we only write del docs and field updates for a live segment: + // Make sure we only write del docs for a live segment: assert assertInfoLive == false || infoIsLive(rld.info); // Must checkpoint because we just // created new _X_N.del and field updates files; @@ -549,9 +551,8 @@ public class IndexWriter implements Clos final ReadersAndLiveDocs rld = readerMap.get(info); if (rld != null) { assert rld.info == info; - boolean hasFieldUpdates = rld.hasFieldUpdates(); // only reopen reader if there were field udpates if (rld.writeLiveDocs(directory)) { - // Make sure we only write del docs and updates for a live segment: + // Make sure we only write del docs for a live segment: assert infoIsLive(info); // Must checkpoint because we just @@ -562,11 +563,6 @@ public class IndexWriter implements Clos // invoked BDS.applyDeletes), whereas here all we // did was move the state to disk: checkpointNoSIS(); - - // we wrote field updates, reopen the reader - if (hasFieldUpdates) { - rld.reopenReader(IOContext.READ); - } } } } @@ -3049,7 +3045,7 @@ public class IndexWriter implements Clos flushDeletesCount.incrementAndGet(); final BufferedDeletesStream.ApplyDeletesResult result; result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList()); - if (result.anyDeletes || result.anyNumericDVUpdates) { + if (result.anyDeletes) { checkpoint(); } if (!keepFullyDeletedSegments && result.allDeleted != null) { @@ -3107,6 +3103,18 @@ public class IndexWriter implements Clos return docMap; } + private void skipDeletedDoc(UpdatesIterator[] updatesIters, int deletedDoc) { + for (UpdatesIterator iter : updatesIters) { + if (iter.doc() == deletedDoc) { + iter.nextDoc(); + } + // when entering the method, all iterators must already be beyond the + // deleted document, or right on it, in which case we advance them above + // and they must be beyond it now. + assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc; + } + } + /** * Carefully merges deletes and updates for the segments we just merged. 
This * is tricky because, although merging will clear all deletes (compacts the @@ -3137,7 +3145,7 @@ public class IndexWriter implements Clos ReadersAndLiveDocs mergedDeletes = null; // TODO (DVU_RENAME) to mergedDeletesAndUpdates boolean initWritableLiveDocs = false; MergePolicy.DocMap docMap = null; - final Map> mergedUpdates = new HashMap>(); + final Map mergedFieldUpdates = new HashMap(); for (int i = 0; i < sourceSegments.size(); i++) { SegmentInfoPerCommit info = sourceSegments.get(i); @@ -3148,8 +3156,23 @@ public class IndexWriter implements Clos // We hold a ref so it should still be in the pool: assert rld != null: "seg=" + info.info.name; final Bits currentLiveDocs = rld.getLiveDocs(); - final Map> mergingUpdates = rld.getMergingUpdates(); - + final Map mergingFieldUpdates = rld.getMergingFieldUpdates(); + final String[] mergingFields; + final UpdatesIterator[] updatesIters; + if (mergingFieldUpdates.isEmpty()) { + mergingFields = null; + updatesIters = null; + } else { + mergingFields = new String[mergingFieldUpdates.size()]; + updatesIters = new UpdatesIterator[mergingFieldUpdates.size()]; + int idx = 0; + for (Entry e : mergingFieldUpdates.entrySet()) { + mergingFields[idx] = e.getKey(); + updatesIters[idx] = e.getValue().getUpdates(); + updatesIters[idx].nextDoc(); // advance to first update doc + ++idx; + } + } // System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: info=" + info + ", mergingUpdates=" + mergingUpdates); if (prevLiveDocs != null) { @@ -3191,35 +3214,73 @@ public class IndexWriter implements Clos initWritableLiveDocs = true; } mergedDeletes.delete(docMap.map(docUpto)); - } else if (mergingUpdates != null) { - // document isn't deleted, check if it has updates - Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); - if (docUpdates != null) { - if (mergedDeletes == null) { - mergedDeletes = readerPool.get(merge.info, true); - docMap = getDocMap(merge, mergeState); + if (mergingFields != null) { // advance all iters beyond the deleted document + skipDeletedDoc(updatesIters, j); + } + } else if (mergingFields != null) { + // document isn't deleted, check if any of the fields have an update to it + int newDoc = -1; + for (int idx = 0; idx < mergingFields.length; idx++) { + UpdatesIterator updatesIter = updatesIters[idx]; + if (updatesIter.doc() == j) { // document has an update + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + if (newDoc == -1) { // map once per all field updates, but only if there are any updates + newDoc = docMap.map(docUpto); + } + String field = mergingFields[idx]; + NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); + if (fieldUpdates == null) { + // an approximantion of maxDoc, used to compute best bitsPerValue + fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); + mergedFieldUpdates.put(field, fieldUpdates); + } + fieldUpdates.add(newDoc, updatesIter.value() == null ? 
NumericUpdate.MISSING : updatesIter.value()); + updatesIter.nextDoc(); // advance to next document + } else { + assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; } - mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); } } docUpto++; } } - } else if (mergingUpdates != null) { + } else if (mergingFields != null) { // need to check each non-deleted document if it has any updates for (int j = 0; j < docCount; j++) { if (prevLiveDocs.get(j)) { - // document isn't deleted, check if it has updates - Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); - if (docUpdates != null) { - if (mergedDeletes == null) { - mergedDeletes = readerPool.get(merge.info, true); - docMap = getDocMap(merge, mergeState); + // document isn't deleted, check if any of the fields have an update to it + int newDoc = -1; + for (int idx = 0; idx < mergingFields.length; idx++) { + UpdatesIterator updatesIter = updatesIters[idx]; + if (updatesIter.doc() == j) { // document has an update + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + if (newDoc == -1) { // map once per all field updates, but only if there are any updates + newDoc = docMap.map(docUpto); + } + String field = mergingFields[idx]; + NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); + if (fieldUpdates == null) { + // an approximantion of maxDoc, used to compute best bitsPerValue + fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); + mergedFieldUpdates.put(field, fieldUpdates); + } + fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value()); + updatesIter.nextDoc(); // advance to next document + } else { + assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; } - mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); } // advance docUpto for every non-deleted document docUpto++; + } else { + // advance all iters beyond the deleted document + skipDeletedDoc(updatesIters, j); } } } else { @@ -3241,54 +3302,106 @@ public class IndexWriter implements Clos initWritableLiveDocs = true; } mergedDeletes.delete(docMap.map(docUpto)); - } else if (mergingUpdates != null) { - // document isn't deleted, check if it has updates - Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); - if (docUpdates != null) { - if (mergedDeletes == null) { - mergedDeletes = readerPool.get(merge.info, true); - docMap = getDocMap(merge, mergeState); + if (mergingFields != null) { // advance all iters beyond the deleted document + skipDeletedDoc(updatesIters, j); + } + } else if (mergingFields != null) { + // document isn't deleted, check if any of the fields have an update to it + int newDoc = -1; + for (int idx = 0; idx < mergingFields.length; idx++) { + UpdatesIterator updatesIter = updatesIters[idx]; + if (updatesIter.doc() == j) { // document has an update + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + if (newDoc == -1) { // map once per all field updates, but only if there are any updates + newDoc = docMap.map(docUpto); + } + String field = mergingFields[idx]; + NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); + if (fieldUpdates == null) { + // an approximantion of maxDoc, used to compute best bitsPerValue + fieldUpdates = new 
NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); + mergedFieldUpdates.put(field, fieldUpdates); + } + fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value()); + updatesIter.nextDoc(); // advance to next document + } else { + assert updatesIter.doc() > j : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + j; } - mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); } } docUpto++; } - } else if (mergingUpdates != null) { + } else if (mergingFields != null) { // no deletions before or after, but there were updates for (int j = 0; j < docCount; j++) { - Map docUpdates = mergingUpdates.get(Integer.valueOf(j)); - if (docUpdates != null) { - if (mergedDeletes == null) { - mergedDeletes = readerPool.get(merge.info, true); - docMap = getDocMap(merge, mergeState); + int newDoc = -1; + for (int idx = 0; idx < mergingFields.length; idx++) { + UpdatesIterator updatesIter = updatesIters[idx]; + if (updatesIter.doc() == j) { // document has an update + if (mergedDeletes == null) { + mergedDeletes = readerPool.get(merge.info, true); + docMap = getDocMap(merge, mergeState); + } + if (newDoc == -1) { // map once per all field updates, but only if there are any updates + newDoc = docMap.map(docUpto); + } + String field = mergingFields[idx]; + NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); + if (fieldUpdates == null) { + // an approximantion of maxDoc, used to compute best bitsPerValue + fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); + mergedFieldUpdates.put(field, fieldUpdates); + } + fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value()); + updatesIter.nextDoc(); // advance to next document + } else { + assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; } - mergedUpdates.put(Integer.valueOf(docMap.map(docUpto)), docUpdates); } // advance docUpto for every non-deleted document docUpto++; } } else { - // No deletes before or after + // No deletes or updates before or after docUpto += info.info.getDocCount(); } } assert docUpto == merge.info.info.getDocCount(); - // set any updates that came while the segment was merging - if (!mergedUpdates.isEmpty()) { -// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedUpdates=" + mergedUpdates); - assert mergedDeletes != null; - mergedDeletes.setMergingUpdates(mergedUpdates); + if (!mergedFieldUpdates.isEmpty()) { +// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates); + boolean success = false; + try { + // if any error occurs while writing the field updates we should release + // the info, otherwise it stays in the pool but is considered not "live" + // which later causes false exceptions in pool.dropAll(). + // NOTE: currently this is the only place which throws a true + // IOException. If this ever changes, we need to extend that try/finally + // block to the rest of the method too. 
+ mergedDeletes.writeFieldUpdates(directory, mergedFieldUpdates); + success = true; + } finally { + if (!success) { + mergedDeletes.dropChanges(); + readerPool.drop(merge.info); + } + } } - + if (infoStream.isEnabled("IW")) { if (mergedDeletes == null) { infoStream.message("IW", "no new deletes or field updates since merge started"); } else { - infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started and " - + mergedDeletes.getPendingUpdatesCount() + " new field updates since merge started"); + String msg = mergedDeletes.getPendingDeleteCount() + " new deletes"; + if (!mergedFieldUpdates.isEmpty()) { + msg += " and " + mergedFieldUpdates.size() + " new field updates"; + } + msg += " since merge started"; + infoStream.message("IW", msg); } } @@ -3325,11 +3438,9 @@ public class IndexWriter implements Clos return false; } - final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState); + final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState); // System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: mergedDeletes=" + mergedDeletes); - assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0 || mergedDeletes.hasFieldUpdates(); - // If the doc store we are using has been closed and // is in now compound format (but wasn't when we // started), then we will switch to the compound @@ -3654,7 +3765,7 @@ public class IndexWriter implements Clos // Lock order: IW -> BD final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, merge.segments); - if (result.anyDeletes || result.anyNumericDVUpdates) { + if (result.anyDeletes) { checkpoint(); } @@ -3807,20 +3918,20 @@ public class IndexWriter implements Clos // Hold onto the "live" reader; we will use this to // commit merged deletes final ReadersAndLiveDocs rld = readerPool.get(info, true); - SegmentReader reader = rld.getReaderForMerge(context); - assert reader != null; - // Carefully pull the most recent live docs: + // Carefully pull the most recent live docs and reader + SegmentReader reader; final Bits liveDocs; final int delCount; - synchronized(this) { - // Must sync to ensure BufferedDeletesStream - // cannot change liveDocs/pendingDeleteCount while - // we pull a copy: + synchronized (this) { + // Must sync to ensure BufferedDeletesStream cannot change liveDocs, + // pendingDeleteCount and field updates while we pull a copy: + reader = rld.getReaderForMerge(context); liveDocs = rld.getReadOnlyLiveDocs(); delCount = rld.getPendingDeleteCount() + info.getDelCount(); + assert reader != null; assert rld.verifyDocCounts(); if (infoStream.isEnabled("IW")) { @@ -4006,7 +4117,7 @@ public class IndexWriter implements Clos final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer(); if (poolReaders && mergedSegmentWarmer != null && merge.info.info.getDocCount() != 0) { final ReadersAndLiveDocs rld = readerPool.get(merge.info, true); - final SegmentReader sr = rld.getReader(true, IOContext.READ); + final SegmentReader sr = rld.getReader(IOContext.READ); try { mergedSegmentWarmer.warm(sr); } finally { Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java?rev=1532670&view=auto 
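
Before the full listing of the added file below: NumericFieldUpdates exposes its updates through an UpdatesIterator with nextDoc()/doc()/value(), returning documents in increasing order, NO_MORE_DOCS at the end, and null from value() meaning the update unsets the field. Here is a small self-contained sketch of that contract and of how a consumer walks it; SimpleUpdatesIterator is a stand-in for illustration, not the packed implementation added in the file.

import java.util.Map;
import java.util.TreeMap;

public class UpdatesIteratorSketch {

  static final int NO_MORE_DOCS = Integer.MAX_VALUE; // mirrors DocIdSetIterator.NO_MORE_DOCS

  // Stand-in for NumericFieldUpdates.UpdatesIterator: docs come back in increasing
  // order and value() == null means "unset the field for this document".
  static class SimpleUpdatesIterator {
    private final TreeMap<Integer, Long> updates; // doc -> value, sorted by doc
    private int doc = -1;
    private Long value = null;

    SimpleUpdatesIterator(TreeMap<Integer, Long> updates) {
      this.updates = updates;
    }

    int nextDoc() {
      Map.Entry<Integer, Long> next = updates.higherEntry(doc);
      if (next == null) {
        value = null;
        return doc = NO_MORE_DOCS;
      }
      value = next.getValue();
      return doc = next.getKey();
    }

    int doc() { return doc; }

    Long value() { return value; }
  }

  public static void main(String[] args) {
    TreeMap<Integer, Long> updates = new TreeMap<Integer, Long>();
    updates.put(2, Long.valueOf(100));
    updates.put(5, null); // unset the value of doc 5
    updates.put(9, Long.valueOf(7));

    // Consumers such as writeFieldUpdates and the merge code walk the iterator like this:
    SimpleUpdatesIterator iter = new SimpleUpdatesIterator(updates);
    int doc;
    while ((doc = iter.nextDoc()) != NO_MORE_DOCS) {
      System.out.println("doc=" + doc + " value=" + iter.value());
    }
  }
}
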
============================================================================== --- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java (added) +++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java Wed Oct 16 07:43:18 2013 @@ -0,0 +1,259 @@ +package org.apache.lucene.index; + +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.InPlaceMergeSorter; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PagedGrowableWriter; +import org.apache.lucene.util.packed.PagedMutable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Holds numeric values updates of documents, of a single + * {@link NumericDocValuesField}. + * + * @lucene.experimental + */ +interface NumericFieldUpdates { + + /** + * An iterator over documents and their updated values. Only documents with + * updates are returned by this iterator, and the documents are returned in + * increasing order. + */ + static abstract class UpdatesIterator { + + /** + * Returns the next document which has an update, or + * {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to + * return. + */ + abstract int nextDoc(); + + /** Returns the current document this iterator is on. */ + abstract int doc(); + + /** + * Returns the value of the document returned from {@link #nextDoc()}. A + * {@code null} value means that it was unset for this document. + */ + abstract Long value(); + + /** + * Reset the iterator's state. Should be called before {@link #nextDoc()} + * and {@link #value()}. + */ + abstract void reset(); + + } + + /** + * A {@link NumericFieldUpdates} which holds the updated documents and values + * in packed structures. Only supports up to 2B entries (docs and values) + * since we need to sort the docs/values and the Sorter interfaces currently + * only take integer indexes. 
+ */ + static final class PackedNumericFieldUpdates implements NumericFieldUpdates { + + private FixedBitSet docsWithField; + private PagedMutable docs; + private PagedGrowableWriter values; + private int size; + + public PackedNumericFieldUpdates(int maxDoc) { + docsWithField = new FixedBitSet(64); + docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT); + values = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST); + size = 0; + } + + @Override + public void add(int doc, Long value) { + assert value != null; + // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation + if (size == Integer.MAX_VALUE) { + throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries"); + } + + // grow the structures to have room for more elements + if (docs.size() == size) { + docs = docs.grow(size + 1); + values = values.grow(size + 1); + int numWords = (int) (docs.size() >> 6); + if (docsWithField.getBits().length <= numWords) { + numWords = ArrayUtil.oversize(numWords + 1, RamUsageEstimator.NUM_BYTES_LONG); + docsWithField = new FixedBitSet(docsWithField, numWords << 6); + } + } + + if (value != NumericUpdate.MISSING) { + // only mark the document as having a value in that field if the value wasn't set to null (MISSING) + docsWithField.set(size); + } + + docs.set(size, doc); + values.set(size, value.longValue()); + ++size; + } + + @Override + public UpdatesIterator getUpdates() { + final PagedMutable docs = this.docs; + final PagedGrowableWriter values = this.values; + final FixedBitSet docsWithField = this.docsWithField; + new InPlaceMergeSorter() { + @Override + protected void swap(int i, int j) { + long tmpDoc = docs.get(j); + docs.set(j, docs.get(i)); + docs.set(i, tmpDoc); + + long tmpVal = values.get(j); + values.set(j, values.get(i)); + values.set(i, tmpVal); + + boolean tmpBool = docsWithField.get(j); + if (docsWithField.get(i)) { + docsWithField.set(j); + } else { + docsWithField.clear(j); + } + if (tmpBool) { + docsWithField.set(i); + } else { + docsWithField.clear(i); + } + } + + @Override + protected int compare(int i, int j) { + int x = (int) docs.get(i); + int y = (int) docs.get(j); + return (x < y) ? -1 : ((x == y) ? 
0 : 1); + } + }.sort(0, size); + + final int size = this.size; + return new UpdatesIterator() { + private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE + private int doc = -1; + private Long value = null; + + @Override + Long value() { + return value; + } + + @Override + int nextDoc() { + if (idx >= size) { + value = null; + return doc = DocIdSetIterator.NO_MORE_DOCS; + } + doc = (int) docs.get(idx); + ++idx; + while (idx < size && docs.get(idx) == doc) { + ++idx; + } + if (!docsWithField.get((int) (idx - 1))) { + value = null; + } else { + // idx points to the "next" element + value = Long.valueOf(values.get(idx - 1)); + } + return doc; + } + + @Override + int doc() { + return doc; + } + + @Override + void reset() { + doc = -1; + value = null; + idx = 0; + } + }; + } + + @Override + public void merge(NumericFieldUpdates other) { + if (other instanceof PackedNumericFieldUpdates) { + PackedNumericFieldUpdates packedOther = (PackedNumericFieldUpdates) other; + if (size + packedOther.size > Integer.MAX_VALUE) { + throw new IllegalStateException( + "cannot support more than Integer.MAX_VALUE doc/value entries; size=" + + size + " other.size=" + packedOther.size); + } + docs = docs.grow(size + packedOther.size); + values = values.grow(size + packedOther.size); + int numWords = (int) (docs.size() >> 6); + if (docsWithField.getBits().length <= numWords) { + numWords = ArrayUtil.oversize(numWords + 1, RamUsageEstimator.NUM_BYTES_LONG); + docsWithField = new FixedBitSet(docsWithField, numWords << 6); + } + for (int i = 0; i < packedOther.size; i++) { + int doc = (int) packedOther.docs.get(i); + if (packedOther.docsWithField.get(i)) { + docsWithField.set(size); + } + docs.set(size, doc); + values.set(size, packedOther.values.get(i)); + ++size; + } + } else { + UpdatesIterator iter = other.getUpdates(); + int doc; + while ((doc = iter.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + Long value = iter.value(); + if (value == null) { + value = NumericUpdate.MISSING; + } + add(doc, value); + } + } + } + + } + + /** + * Add an update to a document. For unsetting a value you should pass + * {@link NumericUpdate#MISSING} instead of {@code null}. + */ + public void add(int doc, Long value); + + /** + * Returns an {@link UpdatesIterator} over the updated documents and their + * values. + */ + public UpdatesIterator getUpdates(); + + /** + * Merge with another {@link NumericFieldUpdates}. This is called for a + * segment which received updates while it was being merged. The given updates + * should override whatever numeric updates are in that instance. 
+ */ + public void merge(NumericFieldUpdates other); + +} Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java?rev=1532670&r1=1532669&r2=1532670&view=diff ============================================================================== --- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (original) +++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java Wed Oct 16 07:43:18 2013 @@ -31,6 +31,7 @@ import org.apache.lucene.codecs.DocValue import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.TrackingDirectoryWrapper; @@ -61,9 +62,6 @@ class ReadersAndLiveDocs { // TODO (DVU_ // reader). private Bits liveDocs; - // Holds the numeric DocValues updates. - private final Map> numericUpdates = new HashMap>(); - // How many further deletions we've done against // liveDocs vs when we loaded it or last wrote it: private int pendingDeleteCount; @@ -73,15 +71,15 @@ class ReadersAndLiveDocs { // TODO (DVU_ private boolean liveDocsShared; // Indicates whether this segment is currently being merged. While a segment - // is merging, all field updates are also registered in the mergingUpdates - // map. Also, calls to writeLiveDocs merge the updates with mergingUpdates. + // is merging, all field updates are also registered in the + // mergingNumericUpdates map. Also, calls to writeFieldUpdates merge the + // updates with mergingNumericUpdates. // That way, when the segment is done merging, IndexWriter can apply the // updates on the merged segment too. private boolean isMerging = false; - // Holds any updates that come through while this segment was being merged. - private final Map> mergingUpdates = new HashMap>(); - + private final Map mergingNumericUpdates = new HashMap(); + public ReadersAndLiveDocs(IndexWriter writer, SegmentInfoPerCommit info) { this.info = info; this.writer = writer; @@ -108,18 +106,6 @@ class ReadersAndLiveDocs { // TODO (DVU_ return pendingDeleteCount; } - public synchronized boolean hasFieldUpdates() { - return numericUpdates.size() > 0; - } - - public synchronized int getPendingUpdatesCount() { - int pendingUpdatesCount = 0; - for (Entry> e : numericUpdates.entrySet()) { - pendingUpdatesCount += e.getValue().size(); - } - return pendingUpdatesCount; - } - // Call only from assert! public synchronized boolean verifyDocCounts() { int count; @@ -138,23 +124,8 @@ class ReadersAndLiveDocs { // TODO (DVU_ return true; } - public synchronized void reopenReader(IOContext context) throws IOException { - if (reader != null) { - SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); - boolean reopened = false; - try { - reader.decRef(); - reader = newReader; - reopened = true; - } finally { - if (!reopened) { - newReader.decRef(); - } - } - } - } - - private synchronized SegmentReader doGetReader(IOContext context) throws IOException { + /** Returns a {@link SegmentReader}. 
*/ + public SegmentReader getReader(IOContext context) throws IOException { if (reader == null) { // We steal returned ref: reader = new SegmentReader(info, context); @@ -168,71 +139,11 @@ class ReadersAndLiveDocs { // TODO (DVU_ return reader; } - private synchronized SegmentReader doGetReaderWithUpdates(IOContext context) throws IOException { - assert Thread.holdsLock(writer); // when we get here, we should already have the writer lock - boolean checkpoint = false; - try { - checkpoint = writeLiveDocs(info.info.dir); - if (reader == null) { - // We steal returned ref: -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.doGetReaderWithUpdates: newSR " + info); - reader = new SegmentReader(info, context); - if (liveDocs == null) { - liveDocs = reader.getLiveDocs(); - } - } else if (checkpoint) { - // enroll a new reader with the applied updates -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.doGetReaderWithUpdates: reopenReader " + info); - reopenReader(context); - } - - // Ref for caller - reader.incRef(); - return reader; - } finally { - if (checkpoint) { - writer.checkpoint(); - } - } - } - - /** Returns a {@link SegmentReader} while applying field updates if requested. */ - public SegmentReader getReader(boolean applyFieldUpdates, IOContext context) throws IOException { - // if we need to apply field updates, we call writeLiveDocs and change - // SegmentInfos. Therefore must hold the lock on IndexWriter. This code - // ensures that readers that don't need to apply updates don't pay the - // cost of obtaining it. - if (applyFieldUpdates && hasFieldUpdates()) { - synchronized (writer) { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.getReader: getReaderWithUpdates " + info); - return doGetReaderWithUpdates(context); - } - } else { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.getReader: getReader no updates " + info); - return doGetReader(context); - } - } - public synchronized void release(SegmentReader sr) throws IOException { assert info == sr.getSegmentInfo(); sr.decRef(); } - /** - * Updates the numeric doc value of {@code docID} under {@code field} to the - * given {@code value}. - */ - public synchronized void updateNumericDocValue(String field, int docID, Long value) { - assert Thread.holdsLock(writer); - assert docID >= 0 && docID < reader.maxDoc() : "out of bounds: docid=" + docID + " maxDoc=" + reader.maxDoc() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount(); - Map updates = numericUpdates.get(field); - if (updates == null) { - updates = new HashMap(); - numericUpdates.put(field, updates); - } - updates.put(docID, value); - } - public synchronized boolean delete(int docID) { assert liveDocs != null; assert Thread.holdsLock(writer); @@ -268,8 +179,10 @@ class ReadersAndLiveDocs { // TODO (DVU_ * dont (ie do not call close()). 
*/ public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException { - getReader(true, context).decRef(); // make sure we enroll a new reader if there are field updates - assert reader != null; + if (reader == null) { + getReader(context).decRef(); + assert reader != null; + } liveDocsShared = true; if (liveDocs != null) { return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); @@ -324,7 +237,6 @@ class ReadersAndLiveDocs { // TODO (DVU_ // deletes onto the newly merged segment, so we can // discard them on the sub-readers: pendingDeleteCount = 0; - numericUpdates.clear(); dropMergingUpdates(); } @@ -335,13 +247,12 @@ class ReadersAndLiveDocs { // TODO (DVU_ public synchronized boolean writeLiveDocs(Directory dir) throws IOException { assert Thread.holdsLock(writer); //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount + " numericUpdates=" + numericUpdates); - final boolean hasFieldUpdates = hasFieldUpdates(); - if (pendingDeleteCount == 0 && !hasFieldUpdates) { + if (pendingDeleteCount == 0) { return false; } - // We have new deletes or updates - assert pendingDeleteCount == 0 || liveDocs.length() == info.info.getDocCount(); + // We have new deletes + assert liveDocs.length() == info.info.getDocCount(); // Do this so we can delete any created files on // exception; this saves all codecs from having to do @@ -351,134 +262,166 @@ class ReadersAndLiveDocs { // TODO (DVU_ // We can write directly to the actual name (vs to a // .tmp & renaming it) because the file is not live // until segments file is written: - FieldInfos fieldInfos = null; boolean success = false; try { Codec codec = info.info.getCodec(); - if (pendingDeleteCount > 0) { - codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + success = true; + } finally { + if (!success) { + // Advance only the nextWriteDelGen so that a 2nd + // attempt to write will write to a new file + info.advanceNextWriteDelGen(); + + // Delete any partially created file(s): + for (String fileName : trackingDir.getCreatedFiles()) { + try { + dir.deleteFile(fileName); + } catch (Throwable t) { + // Ignore so we throw only the first exc + } + } } - - // apply numeric updates if there are any - if (hasFieldUpdates) { - // reader could be null e.g. for a just merged segment (from - // IndexWriter.commitMergedDeletes). -// if (this.reader == null) System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: newSR " + info); - final SegmentReader reader = this.reader == null ? 
new SegmentReader(info, IOContext.READONCE) : this.reader; - try { - // clone FieldInfos so that we can update their dvGen separately from - // the reader's infos and write them to a new fieldInfos_gen file - FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); - // cannot use builder.add(reader.getFieldInfos()) because it does not - // clone FI.attributes as well FI.dvGen - for (FieldInfo fi : reader.getFieldInfos()) { - FieldInfo clone = builder.add(fi); - // copy the stuff FieldInfos.Builder doesn't copy - if (fi.attributes() != null) { - for (Entry e : fi.attributes().entrySet()) { - clone.putAttribute(e.getKey(), e.getValue()); - } + } + + // If we hit an exc in the line above (eg disk full) + // then info's delGen remains pointing to the previous + // (successfully written) del docs: + info.advanceDelGen(); + info.setDelCount(info.getDelCount() + pendingDeleteCount); + pendingDeleteCount = 0; + + return true; + } + + // Writes field updates (new _X_N updates files) to the directory + public synchronized void writeFieldUpdates(Directory dir, Map numericFieldUpdates) throws IOException { + assert Thread.holdsLock(writer); + //System.out.println("rld.writeFieldUpdates: seg=" + info + " numericFieldUpdates=" + numericFieldUpdates); + + assert numericFieldUpdates != null && !numericFieldUpdates.isEmpty(); + + // Do this so we can delete any created files on + // exception; this saves all codecs from having to do + // it: + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + + FieldInfos fieldInfos = null; + boolean success = false; + try { + final Codec codec = info.info.getCodec(); + + // reader could be null e.g. for a just merged segment (from + // IndexWriter.commitMergedDeletes). + final SegmentReader reader = this.reader == null ? 
new SegmentReader(info, IOContext.READONCE) : this.reader; + try { + // clone FieldInfos so that we can update their dvGen separately from + // the reader's infos and write them to a new fieldInfos_gen file + FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); + // cannot use builder.add(reader.getFieldInfos()) because it does not + // clone FI.attributes as well FI.dvGen + for (FieldInfo fi : reader.getFieldInfos()) { + FieldInfo clone = builder.add(fi); + // copy the stuff FieldInfos.Builder doesn't copy + if (fi.attributes() != null) { + for (Entry e : fi.attributes().entrySet()) { + clone.putAttribute(e.getKey(), e.getValue()); } - clone.setDocValuesGen(fi.getDocValuesGen()); } - // create new fields or update existing ones to have NumericDV type - for (String f : numericUpdates.keySet()) { - builder.addOrUpdate(f, NumericDocValuesField.TYPE); - } - - fieldInfos = builder.finish(); - final long nextFieldInfosGen = info.getNextFieldInfosGen(); - final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX); - final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix); - final DocValuesFormat docValuesFormat = codec.docValuesFormat(); - final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state); - boolean fieldsConsumerSuccess = false; - try { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates); - for (Entry> e : numericUpdates.entrySet()) { - final String field = e.getKey(); - final Map updates = e.getValue(); - final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); - - assert fieldInfo != null; - - fieldInfo.setDocValuesGen(nextFieldInfosGen); - - // write the numeric updates to a new gen'd docvalues file - fieldsConsumer.addNumericField(fieldInfo, new Iterable() { - @SuppressWarnings("synthetic-access") - final NumericDocValues currentValues = reader.getNumericDocValues(field); - final Bits docsWithField = reader.getDocsWithField(field); - @Override - public Iterator iterator() { - return new Iterator() { - - @SuppressWarnings("synthetic-access") - final int maxDoc = reader.maxDoc(); - int curDoc = -1; - - @Override - public boolean hasNext() { - return curDoc < maxDoc - 1; + clone.setDocValuesGen(fi.getDocValuesGen()); + } + // create new fields or update existing ones to have NumericDV type + for (String f : numericFieldUpdates.keySet()) { + builder.addOrUpdate(f, NumericDocValuesField.TYPE); + } + + fieldInfos = builder.finish(); + final long nextFieldInfosGen = info.getNextFieldInfosGen(); + final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX); + final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix); + final DocValuesFormat docValuesFormat = codec.docValuesFormat(); + final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state); + boolean fieldsConsumerSuccess = false; + try { +// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates); + for (Entry e : numericFieldUpdates.entrySet()) { + final String field = e.getKey(); + final NumericFieldUpdates fieldUpdates = e.getValue(); + final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); + assert fieldInfo != null; + + fieldInfo.setDocValuesGen(nextFieldInfosGen); + // write 
the numeric updates to a new gen'd docvalues file + fieldsConsumer.addNumericField(fieldInfo, new Iterable() { + final NumericDocValues currentValues = reader.getNumericDocValues(field); + final Bits docsWithField = reader.getDocsWithField(field); + final int maxDoc = reader.maxDoc(); + final UpdatesIterator updatesIter = fieldUpdates.getUpdates(); + @Override + public Iterator iterator() { + updatesIter.reset(); + return new Iterator() { + + int curDoc = -1; + int updateDoc = updatesIter.nextDoc(); + + @Override + public boolean hasNext() { + return curDoc < maxDoc - 1; + } + + @Override + public Number next() { + if (++curDoc >= maxDoc) { + throw new NoSuchElementException("no more documents to return values for"); } - - @Override - public Number next() { - if (++curDoc >= maxDoc) { - throw new NoSuchElementException("no more documents to return values for"); - } - Long updatedValue = updates.get(curDoc); - if (updatedValue == null) { + if (curDoc == updateDoc) { // this document has an updated value + Long value = updatesIter.value(); // either null (unset value) or updated value + updateDoc = updatesIter.nextDoc(); // prepare for next round + return value; + } else { + // no update for this document + assert curDoc < updateDoc; + if (currentValues != null && docsWithField.get(curDoc)) { // only read the current value if the document had a value before - if (currentValues != null && docsWithField.get(curDoc)) { - updatedValue = currentValues.get(curDoc); - } - } else if (updatedValue == NumericUpdate.MISSING) { - updatedValue = null; + return currentValues.get(curDoc); + } else { + return null; } - return updatedValue; } + } - @Override - public void remove() { - throw new UnsupportedOperationException("this iterator does not support removing elements"); - } - - }; - } - }); - } - - codec.fieldInfosFormat().getFieldInfosWriter().write(trackingDir, info.info.name, segmentSuffix, fieldInfos, IOContext.DEFAULT); - fieldsConsumerSuccess = true; - } finally { - if (fieldsConsumerSuccess) { - fieldsConsumer.close(); - } else { - IOUtils.closeWhileHandlingException(fieldsConsumer); - } + @Override + public void remove() { + throw new UnsupportedOperationException("this iterator does not support removing elements"); + } + }; + } + }); } + + codec.fieldInfosFormat().getFieldInfosWriter().write(trackingDir, info.info.name, segmentSuffix, fieldInfos, IOContext.DEFAULT); + fieldsConsumerSuccess = true; } finally { - if (reader != this.reader) { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: closeReader " + reader); - reader.close(); + if (fieldsConsumerSuccess) { + fieldsConsumer.close(); + } else { + IOUtils.closeWhileHandlingException(fieldsConsumer); } } + } finally { + if (reader != this.reader) { +// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: closeReader " + reader); + reader.close(); + } } + success = true; } finally { if (!success) { - // Advance only the nextWriteDelGen so that a 2nd - // attempt to write will write to a new file - if (pendingDeleteCount > 0) { - info.advanceNextWriteDelGen(); - } - // Advance only the nextWriteDocValuesGen so that a 2nd // attempt to write will write to a new file - if (hasFieldUpdates) { - info.advanceNextWriteFieldInfosGen(); - } + info.advanceNextWriteFieldInfosGen(); // Delete any partially created file(s): for (String fileName : trackingDir.getCreatedFiles()) { @@ -491,76 +434,68 @@ class ReadersAndLiveDocs { // TODO (DVU_ } } - // If we hit an exc in the line above (eg disk 
full) - // then info's delGen remains pointing to the previous - // (successfully written) del docs: - if (pendingDeleteCount > 0) { - info.advanceDelGen(); - info.setDelCount(info.getDelCount() + pendingDeleteCount); - pendingDeleteCount = 0; + info.advanceFieldInfosGen(); + // copy all the updates to mergingUpdates, so they can later be applied to the merged segment + if (isMerging) { + for (Entry e : numericFieldUpdates.entrySet()) { + NumericFieldUpdates fieldUpdates = mergingNumericUpdates.get(e.getKey()); + if (fieldUpdates == null) { + mergingNumericUpdates.put(e.getKey(), e.getValue()); + } else { + fieldUpdates.merge(e.getValue()); + } + } } - if (hasFieldUpdates) { - info.advanceFieldInfosGen(); - // copy all the updates to mergingUpdates, so they can later be applied to the merged segment - if (isMerging) { - copyUpdatesToMerging(); - } - numericUpdates.clear(); - - // create a new map, keeping only the gens that are in use - Map> genUpdatesFiles = info.getUpdatesFiles(); - Map> newGenUpdatesFiles = new HashMap>(); - final long fieldInfosGen = info.getFieldInfosGen(); - for (FieldInfo fi : fieldInfos) { - long dvGen = fi.getDocValuesGen(); - if (dvGen != -1 && !newGenUpdatesFiles.containsKey(dvGen)) { - if (dvGen == fieldInfosGen) { - newGenUpdatesFiles.put(fieldInfosGen, trackingDir.getCreatedFiles()); - } else { - newGenUpdatesFiles.put(dvGen, genUpdatesFiles.get(dvGen)); - } + // create a new map, keeping only the gens that are in use + Map> genUpdatesFiles = info.getUpdatesFiles(); + Map> newGenUpdatesFiles = new HashMap>(); + final long fieldInfosGen = info.getFieldInfosGen(); + for (FieldInfo fi : fieldInfos) { + long dvGen = fi.getDocValuesGen(); + if (dvGen != -1 && !newGenUpdatesFiles.containsKey(dvGen)) { + if (dvGen == fieldInfosGen) { + newGenUpdatesFiles.put(fieldInfosGen, trackingDir.getCreatedFiles()); + } else { + newGenUpdatesFiles.put(dvGen, genUpdatesFiles.get(dvGen)); } } - - info.setGenUpdatesFiles(newGenUpdatesFiles); } + + info.setGenUpdatesFiles(newGenUpdatesFiles); + + // wrote new files, should checkpoint() + writer.checkpoint(); - return true; - } - - private void copyUpdatesToMerging() { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.copyUpdatesToMerging: " + numericUpdates); - // cannot do a simple putAll, even if mergingUpdates is empty, because we - // need a shallow copy of the values (maps) - for (Entry> e : numericUpdates.entrySet()) { - String field = e.getKey(); - Map merging = mergingUpdates.get(field); - if (merging == null) { - mergingUpdates.put(field, new HashMap(e.getValue())); - } else { - merging.putAll(e.getValue()); + // if there is a reader open, reopen it to reflect the updates + if (reader != null) { + SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount); + boolean reopened = false; + try { + reader.decRef(); + reader = newReader; + reopened = true; + } finally { + if (!reopened) { + newReader.decRef(); + } } } } - + /** * Returns a reader for merge. This method applies field updates if there are * any and marks that this segment is currently merging. */ - SegmentReader getReaderForMerge(IOContext context) throws IOException { - // lock ordering must be IW -> RLD, otherwise could cause deadlocks - synchronized (writer) { - synchronized (this) { - // must execute these two statements as atomic operation, otherwise we - // could lose updates if e.g. 
 /**
 * Returns a reader for merge. This method applies field updates if there are
 * any and marks that this segment is currently merging.
 */
- SegmentReader getReaderForMerge(IOContext context) throws IOException {
- // lock ordering must be IW -> RLD, otherwise could cause deadlocks
- synchronized (writer) {
- synchronized (this) {
- // must execute these two statements as atomic operation, otherwise we
- // could lose updates if e.g. another thread calls writeLiveDocs in
- // between, or the updates are applied to the obtained reader, but then
- // re-applied in IW.commitMergedDeletes (unnecessary work and potential
- // bugs.
- isMerging = true;
- return getReader(true, context);
- }
- }
+ synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
+ assert Thread.holdsLock(writer);
+ // must execute these two statements as atomic operation, otherwise we
+ // could lose updates if e.g. another thread calls writeFieldUpdates in
+ // between, or the updates are applied to the obtained reader, but then
+ // re-applied in IW.commitMergedDeletes (unnecessary work and potential
+ // bugs).
+ isMerging = true;
+ return getReader(context);
 }

 /**
@@ -568,54 +503,13 @@ class ReadersAndLiveDocs { // TODO (DVU_
 * finished merging (whether successfully or not).
 */
 public synchronized void dropMergingUpdates() {
- mergingUpdates.clear();
+ mergingNumericUpdates.clear();
 isMerging = false;
 }

- /**
- * Called from IndexWriter after applying deletes to the merged segment, while
- * it was being merged.
- */
- public synchronized void setMergingUpdates(Map> updates) {
- for (Entry> e : updates.entrySet()) {
- int doc = e.getKey().intValue();
- for (Entry docUpdates : e.getValue().entrySet()) {
- String field = docUpdates.getKey();
- Long value = docUpdates.getValue();
- Map fieldUpdates = numericUpdates.get(field);
- if (fieldUpdates == null) {
- fieldUpdates = new HashMap();
- numericUpdates.put(field, fieldUpdates);
- }
- fieldUpdates.put(doc, value);
- }
- }
- }
-
 /** Returns updates that came in while this segment was merging. */
- public synchronized Map> getMergingUpdates() {
- copyUpdatesToMerging();
- if (mergingUpdates.isEmpty()) {
- return null;
- }
-
- Map> updates = new HashMap>();
- for (Entry> e : mergingUpdates.entrySet()) {
- String field = e.getKey();
- for (Entry fieldUpdates : e.getValue().entrySet()) {
- Integer doc = fieldUpdates.getKey();
- Long value = fieldUpdates.getValue();
- Map docUpdates = updates.get(doc);
- if (docUpdates == null) {
- docUpdates = new HashMap();
- updates.put(doc, docUpdates);
- }
- docUpdates.put(field, value);
- }
- }
-
- mergingUpdates.clear();
- return updates;
+ public synchronized Map getMergingFieldUpdates() {
+ return mergingNumericUpdates;
 }

 @Override
@@ -624,7 +518,6 @@ class ReadersAndLiveDocs { // TODO (DVU_
 sb.append("ReadersAndLiveDocs(seg=").append(info);
 sb.append(" pendingDeleteCount=").append(pendingDeleteCount);
 sb.append(" liveDocsShared=").append(liveDocsShared);
- sb.append(" pendingUpdatesCount=").append(getPendingUpdatesCount());
 return sb.toString();
 }
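The merging-updates bookkeeping above reduces to: while a segment is being merged, keep any newly applied numeric updates in a side map keyed by field, merge them into an existing entry when one is already present, hand the map to the merge via getMergingFieldUpdates(), and clear it in dropMergingUpdates() once the merge finishes. A compact sketch of that lifecycle, with a sorted per-field map standing in for NumericFieldUpdates (hypothetical names, not the Lucene classes):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch: collect numeric updates that arrive while a merge is in flight, per field.
class MergingUpdatesTracker {
  private final Map<String, TreeMap<Integer, Long>> mergingUpdates =
      new HashMap<String, TreeMap<Integer, Long>>();
  private boolean isMerging;

  synchronized void startMerge() {
    isMerging = true;
  }

  // Called after updates were written for this segment; remember them for the running merge.
  synchronized void record(String field, TreeMap<Integer, Long> updates) {
    if (!isMerging) {
      return;
    }
    TreeMap<Integer, Long> existing = mergingUpdates.get(field);
    if (existing == null) {
      mergingUpdates.put(field, new TreeMap<Integer, Long>(updates)); // first updates for this field
    } else {
      existing.putAll(updates); // later updates win for the same docID
    }
  }

  // Handed to the merge once it completes, so the merged segment can be patched.
  synchronized Map<String, TreeMap<Integer, Long>> getMergingFieldUpdates() {
    return mergingUpdates;
  }

  // Called when the merge finished (whether successfully or not).
  synchronized void dropMergingUpdates() {
    mergingUpdates.clear();
    isMerging = false;
  }
}

Keying by field (rather than by document, as the removed setMergingUpdates/getMergingUpdates code did) avoids rebuilding nested doc-to-field maps on every hand-off.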
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1532670&r1=1532669&r2=1532670&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Wed Oct 16 07:43:18 2013
@@ -43,9 +43,7 @@ import org.apache.lucene.search.ScoreDoc
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.MockDirectoryWrapper.FakeIOException;
 import org.apache.lucene.store.RAMDirectory;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util._TestUtil;
@@ -1221,225 +1219,4 @@ public class TestIndexWriterDelete exten
 r.close();
 d.close();
 }
-
- // Make sure if we hit a transient IOException (e.g., disk
- // full), and then the exception stops (e.g., disk frees
- // up), so we successfully close IW or open an NRT
- // reader, we don't lose any deletes or updates:
- public void testNoLostDeletesOrUpdatesOnIOException() throws Exception {
- int deleteCount = 0;
- int docBase = 0;
- int docCount = 0;
-
- MockDirectoryWrapper dir = newMockDirectory();
- final AtomicBoolean shouldFail = new AtomicBoolean();
- dir.failOn(new MockDirectoryWrapper.Failure() {
-
- @Override
- public void eval(MockDirectoryWrapper dir) throws IOException {
- StackTraceElement[] trace = new Exception().getStackTrace();
- if (shouldFail.get() == false) {
- return;
- }
-
- boolean sawSeal = false;
- boolean sawWrite = false;
- for (int i = 0; i < trace.length; i++) {
- if ("sealFlushedSegment".equals(trace[i].getMethodName())) {
- sawSeal = true;
- break;
- }
- if ("writeLiveDocs".equals(trace[i].getMethodName())) {
- sawWrite = true;
- }
- }
-
- // Don't throw exc if we are "flushing", else
- // the segment is aborted and docs are lost:
- if (sawWrite && sawSeal == false && random().nextInt(3) == 2) {
- // Only sometimes throw the exc, so we get
- // it sometimes on creating the file, on
- // flushing buffer, on closing the file:
- if (VERBOSE) {
- System.out.println("TEST: now fail; thread=" + Thread.currentThread().getName() + " exc:");
- new Throwable().printStackTrace(System.out);
- }
- shouldFail.set(false);
- throw new FakeIOException();
- }
- }
- });
-
- RandomIndexWriter w = null;
-
- for(int iter=0;iter<10*RANDOM_MULTIPLIER;iter++) {
- int numDocs = atLeast(100);
- if (VERBOSE) {
- System.out.println("\nTEST: iter=" + iter + " numDocs=" + numDocs + " docBase=" + docBase + " delCount=" + deleteCount);
- }
- if (w == null) {
- IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
- final MergeScheduler ms = iwc.getMergeScheduler();
- if (ms instanceof ConcurrentMergeScheduler) {
- final ConcurrentMergeScheduler suppressFakeIOE = new ConcurrentMergeScheduler() {
- @Override
- protected void handleMergeException(Throwable exc) {
- // suppress only FakeIOException:
- if (!(exc instanceof FakeIOException)) {
- super.handleMergeException(exc);
- }
- }
- };
- final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) ms;
- suppressFakeIOE.setMaxMergesAndThreads(cms.getMaxMergeCount(), cms.getMaxThreadCount());
- suppressFakeIOE.setMergeThreadPriority(cms.getMergeThreadPriority());
- iwc.setMergeScheduler(suppressFakeIOE);
- }
-
- w = new RandomIndexWriter(random(), dir, iwc);
- // Since we hit exc during merging, a partial
- // forceMerge can easily return when there are still
- // too many segments in the index:
- w.setDoRandomForceMergeAssert(false);
- }
- for(int i=0;i