Subject: svn commit: r1152945 [3/5] - in /lucene/dev/branches/blocktree_3030: ./ lucene/contrib/queries/src/java/org/apache/lucene/search/ lucene/src/java/org/apache/lucene/index/ lucene/src/java/org/apache/lucene/index/codecs/ lucene/src/java/org/apache/lucene...
Date: Mon, 01 Aug 2011 21:50:19 -0000
To: commits@lucene.apache.org
From: mikemccand@apache.org

Added: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/BlockTreeTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/BlockTreeTermsWriter.java?rev=1152945&view=auto
==============================================================================
--- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/BlockTreeTermsWriter.java (added)
+++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/BlockTreeTermsWriter.java Mon Aug 1 21:50:05 2011
@@ -0,0 +1,850 @@ +package org.apache.lucene.index.codecs; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfo.IndexOptions; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RAMOutputStream; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CodecUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.IntsRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.fst.Builder; +import org.apache.lucene.util.fst.ByteSequenceOutputs; +import org.apache.lucene.util.fst.BytesRefFSTEnum; +import org.apache.lucene.util.fst.FST; +import org.apache.lucene.util.fst.NoOutputs; +import org.apache.lucene.util.fst.Util; + +// TODO: currently we encode all terms between two indexed +// terms as a block; but, we could decouple the two, ie +// allow several blocks in between two indexed terms + +/** + * Writes terms dict and index, block-encoding (column + * stride) each term's metadata for each set of terms + * between two index terms. + * + * @lucene.experimental + */ + +// nocommit -- somehow save/print stats for debugging, eg +// how many normal blocks, floor blocks, etc. + +public class BlockTreeTermsWriter extends FieldsConsumer { + + public static boolean DEBUG = false; + public static boolean DEBUG2 = false; + public static boolean SAVE_DOT_FILES = false; + + final static String CODEC_NAME = "BLOCK_TREE_TERMS_DICT"; + + // Initial format + public static final int VERSION_START = 0; + + public static final int VERSION_CURRENT = VERSION_START; + + /** Extension of terms file */ + static final String TERMS_EXTENSION = "tim"; + static final String TERMS_INDEX_EXTENSION = "tip"; + + protected final IndexOutput out; + private final IndexOutput indexOut; + final int minItemsInBlock; + final int maxItemsInBlock; + + final BlockTreePostingsWriterBase postingsWriter; + final FieldInfos fieldInfos; + FieldInfo currentField; + private final List fields = new ArrayList(); + private final String segment; + + // nocommit should take min block size? 
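As a quick orientation before the constructor that follows (this snippet is illustrative, not part of the patch): a codec plugs this writer in by wrapping a postings writer with it, exactly as PulsingTreeCodec.fieldsConsumer does later in this commit. The 25/48 block sizes are the values the commit passes to StandardTreeCodec; any pair accepted by the argument checks below would do, and error handling is omitted here.

  // Inside some Codec.fieldsConsumer(SegmentWriteState state):
  BlockTreePostingsWriterBase postingsWriter = new StandardTreePostingsWriter(state);
  FieldsConsumer fieldsConsumer = new BlockTreeTermsWriter(state, postingsWriter, 25, 48);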
+ public BlockTreeTermsWriter( + SegmentWriteState state, + BlockTreePostingsWriterBase postingsWriter, + int minItemsInBlock, + int maxItemsInBlock) + throws IOException + { + + // nocommit -- make sure minItemsInBlock is > 1 + + if (minItemsInBlock <= 0) { + throw new IllegalArgumentException("minItemsInBlock must be >= 1; got " + minItemsInBlock); + } + if (maxItemsInBlock <= 0) { + throw new IllegalArgumentException("maxItemsInBlock must be >= 1; got " + maxItemsInBlock); + } + if (minItemsInBlock > maxItemsInBlock) { + throw new IllegalArgumentException("maxItemsInBlock must be >= minItemsInBlock; got maxItemsInBlock=" + maxItemsInBlock + " minItemsInBlock=" + minItemsInBlock); + } + if (2*(minItemsInBlock-1) > maxItemsInBlock) { + throw new IllegalArgumentException("maxItemsInBlock must be at least 2*(minItemsInBlock-1); got maxItemsInBlock=" + maxItemsInBlock + " minItemsInBlock=" + minItemsInBlock); + } + + final String termsFileName = IndexFileNames.segmentFileName(state.segmentName, state.codecId, TERMS_EXTENSION); + out = state.directory.createOutput(termsFileName, state.context); + boolean success = false; + IndexOutput indexOut = null; + try { + fieldInfos = state.fieldInfos; + this.minItemsInBlock = minItemsInBlock; + this.maxItemsInBlock = maxItemsInBlock; + writeHeader(out); + + //DEBUG = state.segmentName.equals("_4a"); + + final String termsIndexFileName = IndexFileNames.segmentFileName(state.segmentName, state.codecId, TERMS_INDEX_EXTENSION); + indexOut = state.directory.createOutput(termsIndexFileName, state.context); + writeIndexHeader(indexOut); + + currentField = null; + this.postingsWriter = postingsWriter; + segment = state.segmentName; + + // System.out.println("BTW.init seg=" + state.segmentName); + + postingsWriter.start(out); // have consumer write its format/header + success = true; + } finally { + if (!success) { + IOUtils.closeSafely(true, out, indexOut); + } + } + this.indexOut = indexOut; + } + + protected void writeHeader(IndexOutput out) throws IOException { + CodecUtil.writeHeader(out, CODEC_NAME, VERSION_CURRENT); + out.writeLong(0); // leave space for end index pointer + } + + protected void writeIndexHeader(IndexOutput out) throws IOException { + CodecUtil.writeHeader(out, CODEC_NAME, VERSION_CURRENT); + out.writeLong(0); // leave space for end index pointer + } + + protected void writeTrailer(long dirStart) throws IOException { + out.seek(CodecUtil.headerLength(CODEC_NAME)); + out.writeLong(dirStart); + } + + protected void writeIndexTrailer(long dirStart) throws IOException { + indexOut.seek(CodecUtil.headerLength(CODEC_NAME)); + indexOut.writeLong(dirStart); + } + + @Override + public TermsConsumer addField(FieldInfo field) throws IOException { + //DEBUG = field.name.equals("id"); + if (DEBUG2 || DEBUG) System.out.println("\nBTTW.addField seg=" + segment + " field=" + field.name); + assert currentField == null || currentField.name.compareTo(field.name) < 0; + currentField = field; + final TermsWriter terms = new TermsWriter(field); + fields.add(terms); + return terms; + } + + private static class PendingTerm { + public final BytesRef term; + public final TermStats stats; + + public PendingTerm(BytesRef term, TermStats stats) { + this.term = term; + this.stats = stats; + } + + @Override + public String toString() { + return term.utf8ToString(); + } + } + + static long encodeOutput(long fp, boolean hasTerms, boolean isFloor) { + // nocommit assert fp is "small enough" + // nocommit use constants here instead of 1, 2: + return (fp << 2) | (hasTerms ? 
2 : 0) | (isFloor ? 1 : 0); + } + + private static class PendingBlock { + public final BytesRef prefix; + public final long fp; + public FST index; + public List> subIndices; + public final boolean hasTerms; + public final boolean isFloor; + public final int floorLeadByte; + + public PendingBlock(BytesRef prefix, long fp, boolean hasTerms, boolean isFloor, int floorLeadByte, List> subIndices) { + this.prefix = prefix; + this.fp = fp; + this.hasTerms = hasTerms; + this.isFloor = isFloor; + this.floorLeadByte = floorLeadByte; + this.subIndices = subIndices; + } + + @Override + public String toString() { + return "BLOCK: " + prefix.utf8ToString(); + } + + public void compileIndex(List floorBlocks, RAMOutputStream scratchBytes) throws IOException { + + assert (isFloor && floorBlocks != null && floorBlocks.size() != 0) || (!isFloor && floorBlocks == null): "isFloor=" + isFloor + " floorBlocks=" + floorBlocks; + + assert scratchBytes.getFilePointer() == 0; + // nocommit -- vLong is bad for FST!!! it writes LSB + // first which means less byte[] prefix sharing I + // think??? sheesh. + scratchBytes.writeVLong(encodeOutput(fp, hasTerms, isFloor)); + if (isFloor) { + scratchBytes.writeVInt(floorBlocks.size()); + for (PendingBlock sub : floorBlocks) { + assert sub.floorLeadByte != -1; + if (DEBUG) { + System.out.println(" write floorLeadByte=" + Integer.toHexString(sub.floorLeadByte&0xff)); + } + scratchBytes.writeByte((byte) sub.floorLeadByte); + assert sub.fp > fp; + // nocommit -- why do we need hasTerms here? + // nocommit -- need isFloor here? + scratchBytes.writeVLong(((sub.fp - fp) << 1) | (sub.hasTerms ? 1 : 0)); + } + } + + final ByteSequenceOutputs outputs = ByteSequenceOutputs.getSingleton(); + final Builder indexBuilder = new Builder(FST.INPUT_TYPE.BYTE1, + 0, 0, true, true, Integer.MAX_VALUE, + outputs, null); + if (DEBUG) { + System.out.println(" compile index for prefix=" + prefix); + } + indexBuilder.DEBUG = false; + final byte[] bytes = new byte[(int) scratchBytes.getFilePointer()]; + assert bytes.length > 0; + scratchBytes.writeTo(bytes, 0); + indexBuilder.add(prefix, new BytesRef(bytes, 0, bytes.length)); + scratchBytes.reset(); + + // Copy over index for all sub-blocks + + for(FST subIndex : subIndices) { + append(indexBuilder, subIndex); + } + + if (floorBlocks != null) { + for (PendingBlock sub : floorBlocks) { + for(FST subIndex : sub.subIndices) { + append(indexBuilder, subIndex); + } + sub.subIndices = null; + } + } + + index = indexBuilder.finish(); + subIndices = null; + + /* + Writer w = new OutputStreamWriter(new FileOutputStream("out.dot")); + Util.toDot(index, w, false, false); + System.out.println("SAVED to out.dot"); + w.close(); + */ + } + + // TODO: maybe we could add bulk-add method to + // Builder? Takes FST and unions it w/ current + // FST. 
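A brief aside on the packed block output above (illustrative round trip, not part of the patch; the 1 and 2 bit masks are the hard-coded constants the nocommit in encodeOutput complains about):

  long code = (fp << 2) | (hasTerms ? 2 : 0) | (isFloor ? 1 : 0);  // what encodeOutput produces
  long blockFP     = code >>> 2;        // file pointer of the block
  boolean hasTerms = (code & 2) != 0;   // block (or its sub-blocks) has terms
  boolean isFloor  = (code & 1) != 0;   // block is split into floor blocks

Floor sub-blocks are then written relative to that pointer as ((sub.fp - fp) << 1) | (sub.hasTerms ? 1 : 0), as compileIndex shows above.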
+ private void append(Builder builder, FST subIndex) throws IOException { + final BytesRefFSTEnum subIndexEnum = new BytesRefFSTEnum(subIndex); + BytesRefFSTEnum.InputOutput indexEnt; + while((indexEnt = subIndexEnum.next()) != null) { + if (DEBUG) { + System.out.println(" add sub=" + indexEnt.input + " " + indexEnt.input + " output=" + indexEnt.output); + } + builder.add(indexEnt.input, indexEnt.output); + } + } + } + + final RAMOutputStream scratchBytes = new RAMOutputStream(); + + class TermsWriter extends TermsConsumer { + private final FieldInfo fieldInfo; + private final long termsStartPointer; + private long numTerms; + long sumTotalTermFreq; + long sumDocFreq; + long indexStartFP; + + // Used only to partition terms into the block tree; we + // don't pull an FST from this builder: + private final NoOutputs noOutputs; + private final Builder blockBuilder; + + // PendingTerm or PendingBlock: + private final List pending = new ArrayList(); + + // This class assigns terms to blocks "naturally", ie, + // according to the number of terms under a given prefix + // that we encounter: + private class FindBlocks extends Builder.FreezeTail { + + @Override + public void freeze(final Builder.UnCompiledNode[] frontier, int prefixLenPlus1, final IntsRef lastInput) throws IOException { + + if (DEBUG) System.out.println(" freeze prefixLenPlus1=" + prefixLenPlus1); + + for(int idx=lastInput.length; idx >= prefixLenPlus1; idx--) { + final Builder.UnCompiledNode node = frontier[idx]; + final Builder.UnCompiledNode parent = idx == 0 ? null : frontier[idx-1]; + + long totCount = 0; + + if (node.isFinal) { + totCount++; + } + + //System.out.println("VISIT node=" + node + " + //arcs=" + node.numArcs); + for(int arcIdx=0;arcIdx target = (Builder.UnCompiledNode) node.arcs[arcIdx].target; + totCount += target.inputCount; + target.clear(); + node.arcs[arcIdx].target = null; + } + node.numArcs = 0; + + boolean forceBlock = false; + + // nocommit fixup + + if (idx == 1) { + // nocommit -- make this 1 configurable -- maybe + // 2 is better if there are many terms? + + // We force a block if prefix is length 1 and + // there are any terms, so that the root block + // doesn't have terms. + + // nocommit: instead, we should accum termCount & + // blockCount into UnCompiledNode? + for(int pendingIdx=0;pendingIdx= minItemsInBlock || idx == 0 || forceBlock) { + if (DEBUG2 || DEBUG) { + if (totCount < minItemsInBlock && idx != 0) { + System.out.println(" force block has terms"); + } + } + node.inputCount = writeBlocks(lastInput, idx, (int) totCount); + } else { + // stragglers! carry count upwards + node.inputCount = totCount; + } + frontier[idx] = new Builder.UnCompiledNode(blockBuilder, idx); + } + } + } + + private int[] subBytes = new int[10]; + private int[] subTermCounts = new int[10]; + private int[] subTermCountSums = new int[10]; + private int[] subSubCounts = new int[10]; + + // Write the top count entries on the pending stack as + // one or more blocks. 
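For orientation before writeBlocks below: the per-block index compiled in compileIndex is an ordinary FST from BytesRef prefixes to the packed outputs, and append() simply walks a sub-index and re-adds its entries. A toy sketch, using only the Builder/FST/BytesRefFSTEnum calls that appear in the patch (inputs must be added in sorted order; not part of the patch itself):

  ByteSequenceOutputs outputs = ByteSequenceOutputs.getSingleton();
  Builder<BytesRef> builder = new Builder<BytesRef>(FST.INPUT_TYPE.BYTE1,
                                                    0, 0, true, true, Integer.MAX_VALUE,
                                                    outputs, null);
  builder.add(new BytesRef("ab"), new BytesRef(new byte[] {1}));  // prefix -> packed block output
  builder.add(new BytesRef("cd"), new BytesRef(new byte[] {2}));
  FST<BytesRef> index = builder.finish();

  BytesRefFSTEnum<BytesRef> fstEnum = new BytesRefFSTEnum<BytesRef>(index);
  BytesRefFSTEnum.InputOutput<BytesRef> ent;
  while ((ent = fstEnum.next()) != null) {   // the same walk append() does over a sub-index
    System.out.println(ent.input.utf8ToString() + " -> " + ent.output);
  }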
+ int writeBlocks(IntsRef prevTerm, int prefixLength, int count) throws IOException { + if (prefixLength == 0 || count <= maxItemsInBlock) { + // Not floor block + final PendingBlock nonFloorBlock = writeBlock(prevTerm, prefixLength, prefixLength, count, count, 0, false, -1, true); + nonFloorBlock.compileIndex(null, scratchBytes); + pending.add(nonFloorBlock); + } else { + + // nocommit -- we could enrich this format so that + // we store min & max label for this block, then it + // can be "authoritative" + + if (DEBUG) { + final BytesRef prefix = new BytesRef(prefixLength); + for(int m=0;m 1: "subCount=" + subCount + " sub=" + sub + " of " + numSubs + " subTermCount=" + subTermCountSums[sub] + " subSubCount=" + subSubCounts[sub] + " depth=" + prefixLength; + subCount = 0; + startLabel = subBytes[sub+1]; + + if (curStart == 0) { + break; + } + + if (curStart <= maxItemsInBlock) { + // remainder is small enough to fit into a + // block. NOTE that this may be too small (< + // minItemsInBlock); need a true segmenter + // here + assert startLabel != -1; + assert firstBlock != null; + prevTerm.ints[prevTerm.offset + prefixLength] = startLabel; + //System.out.println(" final " + (numSubs-sub-1) + " subs"); + floorBlocks.add(writeBlock(prevTerm, prefixLength, prefixLength+1, curStart, curStart, 0, true, startLabel, true)); + break; + } + } + } + + prevTerm.ints[prevTerm.offset + prefixLength] = savLabel; + + assert firstBlock != null; + firstBlock.compileIndex(floorBlocks, scratchBytes); + + pending.add(firstBlock); + if (DEBUG) System.out.println(" done pending.size()=" + pending.size()); + } + + return 1; + } + + String brPrefixToString(BytesRef b) { + // nocommit + return b.toString(); + //return b.utf8ToString() + " " + b; + } + + String brToString(BytesRef b) { + // nocommit + // return b.toString(); + return b.utf8ToString() + " " + b; + } + + // TODO: we could block-write the term suffix pointers; + // this would take more space but would enable binary + // search on lookup + private PendingBlock writeBlock(IntsRef prevTerm, int prefixLength, int indexPrefixLength, int start, int length, int futureTermCount, boolean isFloor, int floorLeadByte, boolean isLastInFloor) throws IOException { + + assert length > 0; + + final BytesRef prefix = new BytesRef(indexPrefixLength); + for(int m=0;m(FST.INPUT_TYPE.BYTE1, + 0, 0, true, + true, Integer.MAX_VALUE, + noOutputs, + new FindBlocks()); + + termsStartPointer = out.getFilePointer(); + postingsWriter.setField(fieldInfo); + } + + @Override + public Comparator getComparator() { + return BytesRef.getUTF8SortedAsUnicodeComparator(); + } + + @Override + public PostingsConsumer startTerm(BytesRef text) throws IOException { + //System.out.println("BTW.startTerm term=" + fieldInfo.name + ":" + text.utf8ToString() + " " + text + " seg=" + segment); + postingsWriter.startTerm(); + /* + if (fieldInfo.name.equals("id")) { + postingsWriter.termID = Integer.parseInt(text.utf8ToString()); + } else { + postingsWriter.termID = -1; + } + */ + return postingsWriter; + } + + @Override + public void finishTerm(BytesRef text, TermStats stats) throws IOException { + + assert stats.docFreq > 0; + if (DEBUG) System.out.println("BTTW.finishTerm term=" + fieldInfo.name + ":" + text.utf8ToString() + " " + text + " seg=" + segment + " df=" + stats.docFreq); + + blockBuilder.add(text, noOutputs.getNoOutput()); + pending.add(new PendingTerm(new BytesRef(text), stats)); + postingsWriter.finishTerm(stats); + numTerms++; + } + + // Finishes all terms in this field + @Override + 
public void finish(long sumTotalTermFreq, long sumDocFreq) throws IOException { + // nocommit write sumDocFreq + if (numTerms > 0) { + blockBuilder.finish(); + + // We better have one final "root" block: + assert pending.size() == 1 && pending.get(0) instanceof PendingBlock: "pending.size()=" + pending.size() + " pending=" + pending; + final PendingBlock root = (PendingBlock) pending.get(0); + assert root.prefix.length == 0; + assert root.index.getEmptyOutput() != null; + + this.sumTotalTermFreq = sumTotalTermFreq; + this.sumDocFreq = sumDocFreq; + + // Write FST to index + indexStartFP = indexOut.getFilePointer(); + root.index.save(indexOut); + //System.out.println(" write FST " + indexStartFP + " field=" + fieldInfo.name); + + if (SAVE_DOT_FILES || DEBUG2 || DEBUG) { + final String dotFileName = segment + "_" + fieldInfo.name + ".dot"; + Writer w = new OutputStreamWriter(new FileOutputStream(dotFileName)); + Util.toDot(root.index, w, false, false); + System.out.println("SAVED to " + dotFileName); + w.close(); + } + } + } + + private final RAMOutputStream bytesWriter = new RAMOutputStream(); + } + + @Override + public void close() throws IOException { + + IOException ioe = null; + try { + + int nonZeroCount = 0; + for(TermsWriter field : fields) { + if (field.numTerms > 0) { + nonZeroCount++; + } + } + + final long dirStart = out.getFilePointer(); + final long indexDirStart = indexOut.getFilePointer(); + + out.writeVInt(nonZeroCount); + + for(TermsWriter field : fields) { + if (field.numTerms > 0) { + //System.out.println(" field " + field.fieldInfo.name + " " + field.numTerms + " terms"); + out.writeVInt(field.fieldInfo.number); + out.writeVLong(field.numTerms); + // nocommit: we may not need termsStartPointer? + out.writeVLong(field.termsStartPointer); + final BytesRef rootCode = ((PendingBlock) field.pending.get(0)).index.getEmptyOutput(); + assert rootCode != null: "field=" + field.fieldInfo.name + " numTerms=" + field.numTerms; + out.writeVInt(rootCode.length); + out.writeBytes(rootCode.bytes, rootCode.offset, rootCode.length); + if (field.fieldInfo.indexOptions != IndexOptions.DOCS_ONLY) { + out.writeVLong(field.sumTotalTermFreq); + } + out.writeVLong(field.sumDocFreq); + indexOut.writeVLong(field.indexStartFP); + } + } + writeTrailer(dirStart); + writeIndexTrailer(indexDirStart); + } catch (IOException ioe2) { + ioe = ioe2; + } finally { + IOUtils.closeSafely(ioe, out, indexOut, postingsWriter); + } + } +} Modified: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CodecProvider.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CodecProvider.java?rev=1152945&r1=1152944&r2=1152945&view=diff ============================================================================== --- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CodecProvider.java (original) +++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CodecProvider.java Mon Aug 1 21:50:05 2011 @@ -43,7 +43,7 @@ public class CodecProvider { private final Set knownExtensions = new HashSet(); - public final static String[] CORE_CODECS = new String[] {"Standard", "Pulsing", "PreFlex", "SimpleText", "Memory"}; + public final static String[] CORE_CODECS = new String[] {"Standard", "StandardTree", "Pulsing", "PulsingTree", "PreFlex", "SimpleText", "Memory"}; public synchronized void register(Codec codec) { if (codec.name == null) { @@ -84,7 +84,7 @@ public class 
CodecProvider { public synchronized Codec lookup(String name) { final Codec codec = codecs.get(name); if (codec == null) { - throw new IllegalArgumentException("required codec '" + name + "' not found"); + throw new IllegalArgumentException("required codec '" + name + "' not found; known codecs: " + codecs.keySet()); } return codec; } Modified: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CoreCodecProvider.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CoreCodecProvider.java?rev=1152945&r1=1152944&r2=1152945&view=diff ============================================================================== --- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CoreCodecProvider.java (original) +++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/CoreCodecProvider.java Mon Aug 1 21:50:05 2011 @@ -20,8 +20,10 @@ package org.apache.lucene.index.codecs; import org.apache.lucene.index.codecs.memory.MemoryCodec; import org.apache.lucene.index.codecs.preflex.PreFlexCodec; import org.apache.lucene.index.codecs.pulsing.PulsingCodec; +import org.apache.lucene.index.codecs.pulsingtree.PulsingTreeCodec; import org.apache.lucene.index.codecs.simpletext.SimpleTextCodec; import org.apache.lucene.index.codecs.standard.StandardCodec; +import org.apache.lucene.index.codecs.standardtree.StandardTreeCodec; /** * A CodecProvider that registers all core codecs that ship @@ -43,8 +45,11 @@ import org.apache.lucene.index.codecs.st public class CoreCodecProvider extends CodecProvider { public CoreCodecProvider() { register(new StandardCodec()); + register(new StandardTreeCodec(25, 48)); register(new PreFlexCodec()); register(new PulsingCodec()); + // nocommit: how come no args to this one? + register(new PulsingTreeCodec(1)); register(new SimpleTextCodec()); register(new MemoryCodec()); } Added: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreeCodec.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreeCodec.java?rev=1152945&view=auto ============================================================================== --- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreeCodec.java (added) +++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreeCodec.java Mon Aug 1 21:50:05 2011 @@ -0,0 +1,141 @@ +package org.apache.lucene.index.codecs.pulsingtree; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.util.Set; + +import org.apache.lucene.index.PerDocWriteState; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.codecs.BlockTreePostingsReaderBase; +import org.apache.lucene.index.codecs.BlockTreePostingsWriterBase; +import org.apache.lucene.index.codecs.BlockTreeTermsReader; +import org.apache.lucene.index.codecs.BlockTreeTermsWriter; +import org.apache.lucene.index.codecs.Codec; +import org.apache.lucene.index.codecs.DefaultDocValuesConsumer; +import org.apache.lucene.index.codecs.DefaultDocValuesProducer; +import org.apache.lucene.index.codecs.FieldsConsumer; +import org.apache.lucene.index.codecs.FieldsProducer; +import org.apache.lucene.index.codecs.PerDocConsumer; +import org.apache.lucene.index.codecs.PerDocValues; +import org.apache.lucene.index.codecs.standardtree.StandardTreeCodec; +import org.apache.lucene.index.codecs.standardtree.StandardTreePostingsReader; +import org.apache.lucene.index.codecs.standardtree.StandardTreePostingsWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +/** This codec "inlines" the postings for terms that have + * low docFreq. It wraps another codec, which is used for + * writing the non-inlined terms. + * + * Currently in only inlines docFreq=1 terms, and + * otherwise uses the normal "standard" codec. + * @lucene.experimental */ + +public class PulsingTreeCodec extends Codec { + + private final int freqCutoff; + + /** Terms with freq <= freqCutoff are inlined into terms + * dict. */ + public PulsingTreeCodec(int freqCutoff) { + super("PulsingTree"); + this.freqCutoff = freqCutoff; + } + + @Override + public String toString() { + return name + "(freqCutoff=" + freqCutoff + ")"; + } + + @Override + public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + // We wrap StandardTreePostingsWriter, but any BlockTreePostingsWriterBase + // will work: + + BlockTreePostingsWriterBase docsWriter = new StandardTreePostingsWriter(state); + + // Terms that have <= freqCutoff number of docs are + // "pulsed" (inlined): + BlockTreePostingsWriterBase pulsingWriter = new PulsingTreePostingsWriter(freqCutoff, docsWriter); + + // Terms dict + boolean success = false; + try { + // nocommit make this 24 configurable + FieldsConsumer ret = new BlockTreeTermsWriter(state, pulsingWriter, 32, 64); + success = true; + return ret; + } finally { + if (!success) { + pulsingWriter.close(); + } + } + } + + @Override + public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException { + + // We wrap StandardTreePostingsReader, but any BlockTreeStandardPostingsReader + // will work: + BlockTreePostingsReaderBase docsReader = new StandardTreePostingsReader(state.dir, state.segmentInfo, state.context, state.codecId); + BlockTreePostingsReaderBase pulsingReader = new PulsingTreePostingsReader(docsReader); + + boolean success = false; + try { + FieldsProducer ret = new BlockTreeTermsReader( + state.dir, state.fieldInfos, state.segmentInfo.name, + pulsingReader, + state.context, + StandardTreeCodec.TERMS_CACHE_SIZE, + state.codecId, + state.termsIndexDivisor); + success = true; + return ret; + } finally { + if (!success) { + pulsingReader.close(); + } + } + } + + @Override + public void files(Directory dir, SegmentInfo segmentInfo, int codecID, Set files) throws IOException { + StandardTreePostingsReader.files(dir, segmentInfo, 
codecID, files); + BlockTreeTermsReader.files(dir, segmentInfo, codecID, files); + DefaultDocValuesConsumer.files(dir, segmentInfo, codecID, files, getDocValuesUseCFS()); + } + + @Override + public void getExtensions(Set extensions) { + StandardTreeCodec.getStandardExtensions(extensions); + DefaultDocValuesConsumer.getDocValuesExtensions(extensions, getDocValuesUseCFS()); + } + + @Override + public PerDocConsumer docsConsumer(PerDocWriteState state) throws IOException { + return new DefaultDocValuesConsumer(state, getDocValuesSortComparator(), getDocValuesUseCFS()); + } + + @Override + public PerDocValues docsProducer(SegmentReadState state) throws IOException { + return new DefaultDocValuesProducer(state.segmentInfo, state.dir, state.fieldInfos, state.codecId, getDocValuesUseCFS(), getDocValuesSortComparator(), state.context); + } +} Added: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsReader.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsReader.java?rev=1152945&view=auto ============================================================================== --- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsReader.java (added) +++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsReader.java Mon Aug 1 21:50:05 2011 @@ -0,0 +1,498 @@ +package org.apache.lucene.index.codecs.pulsingtree; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.index.DocsAndPositionsEnum; +import org.apache.lucene.index.DocsEnum; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfo.IndexOptions; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.codecs.BlockTreePostingsReaderBase; +import org.apache.lucene.index.codecs.BlockTreeTermState; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CodecUtil; + +/** Concrete class that reads the current doc/freq/skip + * postings format + * @lucene.experimental */ + +// TODO: -- should we switch "hasProx" higher up? and +// create two separate docs readers, one that also reads +// prox and one that doesn't? 
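Stepping back before the reader implementation: with the CodecProvider and CoreCodecProvider changes earlier in this commit, the new tree-based codecs are looked up by name like any other core codec. A minimal sketch, assuming only what the diff registers (StandardTreeCodec presumably names itself "StandardTree", matching the CORE_CODECS entry):

  CodecProvider provider = new CoreCodecProvider();
  Codec standardTree = provider.lookup("StandardTree");  // registered above as new StandardTreeCodec(25, 48)
  Codec pulsingTree  = provider.lookup("PulsingTree");   // registered above as new PulsingTreeCodec(1)
  // An unknown name now fails with the improved message:
  // "required codec '...' not found; known codecs: [...]"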
+ +public class PulsingTreePostingsReader extends BlockTreePostingsReaderBase { + + // Fallback reader for non-pulsed terms: + final BlockTreePostingsReaderBase wrappedPostingsReader; + int maxPositions; + + public PulsingTreePostingsReader(BlockTreePostingsReaderBase wrappedPostingsReader) throws IOException { + this.wrappedPostingsReader = wrappedPostingsReader; + } + + @Override + public void init(IndexInput termsIn) throws IOException { + CodecUtil.checkHeader(termsIn, PulsingTreePostingsWriter.CODEC, + PulsingTreePostingsWriter.VERSION_START, PulsingTreePostingsWriter.VERSION_START); + maxPositions = termsIn.readVInt(); + wrappedPostingsReader.init(termsIn); + } + + private static class PulsingTermState extends BlockTreeTermState { + private byte[] postings; + private int postingsSize; // -1 if this term was not inlined + private BlockTreeTermState wrappedTermState; + + ByteArrayDataInput inlinedBytesReader; + private byte[] inlinedBytes; + + @Override + public Object clone() { + PulsingTermState clone; + clone = (PulsingTermState) super.clone(); + if (postingsSize != -1) { + clone.postings = new byte[postingsSize]; + System.arraycopy(postings, 0, clone.postings, 0, postingsSize); + } else { + assert wrappedTermState != null; + clone.wrappedTermState = (BlockTreeTermState) wrappedTermState.clone(); + } + return clone; + } + + @Override + public void copyFrom(TermState _other) { + super.copyFrom(_other); + PulsingTermState other = (PulsingTermState) _other; + postingsSize = other.postingsSize; + if (other.postingsSize != -1) { + if (postings == null || postings.length < other.postingsSize) { + postings = new byte[ArrayUtil.oversize(other.postingsSize, 1)]; + } + System.arraycopy(other.postings, 0, postings, 0, other.postingsSize); + } else { + wrappedTermState.copyFrom(other.wrappedTermState); + } + + // NOTE: we do not copy the + // inlinedBytes/inlinedBytesReader; these are only + // stored on the "primary" TermState. They are + // "transient" to cloned term states. 
+ } + + @Override + public String toString() { + if (postingsSize == -1) { + return "PulsingTermState: not inlined: wrapped=" + wrappedTermState; + } else { + return "PulsingTermState: inlined size=" + postingsSize + " " + super.toString(); + } + } + } + + @Override + public void readTermsBlock(IndexInput termsIn, FieldInfo fieldInfo, BlockTreeTermState _termState) throws IOException { + //System.out.println("PR.readTermsBlock"); + final PulsingTermState termState = (PulsingTermState) _termState; + if (termState.inlinedBytes == null) { + termState.inlinedBytes = new byte[128]; + termState.inlinedBytesReader = new ByteArrayDataInput(); + } + int len = termsIn.readVInt(); + //System.out.println(" len=" + len + " fp=" + termsIn.getFilePointer()); + if (termState.inlinedBytes.length < len) { + termState.inlinedBytes = new byte[ArrayUtil.oversize(len, 1)]; + } + termsIn.readBytes(termState.inlinedBytes, 0, len); + termState.inlinedBytesReader.reset(termState.inlinedBytes); + termState.wrappedTermState.termBlockOrd = 0; + wrappedPostingsReader.readTermsBlock(termsIn, fieldInfo, termState.wrappedTermState); + } + + @Override + public void resetTermsBlock(FieldInfo fieldInfo, BlockTreeTermState _termState) throws IOException { + final PulsingTermState termState = (PulsingTermState) _termState; + if (termState.inlinedBytes != null) { + termState.inlinedBytesReader.rewind(); + } + termState.wrappedTermState.termBlockOrd = 0; + wrappedPostingsReader.resetTermsBlock(fieldInfo, termState.wrappedTermState); + } + + @Override + public BlockTreeTermState newTermState() throws IOException { + PulsingTermState state = new PulsingTermState(); + state.wrappedTermState = wrappedPostingsReader.newTermState(); + return state; + } + + @Override + public void nextTerm(FieldInfo fieldInfo, BlockTreeTermState _termState) throws IOException { + //System.out.println("PR nextTerm"); + PulsingTermState termState = (PulsingTermState) _termState; + + // total TF, but in the omitTFAP case its computed based + // on docFreq. + long count = fieldInfo.indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS ? termState.totalTermFreq : termState.docFreq; + //System.out.println(" count=" + count + " threshold=" + maxPositions); + + if (count <= maxPositions) { + + // Inlined into terms dict -- just read the byte[] blob in, + // but don't decode it now (we only decode when a DocsEnum + // or D&PEnum is pulled): + termState.postingsSize = termState.inlinedBytesReader.readVInt(); + if (termState.postings == null || termState.postings.length < termState.postingsSize) { + termState.postings = new byte[ArrayUtil.oversize(termState.postingsSize, 1)]; + } + // TODO: sort of silly to copy from one big byte[] + // (the blob holding all inlined terms' blobs for + // current term block) into another byte[] (just the + // blob for this term)... + termState.inlinedBytesReader.readBytes(termState.postings, 0, termState.postingsSize); + //System.out.println(" inlined bytes=" + termState.postingsSize); + } else { + //System.out.println(" not inlined"); + termState.postingsSize = -1; + // TODO: should we do full copyFrom? much heavier...? 
+ termState.wrappedTermState.docFreq = termState.docFreq; + termState.wrappedTermState.totalTermFreq = termState.totalTermFreq; + wrappedPostingsReader.nextTerm(fieldInfo, termState.wrappedTermState); + termState.wrappedTermState.termBlockOrd++; + } + } + + // TODO: we could actually reuse, by having TL that + // holds the last wrapped reuse, and vice-versa + @Override + public DocsEnum docs(FieldInfo field, BlockTreeTermState _termState, Bits liveDocs, DocsEnum reuse) throws IOException { + PulsingTermState termState = (PulsingTermState) _termState; + if (termState.postingsSize != -1) { + PulsingDocsEnum postings; + if (reuse instanceof PulsingDocsEnum) { + postings = (PulsingDocsEnum) reuse; + if (!postings.canReuse(field)) { + postings = new PulsingDocsEnum(field); + } + } else { + postings = new PulsingDocsEnum(field); + } + return postings.reset(liveDocs, termState); + } else { + // TODO: not great that we lose reuse of PulsingDocsEnum in this case: + if (reuse instanceof PulsingDocsEnum) { + return wrappedPostingsReader.docs(field, termState.wrappedTermState, liveDocs, null); + } else { + return wrappedPostingsReader.docs(field, termState.wrappedTermState, liveDocs, reuse); + } + } + } + + // TODO: -- not great that we can't always reuse + @Override + public DocsAndPositionsEnum docsAndPositions(FieldInfo field, BlockTreeTermState _termState, Bits liveDocs, DocsAndPositionsEnum reuse) throws IOException { + if (field.indexOptions != IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) { + return null; + } + //System.out.println("D&P: field=" + field.name); + + final PulsingTermState termState = (PulsingTermState) _termState; + + if (termState.postingsSize != -1) { + PulsingDocsAndPositionsEnum postings; + if (reuse instanceof PulsingDocsAndPositionsEnum) { + postings = (PulsingDocsAndPositionsEnum) reuse; + if (!postings.canReuse(field)) { + postings = new PulsingDocsAndPositionsEnum(field); + } + } else { + postings = new PulsingDocsAndPositionsEnum(field); + } + + return postings.reset(liveDocs, termState); + } else { + if (reuse instanceof PulsingDocsAndPositionsEnum) { + return wrappedPostingsReader.docsAndPositions(field, termState.wrappedTermState, liveDocs, null); + } else { + return wrappedPostingsReader.docsAndPositions(field, termState.wrappedTermState, liveDocs, reuse); + } + } + } + + private static class PulsingDocsEnum extends DocsEnum { + private final ByteArrayDataInput postings = new ByteArrayDataInput(); + private final IndexOptions indexOptions; + private final boolean storePayloads; + private Bits liveDocs; + private int docID; + private int freq; + + public PulsingDocsEnum(FieldInfo fieldInfo) { + indexOptions = fieldInfo.indexOptions; + storePayloads = fieldInfo.storePayloads; + } + + public PulsingDocsEnum reset(Bits liveDocs, PulsingTermState termState) { + //System.out.println("PR docsEnum termState=" + termState + " docFreq=" + termState.docFreq); + assert termState.postingsSize != -1; + // nocommit -- reuse the last byte[] if we can? or + // can we directly ref termState's bytes...? dangerous? 
+ final byte[] bytes = new byte[termState.postingsSize]; + System.arraycopy(termState.postings, 0, bytes, 0, termState.postingsSize); + postings.reset(bytes); + docID = 0; + freq = 1; + this.liveDocs = liveDocs; + return this; + } + + boolean canReuse(FieldInfo fieldInfo) { + return indexOptions == fieldInfo.indexOptions && storePayloads == fieldInfo.storePayloads; + } + + @Override + public int nextDoc() throws IOException { + //System.out.println("PR nextDoc this= "+ this); + while(true) { + if (postings.eof()) { + //System.out.println("PR END"); + return docID = NO_MORE_DOCS; + } + + final int code = postings.readVInt(); + //System.out.println(" read code=" + code); + if (indexOptions == IndexOptions.DOCS_ONLY) { + docID += code; + } else { + docID += code >>> 1; // shift off low bit + if ((code & 1) != 0) { // if low bit is set + freq = 1; // freq is one + } else { + freq = postings.readVInt(); // else read freq + } + + // Skip positions + if (storePayloads) { + int payloadLength = -1; + for(int pos=0;pos= target) + return doc; + } + return docID = NO_MORE_DOCS; + } + } + + private static class PulsingDocsAndPositionsEnum extends DocsAndPositionsEnum { + private final ByteArrayDataInput postings = new ByteArrayDataInput(); + private final boolean storePayloads; + + private Bits liveDocs; + private int docID; + private int freq; + private int posPending; + private int position; + private int payloadLength; + private BytesRef payload; + + private boolean payloadRetrieved; + + public PulsingDocsAndPositionsEnum(FieldInfo fieldInfo) { + storePayloads = fieldInfo.storePayloads; + } + + boolean canReuse(FieldInfo fieldInfo) { + return storePayloads == fieldInfo.storePayloads; + } + + public PulsingDocsAndPositionsEnum reset(Bits liveDocs, PulsingTermState termState) { + assert termState.postingsSize != -1; + final byte[] bytes = new byte[termState.postingsSize]; + System.arraycopy(termState.postings, 0, bytes, 0, termState.postingsSize); + postings.reset(bytes); + this.liveDocs = liveDocs; + payloadLength = 0; + docID = 0; + //System.out.println("PR d&p reset storesPayloads=" + storePayloads + " bytes=" + bytes.length + " this=" + this); + return this; + } + + @Override + public int nextDoc() throws IOException { + //System.out.println("PR d&p nextDoc this=" + this); + + while(true) { + //System.out.println(" cycle skip posPending=" + posPending); + + skipPositions(); + + if (postings.eof()) { + //System.out.println("PR END"); + return docID = NO_MORE_DOCS; + } + + final int code = postings.readVInt(); + docID += code >>> 1; // shift off low bit + if ((code & 1) != 0) { // if low bit is set + freq = 1; // freq is one + } else { + freq = postings.readVInt(); // else read freq + } + posPending = freq; + + if (liveDocs == null || liveDocs.get(docID)) { + //System.out.println(" return docID=" + docID + " freq=" + freq); + position = 0; + return docID; + } + } + } + + @Override + public int freq() { + return freq; + } + + @Override + public int docID() { + return docID; + } + + @Override + public int advance(int target) throws IOException { + int doc; + while((doc=nextDoc()) != NO_MORE_DOCS) { + if (doc >= target) { + return doc; + } + } + return docID = NO_MORE_DOCS; + } + + @Override + public int nextPosition() throws IOException { + //System.out.println("PR d&p nextPosition posPending=" + posPending + " vs freq=" + freq); + + assert posPending > 0; + posPending--; + + if (storePayloads) { + if (!payloadRetrieved) { + //System.out.println("PR skip payload=" + payloadLength); + 
postings.skipBytes(payloadLength); + } + final int code = postings.readVInt(); + //System.out.println("PR code=" + code); + if ((code & 1) != 0) { + payloadLength = postings.readVInt(); + //System.out.println("PR new payload len=" + payloadLength); + } + position += code >> 1; + payloadRetrieved = false; + } else { + position += postings.readVInt(); + } + + //System.out.println("PR d&p nextPos return pos=" + position + " this=" + this); + return position; + } + + private void skipPositions() throws IOException { + while(posPending != 0) { + nextPosition(); + } + if (storePayloads && !payloadRetrieved) { + //System.out.println(" skip payload len=" + payloadLength); + postings.skipBytes(payloadLength); + payloadRetrieved = true; + } + } + + @Override + public boolean hasPayload() { + return storePayloads && !payloadRetrieved && payloadLength > 0; + } + + @Override + public BytesRef getPayload() throws IOException { + //System.out.println("PR getPayload payloadLength=" + payloadLength + " this=" + this); + if (payloadRetrieved) { + throw new IOException("Either no payload exists at this term position or an attempt was made to load it more than once."); + } + payloadRetrieved = true; + if (payloadLength > 0) { + if (payload == null) { + payload = new BytesRef(payloadLength); + } else { + payload.grow(payloadLength); + } + postings.readBytes(payload.bytes, 0, payloadLength); + payload.length = payloadLength; + return payload; + } else { + return null; + } + } + } + + @Override + public void close() throws IOException { + wrappedPostingsReader.close(); + } +} Added: lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsWriter.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsWriter.java?rev=1152945&view=auto ============================================================================== --- lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsWriter.java (added) +++ lucene/dev/branches/blocktree_3030/lucene/src/java/org/apache/lucene/index/codecs/pulsingtree/PulsingTreePostingsWriter.java Mon Aug 1 21:50:05 2011 @@ -0,0 +1,380 @@ +package org.apache.lucene.index.codecs.pulsingtree; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfo.IndexOptions; +import org.apache.lucene.index.codecs.BlockTreePostingsWriterBase; +import org.apache.lucene.index.codecs.BlockTreeTermsWriter; +import org.apache.lucene.index.codecs.TermStats; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RAMOutputStream; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CodecUtil; + +// TODO: we now inline based on total TF of the term, +// but it might be better to inline by "net bytes used" +// so that a term that has only 1 posting but a huge +// payload would not be inlined. Though this is +// presumably rare in practice... + +/** @lucene.experimental */ +public final class PulsingTreePostingsWriter extends BlockTreePostingsWriterBase { + + final static String CODEC = "PulsedPostingsTree"; + + // To add a new version, increment from the last one, and + // change VERSION_CURRENT to point to your new version: + final static int VERSION_START = 0; + + final static int VERSION_CURRENT = VERSION_START; + + private IndexOutput termsOut; + + private IndexOptions indexOptions; + private boolean storePayloads; + + private static class PendingTerm { + private final byte[] bytes; + public PendingTerm(byte[] bytes) { + this.bytes = bytes; + } + } + + private final List pendingTerms = new ArrayList(); + + // one entry per position + private final Position[] pending; + private int pendingCount = 0; // -1 once we've hit too many positions + private Position currentDoc; // first Position entry of current doc + + private static final class Position { + BytesRef payload; + int termFreq; // only incremented on first position for a given doc + int pos; + int docID; + } + + // TODO: -- lazy init this? ie, if every single term + // was inlined (eg for a "primary key" field) then we + // never need to use this fallback? Fallback writer for + // non-inlined terms: + final BlockTreePostingsWriterBase wrappedPostingsWriter; + + /** If the total number of positions (summed across all docs + * for this term) is <= maxPositions, then the postings are + * inlined into terms dict */ + public PulsingTreePostingsWriter(int maxPositions, BlockTreePostingsWriterBase wrappedPostingsWriter) throws IOException { + super(); + + pending = new Position[maxPositions]; + for(int i=0;i= the cutoff: + this.wrappedPostingsWriter = wrappedPostingsWriter; + } + + @Override + public void start(IndexOutput termsOut) throws IOException { + this.termsOut = termsOut; + CodecUtil.writeHeader(termsOut, CODEC, VERSION_CURRENT); + termsOut.writeVInt(pending.length); // encode maxPositions in header + wrappedPostingsWriter.start(termsOut); + } + + @Override + public void startTerm() { + //System.out.println("PW startTerm"); + assert pendingCount == 0; + } + + // TODO: -- should we NOT reuse across fields? 
would + // be cleaner + + // Currently, this instance is re-used across fields, so + // our parent calls setField whenever the field changes + @Override + public void setField(FieldInfo fieldInfo) { + this.indexOptions = fieldInfo.indexOptions; + //System.out.println("PW field=" + fieldInfo.name + " omitTF=" + omitTF); + storePayloads = fieldInfo.storePayloads; + wrappedPostingsWriter.setField(fieldInfo); + if (BlockTreeTermsWriter.DEBUG && fieldInfo.name.equals("id")) { + DEBUG = true; + } else { + DEBUG = false; + } + } + + private boolean DEBUG; + + @Override + public void startDoc(int docID, int termDocFreq) throws IOException { + assert docID >= 0: "got docID=" + docID; + + /* + if (termID != -1) { + if (docID == 0) { + baseDocID = termID; + } else if (baseDocID + docID != termID) { + throw new RuntimeException("WRITE: baseDocID=" + baseDocID + " docID=" + docID + " termID=" + termID); + } + } + */ + + //System.out.println("PW doc=" + docID); + if (DEBUG) { + System.out.println("PW docID=" + docID); + } + + if (pendingCount == pending.length) { + push(); + //System.out.println("PW: wrapped.finishDoc"); + wrappedPostingsWriter.finishDoc(); + } + + if (pendingCount != -1) { + assert pendingCount < pending.length; + currentDoc = pending[pendingCount]; + currentDoc.docID = docID; + if (indexOptions == IndexOptions.DOCS_ONLY) { + pendingCount++; + } else { + currentDoc.termFreq = termDocFreq; + } + } else { + // We've already seen too many docs for this term -- + // just forward to our fallback writer + wrappedPostingsWriter.startDoc(docID, termDocFreq); + } + } + + @Override + public void addPosition(int position, BytesRef payload) throws IOException { + + //System.out.println("PW pos=" + position + " payload=" + (payload == null ? "null" : payload.length + " bytes")); + if (pendingCount == pending.length) { + push(); + } + + if (pendingCount == -1) { + // We've already seen too many docs for this term -- + // just forward to our fallback writer + wrappedPostingsWriter.addPosition(position, payload); + } else { + // buffer up + final Position pos = pending[pendingCount++]; + pos.pos = position; + pos.docID = currentDoc.docID; + if (payload != null && payload.length > 0) { + if (pos.payload == null) { + pos.payload = new BytesRef(payload); + } else { + pos.payload.copy(payload); + } + } else if (pos.payload != null) { + pos.payload.length = 0; + } + } + } + + @Override + public void finishDoc() throws IOException { + //System.out.println("PW finishDoc"); + if (pendingCount == -1) { + wrappedPostingsWriter.finishDoc(); + } + } + + private final RAMOutputStream buffer = new RAMOutputStream(); + + private int baseDocID; + + /** Called when we are done adding docs to this term */ + @Override + public void finishTerm(TermStats stats) throws IOException { + //System.out.println("PW finishTerm docCount=" + stats.docFreq); + + assert pendingCount > 0 || pendingCount == -1; + + if (pendingCount == -1) { + wrappedPostingsWriter.finishTerm(stats); + // Must add null entry to record terms that our + // wrapped postings impl added + pendingTerms.add(null); + } else { + + // There were few enough total occurrences for this + // term, so we fully inline our postings data into + // terms dict, now: + + // TODO: it'd be better to share this encoding logic + // in some inner codec that knows how to write a + // single doc / single position, etc. 
This way if a + // given codec wants to store other interesting + // stuff, it could use this pulsing codec to do so + + if (indexOptions != IndexOptions.DOCS_ONLY) { + int lastDocID = 0; + int pendingIDX = 0; + while(pendingIDX < pendingCount) { + final Position doc = pending[pendingIDX]; + + final int delta = doc.docID - lastDocID; + lastDocID = doc.docID; + + //System.out.println(" write doc=" + doc.docID + " freq=" + doc.termFreq); + + if (doc.termFreq == 1) { + buffer.writeVInt((delta<<1)|1); + } else { + buffer.writeVInt(delta<<1); + buffer.writeVInt(doc.termFreq); + } + + int lastPos = 0; + int lastPayloadLength = -1; + for(int posIDX=0;posIDX files) throws IOException { + StandardTreePostingsReader.files(dir, segmentInfo, codecID, files); + BlockTreeTermsReader.files(dir, segmentInfo, codecID, files); + DefaultDocValuesConsumer.files(dir, segmentInfo, codecID, files, getDocValuesUseCFS()); + } + + @Override + public void getExtensions(Set extensions) { + getStandardExtensions(extensions); + DefaultDocValuesConsumer.getDocValuesExtensions(extensions, getDocValuesUseCFS()); + } + + public static void getStandardExtensions(Set extensions) { + extensions.add(FREQ_EXTENSION); + extensions.add(PROX_EXTENSION); + BlockTreeTermsReader.getExtensions(extensions); + } + + @Override + public PerDocConsumer docsConsumer(PerDocWriteState state) throws IOException { + return new DefaultDocValuesConsumer(state, getDocValuesSortComparator(), getDocValuesUseCFS()); + } + + @Override + public PerDocValues docsProducer(SegmentReadState state) throws IOException { + return new DefaultDocValuesProducer(state.segmentInfo, state.dir, state.fieldInfos, state.codecId, getDocValuesUseCFS(), getDocValuesSortComparator(), state.context); + } +}
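To close, a short illustration of the inlined postings format shared by the pulsing writer and reader in this commit: doc IDs are delta-coded into the terms dict, and the low bit of the delta vInt flags the common freq == 1 case so that no second vInt is needed (DOCS_ONLY fields just write the raw delta). Illustrative only, mirroring PulsingTreePostingsWriter.finishTerm and PulsingDocsEnum.nextDoc above:

  // Encode one doc:
  int delta = docID - lastDocID;
  if (termFreq == 1) {
    buffer.writeVInt((delta << 1) | 1);   // low bit set: freq is 1
  } else {
    buffer.writeVInt(delta << 1);
    buffer.writeVInt(termFreq);           // freq written only when > 1
  }

  // Decode on the read side:
  int code = postings.readVInt();
  docID += code >>> 1;                                      // shift off the low bit
  int freq = ((code & 1) != 0) ? 1 : postings.readVInt();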