Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BADDD200BCE for ; Fri, 18 Nov 2016 00:27:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B9637160B0B; Thu, 17 Nov 2016 23:27:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9A692160B18 for ; Fri, 18 Nov 2016 00:27:39 +0100 (CET) Received: (qmail 67407 invoked by uid 500); 17 Nov 2016 23:27:38 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 67165 invoked by uid 99); 17 Nov 2016 23:27:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2016 23:27:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6BD61F1715; Thu, 17 Nov 2016 23:27:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xedin@apache.org To: commits@cassandra.apache.org Date: Thu, 17 Nov 2016 23:27:40 -0000 Message-Id: <553c256d640b475182f4de02736d9d26@git.apache.org> In-Reply-To: <400adb2ff8454fdba2749dfae39a70a1@git.apache.org> References: <400adb2ff8454fdba2749dfae39a70a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] cassandra git commit: Revert "Add row offset support to SASI" archived-at: Thu, 17 Nov 2016 23:27:41 -0000 Revert "Add row offset support to SASI" This reverts commit 7d857b46fb070548bf5e5f6ff81db588f08ec22a. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490c1c27 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490c1c27 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490c1c27 Branch: refs/heads/trunk Commit: 490c1c27c9b700f14212d9591a516ddb8d0865c7 Parents: a1eef56 Author: Pavel Yaskevich Authored: Thu Nov 17 15:20:04 2016 -0800 Committer: Pavel Yaskevich Committed: Thu Nov 17 15:20:04 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../org/apache/cassandra/db/ColumnIndex.java | 6 +- .../apache/cassandra/index/sasi/KeyFetcher.java | 98 ------- .../apache/cassandra/index/sasi/SASIIndex.java | 11 +- .../cassandra/index/sasi/SASIIndexBuilder.java | 13 +- .../cassandra/index/sasi/SSTableIndex.java | 41 ++- .../cassandra/index/sasi/conf/ColumnIndex.java | 4 +- .../index/sasi/conf/view/RangeTermTree.java | 4 - .../sasi/disk/AbstractTokenTreeBuilder.java | 276 ++++++++---------- .../cassandra/index/sasi/disk/Descriptor.java | 33 +-- .../sasi/disk/DynamicTokenTreeBuilder.java | 59 ++-- .../cassandra/index/sasi/disk/KeyOffsets.java | 115 -------- .../cassandra/index/sasi/disk/OnDiskIndex.java | 12 +- .../index/sasi/disk/OnDiskIndexBuilder.java | 16 +- .../index/sasi/disk/PerSSTableIndexWriter.java | 13 +- .../cassandra/index/sasi/disk/RowKey.java | 108 ------- .../index/sasi/disk/StaticTokenTreeBuilder.java | 18 +- .../apache/cassandra/index/sasi/disk/Token.java | 9 +- .../cassandra/index/sasi/disk/TokenTree.java | 288 +++++++------------ .../index/sasi/disk/TokenTreeBuilder.java | 72 ++--- .../index/sasi/memory/IndexMemtable.java | 8 +- .../index/sasi/memory/KeyRangeIterator.java | 49 ++-- .../cassandra/index/sasi/memory/MemIndex.java | 4 +- .../index/sasi/memory/SkipListMemIndex.java | 12 +- .../index/sasi/memory/TrieMemIndex.java | 45 +-- .../index/sasi/plan/QueryController.java | 49 ++-- .../cassandra/index/sasi/plan/QueryPlan.java | 174 +++-------- .../io/sstable/format/SSTableFlushObserver.java | 5 - .../io/sstable/format/SSTableReader.java | 33 +-- .../io/sstable/format/big/BigTableWriter.java | 8 +- .../org/apache/cassandra/utils/obs/BitUtil.java | 2 +- test/data/legacy-sasi/on-disk-sa-int2.db | Bin 12312 -> 0 bytes .../cassandra/index/sasi/SASIIndexTest.java | 25 +- .../index/sasi/disk/KeyOffsetsTest.java | 48 ---- .../index/sasi/disk/OnDiskIndexTest.java | 216 +++++++------- .../sasi/disk/PerSSTableIndexWriterTest.java | 112 ++------ .../index/sasi/disk/TokenTreeTest.java | 208 +++++++------- .../index/sasi/plan/OperationTest.java | 2 +- .../index/sasi/utils/KeyConverter.java | 69 ----- .../index/sasi/utils/LongIterator.java | 8 +- .../sasi/utils/RangeUnionIteratorTest.java | 17 -- 41 files changed, 745 insertions(+), 1546 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index db06341..6ca26f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -45,7 +45,6 @@ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) * Add JMH benchmarks.jar (CASSANDRA-12586) - * Add row offset support to SASI (CASSANDRA-11990) * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) * Add keep-alive to streaming (CASSANDRA-11841) * Tracing payload is passed through newSession(..) (CASSANDRA-11706) http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 8ea1272..de1b1df 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -121,10 +121,9 @@ public class ColumnIndex { Row staticRow = iterator.staticRow(); - long startPosition = currentPosition(); UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version); if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition)); + observers.forEach((o) -> o.nextUnfilteredCluster(staticRow)); } } @@ -235,7 +234,6 @@ public class ColumnIndex private void add(Unfiltered unfiltered) throws IOException { - final long origPos = writer.position(); long pos = currentPosition(); if (firstClustering == null) @@ -249,7 +247,7 @@ public class ColumnIndex // notify observers about each new row if (!observers.isEmpty()) - observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos)); + observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered)); lastClustering = unfiltered.clustering(); previousRowStart = pos; http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java deleted file mode 100644 index 80ee167..0000000 --- a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.index.sasi; - -import java.io.IOException; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.index.sasi.disk.*; -import org.apache.cassandra.io.*; -import org.apache.cassandra.io.sstable.format.*; - - -public interface KeyFetcher -{ - public Clustering getClustering(long offset); - public DecoratedKey getPartitionKey(long offset); - - public RowKey getRowKey(long partitionOffset, long rowOffset); - - /** - * Fetches clustering and partition key from the sstable. - * - * Currently, clustering key is fetched from the data file of the sstable and partition key is - * read from the index file. Reading from index file helps us to warm up key cache in this case. - */ - public static class SSTableKeyFetcher implements KeyFetcher - { - private final SSTableReader sstable; - - public SSTableKeyFetcher(SSTableReader reader) - { - sstable = reader; - } - - @Override - public Clustering getClustering(long offset) - { - try - { - return sstable.clusteringAt(offset); - } - catch (IOException e) - { - throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename()); - } - } - - @Override - public DecoratedKey getPartitionKey(long offset) - { - try - { - return sstable.keyAt(offset); - } - catch (IOException e) - { - throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename()); - } - } - - @Override - public RowKey getRowKey(long partitionOffset, long rowOffset) - { - if (rowOffset == KeyOffsets.NO_OFFSET) - return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator); - else - return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator); - } - - public int hashCode() - { - return sstable.descriptor.hashCode(); - } - - public boolean equals(Object other) - { - return other instanceof SSTableKeyFetcher - && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor); - } - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 65953a9..4375964 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -46,7 +46,6 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex; import org.apache.cassandra.index.sasi.conf.IndexMode; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode; import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter; -import org.apache.cassandra.index.sasi.disk.RowKey; import org.apache.cassandra.index.sasi.plan.QueryPlan; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.Descriptor; @@ -183,14 +182,6 @@ public class SASIIndex implements Index, INotificationConsumer return getTruncateTask(FBUtilities.timestampMicros()); } - public Callable getTruncateTask(Collection sstablesToRebuild) - { - return () -> { - index.dropData(sstablesToRebuild); - return null; - }; - } - public Callable getTruncateTask(long truncatedAt) { return () -> { @@ -261,7 +252,7 @@ public class SASIIndex implements Index, INotificationConsumer public void insertRow(Row row) { if (isNewData()) - adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup); + adjustMemtableSize(index.index(key, row), opGroup); } public void updateRow(Row oldRow, Row newRow) http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index d6706ea..d50875a 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -94,23 +94,16 @@ class SASIIndexBuilder extends SecondaryIndexBuilder { RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); dataFile.seek(indexEntry.position); - int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key + ByteBufferUtil.readWithShortLength(dataFile); // key try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key)) { // if the row has statics attached, it has to be indexed separately if (cfs.metadata.hasStaticColumns()) - { - long staticPosition = indexEntry.position + staticOffset; - indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition); - } + indexWriter.nextUnfilteredCluster(partition.staticRow()); - long position = dataFile.getPosition(); while (partition.hasNext()) - { - indexWriter.nextUnfilteredCluster(partition.next(), position); - position = dataFile.getPosition(); - } + indexWriter.nextUnfilteredCluster(partition.next()); } } catch (IOException ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java index f9c8abf..c67c39c 100644 --- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java @@ -18,22 +18,28 @@ package org.apache.cassandra.index.sasi; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.*; +import org.apache.cassandra.index.sasi.disk.OnDiskIndex; +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.commons.lang3.builder.HashCodeBuilder; +import com.google.common.base.Function; + public class SSTableIndex { private final ColumnIndex columnIndex; @@ -59,7 +65,7 @@ public class SSTableIndex sstable.getFilename(), columnIndex.getIndexName()); - this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable)); + this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable)); } public OnDiskIndexBuilder.Mode mode() @@ -157,5 +163,36 @@ public class SSTableIndex return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor); } + private static class DecoratedKeyFetcher implements Function + { + private final SSTableReader sstable; + + DecoratedKeyFetcher(SSTableReader reader) + { + sstable = reader; + } + + public DecoratedKey apply(Long offset) + { + try + { + return sstable.keyAt(offset); + } + catch (IOException e) + { + throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename()); + } + } + + public int hashCode() + { + return sstable.descriptor.hashCode(); + } + public boolean equals(Object other) + { + return other instanceof DecoratedKeyFetcher + && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java index 459e5c3..0958113 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java @@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; @@ -39,7 +40,6 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer; import org.apache.cassandra.index.sasi.conf.view.View; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; -import org.apache.cassandra.index.sasi.disk.RowKey; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.memory.IndexMemtable; import org.apache.cassandra.index.sasi.plan.Expression; @@ -99,7 +99,7 @@ public class ColumnIndex return keyValidator; } - public long index(RowKey key, Row row) + public long index(DecoratedKey key, Row row) { return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java index 2775c29..d6b4551 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java @@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi.conf.view; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -47,9 +46,6 @@ public class RangeTermTree implements TermTree public Set search(Expression e) { - if (e == null) - return Collections.emptySet(); - ByteBuffer minTerm = e.lower == null ? min : e.lower.value; ByteBuffer maxTerm = e.upper == null ? max : e.upper.value; http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java index 9245960..18994de 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java @@ -20,18 +20,19 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; -import java.util.function.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; -import com.carrotsearch.hppc.LongArrayList; -import com.carrotsearch.hppc.cursors.LongCursor; -import com.carrotsearch.hppc.cursors.LongObjectCursor; -import org.apache.cassandra.dht.*; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import com.carrotsearch.hppc.LongArrayList; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.cursors.LongCursor; + public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder { protected int numBlocks; @@ -64,7 +65,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder public int serializedSize() { if (numBlocks == 1) - return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES); + return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16)); else return numBlocks * BLOCK_BYTES; } @@ -111,15 +112,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder buffer.clear(); } - /** - * Tree node, - * - * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf. - * - * Depending on the concrete implementation of {@code TokenTreeBuilder} - * leaf can be partial or static (in case of {@code StaticTokenTreeBuilder} or dynamic in case - * of {@code DynamicTokenTreeBuilder} - */ protected abstract class Node { protected InteriorNode parent; @@ -187,16 +179,8 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder alignBuffer(buf, BLOCK_HEADER_BYTES); } - /** - * Shared header part, written for all node types: - * [ info byte ] [ token count ] [ min node token ] [ max node token ] - * [ 1b ] [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] - **/ private abstract class Header { - /** - * Serializes the shared part of the header - */ public void serialize(ByteBuffer buf) { buf.put(infoByte()) @@ -208,12 +192,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder protected abstract byte infoByte(); } - /** - * In addition to shared header part, root header stores version information, - * overall token count and min/max tokens for the whole tree: - * [ magic ] [ overall token count ] [ min tree token ] [ max tree token ] - * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] - */ private class RootHeader extends Header { public void serialize(ByteBuffer buf) @@ -229,21 +207,19 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder { // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1) // if not leaf, clear both bits - return isLeaf() ? ENTRY_TYPE_MASK : 0; + return (byte) ((isLeaf()) ? 3 : 0); } protected void writeMagic(ByteBuffer buf) { switch (Descriptor.CURRENT_VERSION) { - case ab: + case Descriptor.VERSION_AB: buf.putShort(AB_MAGIC); break; - case ac: - buf.putShort(AC_MAGIC); - break; + default: - throw new RuntimeException("Unsupported version"); + break; } } @@ -273,12 +249,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder } - /** - * Leaf consists of - * - header (format described in {@code Header} ) - * - data (format described in {@code LeafEntry}) - * - overflow collision entries, that hold {@value OVERFLOW_TRAILER_CAPACITY} of {@code RowOffset}. - */ protected abstract class Leaf extends Node { protected LongArrayList overflowCollisions; @@ -309,98 +279,82 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder protected abstract void serializeData(ByteBuffer buf); - protected LeafEntry createEntry(final long tok, final KeyOffsets offsets) + protected LeafEntry createEntry(final long tok, final LongSet offsets) { - LongArrayList rawOffsets = new LongArrayList(offsets.size()); - - offsets.forEach(new Consumer>() - { - public void accept(LongObjectCursor cursor) - { - for (long l : cursor.value) - { - rawOffsets.add(cursor.key); - rawOffsets.add(l); - } - } - }); - - int offsetCount = rawOffsets.size(); + int offsetCount = offsets.size(); switch (offsetCount) { case 0: throw new AssertionError("no offsets for token " + tok); + case 1: + long offset = offsets.toArray()[0]; + if (offset > MAX_OFFSET) + throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET); + else if (offset <= Integer.MAX_VALUE) + return new SimpleLeafEntry(tok, offset); + else + return new FactoredOffsetLeafEntry(tok, offset); case 2: - return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1)); + long[] rawOffsets = offsets.toArray(); + if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE && + (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE)) + return new PackedCollisionLeafEntry(tok, rawOffsets); + else + return createOverflowEntry(tok, offsetCount, offsets); default: - assert offsetCount % 2 == 0; - if (offsetCount == 4) - { - if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE && - rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE) - { - return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1), - (int) rawOffsets.get(2), (int) rawOffsets.get(3)); - } - } - return createOverflowEntry(tok, offsetCount, rawOffsets); + return createOverflowEntry(tok, offsetCount, offsets); } } - private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets) + private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets) { if (overflowCollisions == null) - overflowCollisions = new LongArrayList(offsetCount); - - int overflowCount = (overflowCollisions.size() + offsetCount) / 2; - if (overflowCount >= OVERFLOW_TRAILER_CAPACITY) - throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount); + overflowCollisions = new LongArrayList(); - LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2)); - overflowCollisions.addAll(offsets); + LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount); + for (LongCursor o : offsets) + { + if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY) + throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf"); + else + overflowCollisions.add(o.value); + } return entry; } - /** - * A leaf of the B+-Tree, that holds information about the row offset(s) for - * the current token. - * - * Main 3 types of leaf entries are: - * 1) simple leaf entry: holding just a single row offset - * 2) packed collision leaf entry: holding two entries that would fit together into 168 bytes - * 3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf - */ protected abstract class LeafEntry { protected final long token; abstract public EntryType type(); + abstract public int offsetData(); + abstract public short offsetExtra(); public LeafEntry(final long tok) { token = tok; } - public abstract void serialize(ByteBuffer buf); + public void serialize(ByteBuffer buf) + { + buf.putShort((short) type().ordinal()) + .putShort(offsetExtra()) + .putLong(token) + .putInt(offsetData()); + } } - /** - * Simple leaf, that can store a single row offset, having the following format: - * - * [ type ] [ token ] [ partition offset ] [ row offset ] - * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] - */ + + // assumes there is a single offset and the offset is <= Integer.MAX_VALUE protected class SimpleLeafEntry extends LeafEntry { - private final long partitionOffset; - private final long rowOffset; + private final long offset; - public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset) + public SimpleLeafEntry(final long tok, final long off) { super(tok); - this.partitionOffset = partitionOffset; - this.rowOffset = rowOffset; + offset = off; } public EntryType type() @@ -408,38 +362,61 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.SIMPLE; } - @Override - public void serialize(ByteBuffer buf) + public int offsetData() { - buf.putShort((short) type().ordinal()) - .putLong(token) - .putLong(partitionOffset) - .putLong(rowOffset); + return (int) offset; + } + + public short offsetExtra() + { + return 0; } } - /** - * Packed collision entry, can store two offsets, if each one of their positions - * fit into 4 bytes. - * [ type ] [ token ] [ partition offset 1 ] [ row offset 1] [ partition offset 1 ] [ row offset 1] - * [ 2b (short) ] [ 8b (long) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] - */ - protected class PackedCollisionLeafEntry extends LeafEntry + // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET + // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits) + // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header + private class FactoredOffsetLeafEntry extends LeafEntry { - private final int partitionOffset1; - private final int rowOffset1; - private final int partitionOffset2; - private final int rowOffset2; + private final long offset; - public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1, - final int partitionOffset2, final int rowOffset2) + public FactoredOffsetLeafEntry(final long tok, final long off) { super(tok); - this.partitionOffset1 = partitionOffset1; - this.rowOffset1 = rowOffset1; - this.partitionOffset2 = partitionOffset2; - this.rowOffset2 = rowOffset2; + offset = off; + } + public EntryType type() + { + return EntryType.FACTORED; + } + + public int offsetData() + { + return (int) (offset >>> Short.SIZE); + } + + public short offsetExtra() + { + // exta offset is supposed to be an unsigned 16-bit integer + return (short) offset; + } + } + + // holds an entry with two offsets that can be packed in an int & a short + // the int offset is stored where offset is normally stored. short offset is + // stored in entry header + private class PackedCollisionLeafEntry extends LeafEntry + { + private short smallerOffset; + private int largerOffset; + + public PackedCollisionLeafEntry(final long tok, final long[] offs) + { + super(tok); + + smallerOffset = (short) Math.min(offs[0], offs[1]); + largerOffset = (int) Math.max(offs[0], offs[1]); } public EntryType type() @@ -447,27 +424,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.PACKED; } - @Override - public void serialize(ByteBuffer buf) + public int offsetData() { - buf.putShort((short) type().ordinal()) - .putLong(token) - .putInt(partitionOffset1) - .putInt(rowOffset1) - .putInt(partitionOffset2) - .putInt(rowOffset2); - } - } - - /** - * Overflow collision entry, holds an entry with three or more offsets, or two offsets - * that cannot be packed into 16 bytes. - * [ type ] [ token ] [ start index ] [ count ] - * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ] - * - * - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer - * - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer - */ + return largerOffset; + } + + public short offsetExtra() + { + return smallerOffset; + } + } + + // holds an entry with three or more offsets, or two offsets that cannot + // be packed into an int & a short. the index into the overflow list + // is stored where the offset is normally stored. the number of overflowed offsets + // for the entry is stored in the entry header private class OverflowCollisionLeafEntry extends LeafEntry { private final short startIndex; @@ -485,23 +456,20 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder return EntryType.OVERFLOW; } - @Override - public void serialize(ByteBuffer buf) + public int offsetData() { - buf.putShort((short) type().ordinal()) - .putLong(token) - .putLong(startIndex) - .putLong(count); + return startIndex; } + + public short offsetExtra() + { + return count; + } + } + } - /** - * Interior node consists of: - * - (interior node) header - * - tokens (serialized as longs, with count stored in header) - * - child offsets - */ protected class InteriorNode extends Node { protected List tokens = new ArrayList<>(TOKENS_PER_BLOCK); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java index 3fa0f06..3aa6f14 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java @@ -22,29 +22,30 @@ package org.apache.cassandra.index.sasi.disk; */ public class Descriptor { - public static enum Version + public static final String VERSION_AA = "aa"; + public static final String VERSION_AB = "ab"; + public static final String CURRENT_VERSION = VERSION_AB; + public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION); + + public static class Version { - aa, - ab, - ac - } + public final String version; - public static final Version VERSION_AA = Version.aa; - public static final Version VERSION_AB = Version.ab; - public static final Version VERSION_AC = Version.ac; + public Version(String version) + { + this.version = version; + } - public static final Version CURRENT_VERSION = Version.ac; - public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION); + public String toString() + { + return version; + } + } public final Version version; public Descriptor(String v) { - this.version = Version.valueOf(v); - } - - public Descriptor(Version v) - { - this.version = v; + this.version = new Version(v); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java index 6e3e163..2ddfd89 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java @@ -20,14 +20,17 @@ package org.apache.cassandra.index.sasi.disk; import java.nio.ByteBuffer; import java.util.*; -import com.carrotsearch.hppc.cursors.LongObjectCursor; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.Pair; + +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.cursors.LongCursor; public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder { + private final SortedMap tokens = new TreeMap<>(); - private final SortedMap tokens = new TreeMap<>(); public DynamicTokenTreeBuilder() {} @@ -37,52 +40,54 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder add(data); } - public DynamicTokenTreeBuilder(SortedMap data) + public DynamicTokenTreeBuilder(SortedMap data) { add(data); } - public void add(Long token, long partitionOffset, long rowOffset) + public void add(Long token, long keyPosition) { - KeyOffsets found = tokens.get(token); + LongSet found = tokens.get(token); if (found == null) - tokens.put(token, (found = new KeyOffsets(2))); + tokens.put(token, (found = new LongOpenHashSet(2))); - found.put(partitionOffset, rowOffset); + found.add(keyPosition); } - public void add(Iterator> data) + public void add(Iterator> data) { while (data.hasNext()) { - Pair entry = data.next(); - for (LongObjectCursor cursor : entry.right) - for (long l : cursor.value) - add(entry.left, cursor.key, l); + Pair entry = data.next(); + for (LongCursor l : entry.right) + add(entry.left, l.value); } } - public void add(SortedMap data) + public void add(SortedMap data) { - for (Map.Entry newEntry : data.entrySet()) + for (Map.Entry newEntry : data.entrySet()) { - for (LongObjectCursor cursor : newEntry.getValue()) - for (long l : cursor.value) - add(newEntry.getKey(), cursor.key, l); + LongSet found = tokens.get(newEntry.getKey()); + if (found == null) + tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4))); + + for (LongCursor offset : newEntry.getValue()) + found.add(offset.value); } } - public Iterator> iterator() + public Iterator> iterator() { - final Iterator> iterator = tokens.entrySet().iterator(); - return new AbstractIterator>() + final Iterator> iterator = tokens.entrySet().iterator(); + return new AbstractIterator>() { - protected Pair computeNext() + protected Pair computeNext() { if (!iterator.hasNext()) return endOfData(); - Map.Entry entry = iterator.next(); + Map.Entry entry = iterator.next(); return Pair.create(entry.getKey(), entry.getValue()); } }; @@ -156,9 +161,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder private class DynamicLeaf extends Leaf { - private final SortedMap tokens; + private final SortedMap tokens; - DynamicLeaf(SortedMap data) + DynamicLeaf(SortedMap data) { super(data.firstKey(), data.lastKey()); tokens = data; @@ -176,7 +181,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder protected void serializeData(ByteBuffer buf) { - for (Map.Entry entry : tokens.entrySet()) + for (Map.Entry entry : tokens.entrySet()) createEntry(entry.getKey(), entry.getValue()).serialize(buf); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java deleted file mode 100644 index db849fe..0000000 --- a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.index.sasi.disk; - -import java.util.*; - -import org.apache.commons.lang3.ArrayUtils; - -import com.carrotsearch.hppc.LongObjectOpenHashMap; -import com.carrotsearch.hppc.cursors.LongObjectCursor; - -public class KeyOffsets extends LongObjectOpenHashMap -{ - public static final long NO_OFFSET = Long.MIN_VALUE; - - public KeyOffsets() { - super(4); - } - - public KeyOffsets(int initialCapacity) { - super(initialCapacity); - } - - public void put(long currentPartitionOffset, long currentRowOffset) - { - if (containsKey(currentPartitionOffset)) - super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset)); - else - super.put(currentPartitionOffset, asArray(currentRowOffset)); - } - - public long[] put(long currentPartitionOffset, long[] currentRowOffset) - { - if (containsKey(currentPartitionOffset)) - return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset)); - else - return super.put(currentPartitionOffset, currentRowOffset); - } - - public boolean equals(Object obj) - { - if (!(obj instanceof KeyOffsets)) - return false; - - KeyOffsets other = (KeyOffsets) obj; - if (other.size() != this.size()) - return false; - - for (LongObjectCursor cursor : this) - if (!Arrays.equals(cursor.value, other.get(cursor.key))) - return false; - - return true; - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder("KeyOffsets { "); - forEach((a, b) -> { - sb.append(a).append(": ").append(Arrays.toString(b)); - }); - sb.append(" }"); - return sb.toString(); - } - - // primitive array creation - public static long[] asArray(long... vals) - { - return vals; - } - - private static long[] merge(long[] arr1, long[] arr2) - { - long[] copy = new long[arr2.length]; - int written = 0; - for (long l : arr2) - { - if (!ArrayUtils.contains(arr1, l)) - copy[written++] = l; - } - - if (written == 0) - return arr1; - - long[] merged = new long[arr1.length + written]; - System.arraycopy(arr1, 0, merged, 0, arr1.length); - System.arraycopy(copy, 0, merged, arr1.length, written); - return merged; - } - - private static long[] append(long[] arr1, long v) - { - if (ArrayUtils.contains(arr1, v)) - return arr1; - else - return ArrayUtils.add(arr1, v); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java index 70d24a7..4d43cd9 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java @@ -22,7 +22,8 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; -import org.apache.cassandra.index.sasi.*; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.Term; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.plan.Expression.Op; import org.apache.cassandra.index.sasi.utils.MappedBuffer; @@ -36,12 +37,12 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult; -import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES; public class OnDiskIndex implements Iterable, Closeable { @@ -105,7 +106,7 @@ public class OnDiskIndex implements Iterable, Closeable protected final long indexSize; protected final boolean hasMarkedPartials; - protected final KeyFetcher keyFetcher; + protected final Function keyFetcher; protected final String indexPath; @@ -115,7 +116,7 @@ public class OnDiskIndex implements Iterable, Closeable protected final ByteBuffer minTerm, maxTerm, minKey, maxKey; @SuppressWarnings("resource") - public OnDiskIndex(File index, AbstractType cmp, KeyFetcher keyReader) + public OnDiskIndex(File index, AbstractType cmp, Function keyReader) { keyFetcher = keyReader; @@ -634,7 +635,6 @@ public class OnDiskIndex implements Iterable, Closeable { final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE); - // ([int] -1 for sparse, offset for non-sparse) if (isSparse()) return new PrefetchedTokensIterator(getSparseTokens()); @@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable, Closeable NavigableMap individualTokens = new TreeMap<>(); for (int i = 0; i < size; i++) { - Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher); + Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher); assert token != null; individualTokens.put(token.get(), token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java index b6e2da5..4946f06 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.*; import org.apache.cassandra.index.sasi.plan.Expression.Op; import org.apache.cassandra.index.sasi.sa.IndexedTerm; import org.apache.cassandra.index.sasi.sa.IntegralSA; @@ -38,6 +37,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import com.carrotsearch.hppc.LongArrayList; +import com.carrotsearch.hppc.LongSet; import com.carrotsearch.hppc.ShortArrayList; import com.google.common.annotations.VisibleForTesting; @@ -163,7 +163,7 @@ public class OnDiskIndexBuilder this.marksPartials = marksPartials; } - public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset) + public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition) { if (term.remaining() >= MAX_TERM_SIZE) { @@ -183,16 +183,16 @@ public class OnDiskIndexBuilder estimatedBytes += 64 + 48 + term.remaining(); } - tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset); + tokens.add((Long) key.getToken().getTokenValue(), keyPosition); // calculate key range (based on actual key values) for current index minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey; maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey; - // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap created - // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key). + // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added + // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key). // in the case of hash collision for the token we may overestimate but this is extremely rare - estimatedBytes += 84 + 40 + 8; + estimatedBytes += 60 + 40 + 8; return this; } @@ -569,7 +569,7 @@ public class OnDiskIndexBuilder } } - private class MutableDataBlock extends MutableBlock + private static class MutableDataBlock extends MutableBlock { private static final int MAX_KEYS_SPARSE = 5; @@ -651,7 +651,7 @@ public class OnDiskIndexBuilder { term.serialize(buffer); buffer.writeByte((byte) keys.getTokenCount()); - for (Pair key : keys) + for (Pair key : keys) buffer.writeLong(key.left); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java index c204883..9fa4e87 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java @@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver currentKeyPosition = curPosition; } - public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset) + public void nextUnfilteredCluster(Unfiltered unfiltered) { if (!unfiltered.isRow()) return; @@ -129,15 +129,10 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver if (index == null) indexes.put(column, (index = newIndex(columnIndex))); - index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset); + index.add(value.duplicate(), currentKey, currentKeyPosition); }); } - public void nextUnfilteredCluster(Unfiltered unfilteredCluster) - { - throw new UnsupportedOperationException("SASI Index does not support direct row access."); - } - public void complete() { if (isComplete) @@ -202,7 +197,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver this.currentBuilder = newIndexBuilder(); } - public void add(ByteBuffer term, DecoratedKey key, long partitoinOffset, long rowOffset) + public void add(ByteBuffer term, DecoratedKey key, long keyPosition) { if (term.remaining() == 0) return; @@ -240,7 +235,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver } } - currentBuilder.add(token, key, partitoinOffset, rowOffset); + currentBuilder.add(token, key, keyPosition); isAdded = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java deleted file mode 100644 index fc5a2c0..0000000 --- a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyclustering ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.index.sasi.disk; - -import java.util.*; -import java.util.stream.*; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.*; - -/** - * Primary key of the found row, a combination of the Partition Key - * and clustering that belongs to the row. - */ -public class RowKey implements Comparable -{ - - public final DecoratedKey decoratedKey; - public final Clustering clustering; - - private final ClusteringComparator comparator; - - public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator) - { - this.decoratedKey = primaryKey; - this.clustering = clustering; - this.comparator = comparator; - } - - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - RowKey rowKey = (RowKey) o; - - if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null) - return false; - return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null; - } - - public int hashCode() - { - return new HashCodeBuilder().append(decoratedKey).append(clustering).toHashCode(); - } - - public int compareTo(RowKey other) - { - int cmp = this.decoratedKey.compareTo(other.decoratedKey); - if (cmp == 0 && clustering != null) - { - // Both clustering and rows should match - if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING) - return 0; - - return comparator.compare(this.clustering, other.clustering); - } - else - { - return cmp; - } - } - - public static RowKeyComparator COMPARATOR = new RowKeyComparator(); - - public String toString(CFMetaData metadata) - { - return String.format("RowKey: { pk : %s, clustering: %s}", - metadata.getKeyValidator().getString(decoratedKey.getKey()), - clustering.toString(metadata)); - } - - @Override - public String toString() - { - return String.format("RowKey: { pk : %s, clustering: %s}", - ByteBufferUtil.bytesToHex(decoratedKey.getKey()), - String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList()))); - } - - private static class RowKeyComparator implements Comparator - { - public int compare(RowKey o1, RowKey o2) - { - return o1.compareTo(o2); - } - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java index 8a11d60..6e64c56 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java @@ -19,7 +19,8 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Iterator; +import java.util.SortedMap; import org.apache.cassandra.index.sasi.utils.CombinedTerm; import org.apache.cassandra.index.sasi.utils.RangeIterator; @@ -27,6 +28,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Pair; +import com.carrotsearch.hppc.LongSet; import com.google.common.collect.Iterators; /** @@ -61,17 +63,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder combinedTerm = term; } - public void add(Long token, long partitionOffset, long rowOffset) + public void add(Long token, long keyPosition) { throw new UnsupportedOperationException(); } - public void add(SortedMap data) + public void add(SortedMap data) { throw new UnsupportedOperationException(); } - public void add(Iterator> data) + public void add(Iterator> data) { throw new UnsupportedOperationException(); } @@ -81,12 +83,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder return tokenCount == 0; } - public Iterator> iterator() + public Iterator> iterator() { - @SuppressWarnings("resource") Iterator iterator = combinedTerm.getTokenIterator(); - return new AbstractIterator>() + Iterator iterator = combinedTerm.getTokenIterator(); + return new AbstractIterator>() { - protected Pair computeNext() + protected Pair computeNext() { if (!iterator.hasNext()) return endOfData(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Token.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java index 8ea864f..4cd1ea3 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java @@ -19,9 +19,12 @@ package org.apache.cassandra.index.sasi.disk; import com.google.common.primitives.Longs; -import org.apache.cassandra.index.sasi.utils.*; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.utils.CombinedValue; -public abstract class Token implements CombinedValue, Iterable +import com.carrotsearch.hppc.LongSet; + +public abstract class Token implements CombinedValue, Iterable { protected final long token; @@ -35,7 +38,7 @@ public abstract class Token implements CombinedValue, Iterable return token; } - public abstract KeyOffsets getOffsets(); + public abstract LongSet getOffsets(); public int compareTo(CombinedValue o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java index 1969627..c69ce00 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java @@ -19,21 +19,22 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; import java.util.*; -import java.util.stream.*; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.utils.CombinedValue; +import org.apache.cassandra.index.sasi.utils.MappedBuffer; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.utils.MergeIterator; + +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.commons.lang3.builder.HashCodeBuilder; -import com.carrotsearch.hppc.cursors.LongObjectCursor; -import org.apache.cassandra.index.sasi.*; -import org.apache.cassandra.index.sasi.disk.Descriptor.*; -import org.apache.cassandra.index.sasi.utils.AbstractIterator; -import org.apache.cassandra.index.sasi.utils.*; -import org.apache.cassandra.utils.*; - -import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*; -import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*; +import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType; // Note: all of the seek-able offsets contained in TokenTree should be sizeof(long) // even if currently only lower int portion of them if used, because that makes @@ -41,6 +42,9 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*; // without any on-disk format changes and/or re-indexing if one day we'll have a need to. public class TokenTree { + private static final int LONG_BYTES = Long.SIZE / 8; + private static final int SHORT_BYTES = Short.SIZE / 8; + private final Descriptor descriptor; private final MappedBuffer file; private final long startPos; @@ -62,7 +66,8 @@ public class TokenTree file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES); - validateMagic(); + if (!validateMagic()) + throw new IllegalArgumentException("invalid token tree"); tokenCount = file.getLong(); treeMinToken = file.getLong(); @@ -74,12 +79,12 @@ public class TokenTree return tokenCount; } - public RangeIterator iterator(KeyFetcher keyFetcher) + public RangeIterator iterator(Function keyFetcher) { return new TokenTreeIterator(file.duplicate(), keyFetcher); } - public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher) + public OnDiskToken get(final long searchToken, Function keyFetcher) { seekToLeaf(searchToken, file); long leafStart = file.position(); @@ -90,24 +95,21 @@ public class TokenTree file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES); - OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher); - + OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher); return token.get().equals(searchToken) ? token : null; } - private void validateMagic() + private boolean validateMagic() { - if (descriptor.version == aa) - return; - - short magic = file.getShort(); - if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC) - return; - - if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC) - return; - - throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'"); + switch (descriptor.version.toString()) + { + case Descriptor.VERSION_AA: + return true; + case Descriptor.VERSION_AB: + return TokenTreeBuilder.AB_MAGIC == file.getShort(); + default: + return false; + } } // finds leaf that *could* contain token @@ -134,16 +136,18 @@ public class TokenTree long minToken = file.getLong(); long maxToken = file.getLong(); - long seekBase = blockStart + BLOCK_HEADER_BYTES; + long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES; if (minToken > token) { // seek to beginning of child offsets to locate first child - file.position(seekBase + tokenCount * TOKEN_BYTES); + file.position(seekBase + tokenCount * LONG_BYTES); + blockStart = (startPos + (int) file.getLong()); } else if (maxToken < token) { // seek to end of child offsets to locate last child - file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES); + file.position(seekBase + (2 * tokenCount) * LONG_BYTES); + blockStart = (startPos + (int) file.getLong()); } else { @@ -154,11 +158,12 @@ public class TokenTree // file pointer is now at beginning of offsets if (offsetIndex == tokenCount) - file.position(file.position() + (offsetIndex * TOKEN_BYTES)); + file.position(file.position() + (offsetIndex * LONG_BYTES)); else - file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES); + file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES); + + blockStart = (startPos + (int) file.getLong()); } - blockStart = (startPos + (int) file.getLong()); } } @@ -167,7 +172,8 @@ public class TokenTree short offsetIndex = 0; for (int i = 0; i < tokenCount; i++) { - if (searchToken < file.getLong()) + long readToken = file.getLong(); + if (searchToken < readToken) break; offsetIndex++; @@ -187,7 +193,10 @@ public class TokenTree while (start <= end) { middle = start + ((end - start) >> 1); - long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)); + + // each entry is 16 bytes wide, token is in bytes 4-11 + long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4)); + if (token == searchToken) break; @@ -200,9 +209,9 @@ public class TokenTree return (short) middle; } - private class TokenTreeIterator extends RangeIterator + public class TokenTreeIterator extends RangeIterator { - private final KeyFetcher keyFetcher; + private final Function keyFetcher; private final MappedBuffer file; private long currentLeafStart; @@ -215,7 +224,7 @@ public class TokenTree protected boolean firstIteration = true; private boolean lastLeaf; - TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher) + TokenTreeIterator(MappedBuffer file, Function keyFetcher) { super(treeMinToken, treeMaxToken, tokenCount); @@ -305,13 +314,13 @@ public class TokenTree private Token getTokenAt(int idx) { - return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher); + return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher); } private long getTokenPosition(int idx) { - // skip entry header to get position pointing directly at the entry's token - return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES); + // skip 4 byte entry header to get position pointing directly at the entry's token + return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES); } private void seekToNextLeaf() @@ -338,15 +347,15 @@ public class TokenTree } } - public class OnDiskToken extends Token + public static class OnDiskToken extends Token { private final Set info = new HashSet<>(2); - private final Set loadedKeys = new TreeSet<>(RowKey.COMPARATOR); + private final Set loadedKeys = new TreeSet<>(DecoratedKey.comparator); - private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher) + public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function keyFetcher) { - super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES))); - info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor)); + super(buffer.getLong(position + (2 * SHORT_BYTES))); + info.add(new TokenInfo(buffer, position, leafSize, keyFetcher)); } public void merge(CombinedValue other) @@ -368,9 +377,9 @@ public class TokenTree } } - public Iterator iterator() + public Iterator iterator() { - List> keys = new ArrayList<>(info.size()); + List> keys = new ArrayList<>(info.size()); for (TokenInfo i : info) keys.add(i.iterator()); @@ -378,72 +387,68 @@ public class TokenTree if (!loadedKeys.isEmpty()) keys.add(loadedKeys.iterator()); - return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer() + return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer() { - RowKey reduced = null; + DecoratedKey reduced = null; public boolean trivialReduceIsTrivial() { return true; } - public void reduce(int idx, RowKey current) + public void reduce(int idx, DecoratedKey current) { reduced = current; } - protected RowKey getReduced() + protected DecoratedKey getReduced() { return reduced; } }); } - public KeyOffsets getOffsets() + public LongSet getOffsets() { - KeyOffsets offsets = new KeyOffsets(); + LongSet offsets = new LongOpenHashSet(4); for (TokenInfo i : info) { - for (LongObjectCursor offset : i.fetchOffsets()) - offsets.put(offset.key, offset.value); + for (long offset : i.fetchOffsets()) + offsets.add(offset); } return offsets; } - } - private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher) - { - return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher); - } - - private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor) - { - if (descriptor.version.compareTo(Version.ac) < 0) - return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES); + public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function keyFetcher) + { + return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher); + } - // skip n entries, to the entry with the given index - return file.position() + (idx * LEAF_ENTRY_BYTES); + private static long getEntryPosition(int idx, MappedBuffer file) + { + // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes + return file.position() + (idx * (2 * LONG_BYTES)); + } } private static class TokenInfo { private final MappedBuffer buffer; - private final KeyFetcher keyFetcher; - private final Descriptor descriptor; + private final Function keyFetcher; + private final long position; private final short leafSize; - public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor) + public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function keyFetcher) { this.keyFetcher = keyFetcher; this.buffer = buffer; this.position = position; this.leafSize = leafSize; - this.descriptor = descriptor; } - public Iterator iterator() + public Iterator iterator() { return new KeyIterator(keyFetcher, fetchOffsets()); } @@ -460,154 +465,59 @@ public class TokenTree TokenInfo o = (TokenInfo) other; return keyFetcher == o.keyFetcher && position == o.position; - } - /** - * Legacy leaf storage format (used for reading data formats before AC): - * - * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData] - * - * Many pairs can be encoded into long+int. - * - * Simple entry: offset fits into (int) - * - * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData] - * - * FactoredOffset: a single offset, offset fits into (long)+(int) bits: - * - * [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset] - * - * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into - * (long) and one of them fits into (int)) - * - * [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int] - * - * Otherwise, the rest gets packed into limited-size overflow collision entry - * - * [(short) leaf type][(short) count][(long) token][(int) start index] - */ - private KeyOffsets fetchOffsetsLegacy() + private long[] fetchOffsets() { short info = buffer.getShort(position); // offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset) - int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF; + int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF; // is the it left-most (32-bit) base of the actual offset in the index file - int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES); + int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES); EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK); - KeyOffsets rowOffsets = new KeyOffsets(); switch (type) { case SIMPLE: - rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET); - break; + return new long[] { offsetData }; + case OVERFLOW: - long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES)); + long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens + long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES)); for (int i = 0; i < offsetExtra; i++) - { - long offset = buffer.getLong(offsetPos + (i * Long.BYTES));; - rowOffsets.put(offset, KeyOffsets.NO_OFFSET); - } - break; - case FACTORED: - long offset = (((long) offsetData) << Short.SIZE) + offsetExtra; - rowOffsets.put(offset, KeyOffsets.NO_OFFSET); - break; - case PACKED: - rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET); - rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET); - default: - throw new IllegalStateException("Unknown entry type: " + type); - } - return rowOffsets; - } + offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES)); - private KeyOffsets fetchOffsets() - { - if (descriptor.version.compareTo(Version.ac) < 0) - return fetchOffsetsLegacy(); - - short info = buffer.getShort(position); - EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK); + return offsets; - KeyOffsets rowOffsets = new KeyOffsets(); - long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES; - switch (type) - { - case SIMPLE: - long partitionOffset = buffer.getLong(baseOffset); - long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES); + case FACTORED: + return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra }; - rowOffsets.put(partitionOffset, rowOffset); - break; case PACKED: - long partitionOffset1 = buffer.getInt(baseOffset); - long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES); - - long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES); - long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES); + return new long[] { offsetExtra, offsetData }; - rowOffsets.put(partitionOffset1, rowOffset1); - rowOffsets.put(partitionOffset2, rowOffset2); - break; - case OVERFLOW: - long collisionOffset = buffer.getLong(baseOffset); - long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES); - - // Skip leaves and collision offsets that do not belong to current token - long offsetPos = buffer.position() + leafSize * LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES; - - for (int i = 0; i < count; i++) - { - long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES); - long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES); - - rowOffsets.put(currentPartitionOffset, currentRowOffset); - } - break; default: throw new IllegalStateException("Unknown entry type: " + type); } - - - return rowOffsets; } } - private static class KeyIterator extends AbstractIterator + private static class KeyIterator extends AbstractIterator { - private final KeyFetcher keyFetcher; - private final Iterator> offsets; - private long currentPatitionKey; - private PrimitiveIterator.OfLong currentCursor = null; + private final Function keyFetcher; + private final long[] offsets; + private int index = 0; - public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets) + public KeyIterator(Function keyFetcher, long[] offsets) { this.keyFetcher = keyFetcher; - this.offsets = offsets.iterator(); + this.offsets = offsets; } - public RowKey computeNext() + public DecoratedKey computeNext() { - if (currentCursor != null && currentCursor.hasNext()) - { - return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong()); - } - else if (offsets.hasNext()) - { - LongObjectCursor cursor = offsets.next(); - currentPatitionKey = cursor.key; - currentCursor = LongStream.of(cursor.value).iterator(); - - return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong()); - } - else - { - return endOfData(); - } + return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData(); } } -} +} \ No newline at end of file