From: xedin@apache.org
To: commits@cassandra.apache.org
Date: Tue, 06 Sep 2016 05:19:18 -0000
In-Reply-To: <5845c3e825ff40f8a3d185c1e6164cf4@git.apache.org>
References: <5845c3e825ff40f8a3d185c1e6164cf4@git.apache.org>
Subject: [2/3] cassandra git commit: Add row offset support to SASI

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index 2210964..07804d6 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -20,28 +20,62 @@ package org.apache.cassandra.index.sasi.disk;
 import java.io.IOException;
 import java.util.*;

-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.obs.BitUtil;

-import com.carrotsearch.hppc.LongSet;
-
-public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
+public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>>
 {
-    int BLOCK_BYTES = 4096;
-    int BLOCK_HEADER_BYTES = 64;
-    int OVERFLOW_TRAILER_BYTES = 64;
-    int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
-    int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
-    long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
-    byte LAST_LEAF_SHIFT = 1;
-    byte SHARED_HEADER_BYTES = 19;
-    byte ENTRY_TYPE_MASK = 0x03;
-    short AB_MAGIC = 0x5A51;
+    final static int BLOCK_BYTES = 4096;
+
+    final static int LEAF_ENTRY_TYPE_BYTES = Short.BYTES;
+    final static int TOKEN_OFFSET_BYTES = LEAF_ENTRY_TYPE_BYTES;
+    final static int LEAF_PARTITON_OFFSET_BYTES = Long.BYTES;
+    final static int LEAF_ROW_OFFSET_BYTES = Long.BYTES;
+
+    final static int LEAF_PARTITON_OFFSET_PACKED_BYTES = Integer.BYTES;
+    final static int LEAF_ROW_OFFSET_PACKED_BYTES = Integer.BYTES;
+    final static int COLLISION_ENTRY_BYTES = LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+
+    final static int HEADER_INFO_BYTE_BYTES = Byte.BYTES;
+    final static int HEADER_TOKEN_COUNT_BYTES = Short.BYTES;
+
+    final static int ROOT_HEADER_MAGIC_SIZE = Short.BYTES;
+    final static int ROOT_HEADER_TOKEN_COUNT_SIZE = Long.BYTES;
+
+    // Partitioner token size in bytes
+    final static int TOKEN_BYTES = Long.BYTES;
+
+    // Leaf entry size in bytes, see {@class SimpleLeafEntry} for a full description
+    final static int LEAF_ENTRY_BYTES = LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES + LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+    // Shared header size in bytes, see {@class AbstractTreeBuilder$Header} for a full description
+    final static int SHARED_HEADER_BYTES = HEADER_INFO_BYTE_BYTES + HEADER_TOKEN_COUNT_BYTES + 2 * TOKEN_BYTES;
+    // Block header size in bytes, see {@class AbstractTreeBuilder$RootHeader}
+    final static int BLOCK_HEADER_BYTES = BitUtil.nextHighestPowerOfTwo(SHARED_HEADER_BYTES + ROOT_HEADER_MAGIC_SIZE + ROOT_HEADER_TOKEN_COUNT_SIZE + 2 * TOKEN_BYTES);
+
+    // Overflow trailer capacity is currently 8 overflow items. Each overflow item consists of two longs.
+    final static int OVERFLOW_TRAILER_CAPACITY = 8;
+    final static int OVERFLOW_TRAILER_BYTES = OVERFLOW_TRAILER_CAPACITY * COLLISION_ENTRY_BYTES;
+    final static int TOKENS_PER_BLOCK = (TokenTreeBuilder.BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / LEAF_ENTRY_BYTES;
+
+    final static int LEGACY_LEAF_ENTRY_BYTES = Short.BYTES + Short.BYTES + TOKEN_BYTES + Integer.BYTES;
+    final static int LEGACY_TOKEN_OFFSET_BYTES = 2 * Short.BYTES;
+    final static byte LAST_LEAF_SHIFT = 1;
+
+    /**
+     * {@code Header} size in bytes.
+     */
+    final byte ENTRY_TYPE_MASK = 0x03;
+    final short AB_MAGIC = 0x5A51;
+    final short AC_MAGIC = 0x7C63;

     // note: ordinal positions are used here, do not change order
     enum EntryType
     {
-        SIMPLE, FACTORED, PACKED, OVERFLOW;
+        SIMPLE,
+        FACTORED,
+        PACKED,
+        OVERFLOW;

         public static EntryType of(int ordinal)
         {
@@ -61,9 +95,9 @@ public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
         }
     }

-    void add(Long token, long keyPosition);
-    void add(SortedMap<Long, LongSet> data);
-    void add(Iterator<Pair<Long, LongSet>> data);
+    void add(Long token, long partitionOffset, long rowOffset);
+    void add(SortedMap<Long, KeyOffsets> data);
+    void add(Iterator<Pair<Long, KeyOffsets>> data);
     void add(TokenTreeBuilder ttb);

     boolean isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
index e55a806..a7b22f3 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -19,14 +19,14 @@ package org.apache.cassandra.index.sasi.memory;
 import java.nio.ByteBuffer;

-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.index.sasi.utils.TypeUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.*;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class IndexMemtable
         this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
     }

-    public long index(DecoratedKey key, ByteBuffer value)
+    public long index(RowKey key, ByteBuffer value)
     {
         if (value == null || value.remaining() == 0)
             return 0;
@@ -55,7 +55,7 @@ public class IndexMemtable
         {
             logger.error("Can't add column {} to index for key: {}, value size {}, validator: {}.",
                          index.columnIndex.getColumnName(),
-                         index.columnIndex.keyValidator().getString(key.getKey()),
+                         index.columnIndex.keyValidator().getString(key.decoratedKey.getKey()),
                          FBUtilities.prettyPrintMemory(size),
                          validator);
             return 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index a2f2c0e..b4365dc 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -18,28 +18,27 @@ package org.apache.cassandra.index.sasi.memory;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;

-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.utils.AbstractIterator;
 import
org.apache.cassandra.index.sasi.utils.CombinedValue; import org.apache.cassandra.index.sasi.utils.RangeIterator; -import com.carrotsearch.hppc.LongOpenHashSet; -import com.carrotsearch.hppc.LongSet; import com.google.common.collect.PeekingIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KeyRangeIterator extends RangeIterator { private final DKIterator iterator; - public KeyRangeIterator(ConcurrentSkipListSet keys) + public KeyRangeIterator(ConcurrentSkipListSet keys) { - super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size()); + super((Long) keys.first().decoratedKey.getToken().getTokenValue(), (Long) keys.last().decoratedKey.getToken().getTokenValue(), keys.size()); this.iterator = new DKIterator(keys.iterator()); } @@ -52,8 +51,8 @@ public class KeyRangeIterator extends RangeIterator { while (iterator.hasNext()) { - DecoratedKey key = iterator.peek(); - if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0) + RowKey key = iterator.peek(); + if (Long.compare((Long) key.decoratedKey.getToken().getTokenValue(), nextToken) >= 0) break; // consume smaller key @@ -64,16 +63,16 @@ public class KeyRangeIterator extends RangeIterator public void close() throws IOException {} - private static class DKIterator extends AbstractIterator implements PeekingIterator + private static class DKIterator extends AbstractIterator implements PeekingIterator { - private final Iterator keys; + private final Iterator keys; - public DKIterator(Iterator keys) + public DKIterator(Iterator keys) { this.keys = keys; } - protected DecoratedKey computeNext() + protected RowKey computeNext() { return keys.hasNext() ? keys.next() : endOfData(); } @@ -81,25 +80,21 @@ public class KeyRangeIterator extends RangeIterator private static class DKToken extends Token { - private final SortedSet keys; + private final SortedSet keys; - public DKToken(final DecoratedKey key) + public DKToken(RowKey key) { - super((long) key.getToken().getTokenValue()); + super((Long) key.decoratedKey.getToken().getTokenValue()); - keys = new TreeSet(DecoratedKey.comparator) + keys = new TreeSet(RowKey.COMPARATOR) {{ add(key); }}; } - public LongSet getOffsets() + public KeyOffsets getOffsets() { - LongSet offsets = new LongOpenHashSet(4); - for (DecoratedKey key : keys) - offsets.add((long) key.getToken().getTokenValue()); - - return offsets; + throw new IllegalStateException("DecoratedKey tokens are used in memtables and do not have on-disk offsets"); } public void merge(CombinedValue other) @@ -116,14 +111,14 @@ public class KeyRangeIterator extends RangeIterator } else { - for (DecoratedKey key : o) + for (RowKey key : o) keys.add(key); } } - public Iterator iterator() + public Iterator iterator() { return keys.iterator(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java index cc1eb3f..bfba4cb 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java @@ -19,8 +19,8 @@ package org.apache.cassandra.index.sasi.memory; import java.nio.ByteBuffer; -import org.apache.cassandra.db.DecoratedKey; import 
org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeIterator; @@ -37,7 +37,7 @@ public abstract class MemIndex this.columnIndex = columnIndex; } - public abstract long add(DecoratedKey key, ByteBuffer value); + public abstract long add(RowKey key, ByteBuffer value); public abstract RangeIterator search(Expression expression); public static MemIndex forColumn(AbstractType keyValidator, ColumnIndex columnIndex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java index 69b57d0..9c3562a 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java @@ -22,8 +22,8 @@ import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; @@ -34,7 +34,7 @@ public class SkipListMemIndex extends MemIndex { public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM - private final ConcurrentSkipListMap> index; + private final ConcurrentSkipListMap> index; public SkipListMemIndex(AbstractType keyValidator, ColumnIndex columnIndex) { @@ -42,14 +42,14 @@ public class SkipListMemIndex extends MemIndex index = new ConcurrentSkipListMap<>(columnIndex.getValidator()); } - public long add(DecoratedKey key, ByteBuffer value) + public long add(RowKey key, ByteBuffer value) { long overhead = CSLM_OVERHEAD; // DKs are shared - ConcurrentSkipListSet keys = index.get(value); + ConcurrentSkipListSet keys = index.get(value); if (keys == null) { - ConcurrentSkipListSet newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator); + ConcurrentSkipListSet newKeys = new ConcurrentSkipListSet<>(); keys = index.putIfAbsent(value, newKeys); if (keys == null) { @@ -68,7 +68,7 @@ public class SkipListMemIndex extends MemIndex ByteBuffer min = expression.lower == null ? null : expression.lower.value; ByteBuffer max = expression.upper == null ? 
null : expression.upper.value; - SortedMap> search; + SortedMap> search; if (min == null && max == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java index ca60ac5..e1c273d 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java @@ -23,9 +23,8 @@ import java.util.List; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.conf.ColumnIndex; -import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.plan.Expression.Op; @@ -38,7 +37,7 @@ import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree; import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory; import com.googlecode.concurrenttrees.radix.node.Node; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.*; import org.slf4j.Logger; @@ -71,7 +70,7 @@ public class TrieMemIndex extends MemIndex } } - public long add(DecoratedKey key, ByteBuffer value) + public long add(RowKey key, ByteBuffer value) { AbstractAnalyzer analyzer = columnIndex.getAnalyzer(); analyzer.reset(value.duplicate()); @@ -85,7 +84,7 @@ public class TrieMemIndex extends MemIndex { logger.info("Can't add term of column {} to index for key: {}, term size {}, max allowed size {}, use analyzed = true (if not yet set) for that column.", columnIndex.getColumnName(), - keyValidator.getString(key.getKey()), + keyValidator.getString(key.decoratedKey.getKey()), FBUtilities.prettyPrintMemory(term.remaining()), FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE)); continue; @@ -113,13 +112,13 @@ public class TrieMemIndex extends MemIndex definition = column; } - public long add(String value, DecoratedKey key) + public long add(String value, RowKey key) { long overhead = CSLM_OVERHEAD; - ConcurrentSkipListSet keys = get(value); + ConcurrentSkipListSet keys = get(value); if (keys == null) { - ConcurrentSkipListSet newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator); + ConcurrentSkipListSet newKeys = new ConcurrentSkipListSet<>(); keys = putIfAbsent(value, newKeys); if (keys == null) { @@ -141,10 +140,10 @@ public class TrieMemIndex extends MemIndex { ByteBuffer prefix = expression.lower == null ? 
null : expression.lower.value; - Iterable> search = search(expression.getOp(), definition.cellValueType().getString(prefix)); + Iterable> search = search(expression.getOp(), definition.cellValueType().getString(prefix)); RangeUnionIterator.Builder builder = RangeUnionIterator.builder(); - for (ConcurrentSkipListSet keys : search) + for (ConcurrentSkipListSet keys : search) { if (!keys.isEmpty()) builder.add(new KeyRangeIterator(keys)); @@ -153,14 +152,14 @@ public class TrieMemIndex extends MemIndex return builder.build(); } - protected abstract ConcurrentSkipListSet get(String value); - protected abstract Iterable> search(Op operator, String value); - protected abstract ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet key); + protected abstract ConcurrentSkipListSet get(String value); + protected abstract Iterable> search(Op operator, String value); + protected abstract ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet key); } protected static class ConcurrentPrefixTrie extends ConcurrentTrie { - private final ConcurrentRadixTree> trie; + private final ConcurrentRadixTree> trie; private ConcurrentPrefixTrie(ColumnDefinition column) { @@ -168,23 +167,23 @@ public class TrieMemIndex extends MemIndex trie = new ConcurrentRadixTree<>(NODE_FACTORY); } - public ConcurrentSkipListSet get(String value) + public ConcurrentSkipListSet get(String value) { return trie.getValueForExactKey(value); } - public ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet newKeys) + public ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet newKeys) { return trie.putIfAbsent(value, newKeys); } - public Iterable> search(Op operator, String value) + public Iterable> search(Op operator, String value) { switch (operator) { case EQ: case MATCH: - ConcurrentSkipListSet keys = trie.getValueForExactKey(value); + ConcurrentSkipListSet keys = trie.getValueForExactKey(value); return keys == null ? Collections.emptyList() : Collections.singletonList(keys); case PREFIX: @@ -198,7 +197,7 @@ public class TrieMemIndex extends MemIndex protected static class ConcurrentSuffixTrie extends ConcurrentTrie { - private final ConcurrentSuffixTree> trie; + private final ConcurrentSuffixTree> trie; private ConcurrentSuffixTrie(ColumnDefinition column) { @@ -206,23 +205,23 @@ public class TrieMemIndex extends MemIndex trie = new ConcurrentSuffixTree<>(NODE_FACTORY); } - public ConcurrentSkipListSet get(String value) + public ConcurrentSkipListSet get(String value) { return trie.getValueForExactKey(value); } - public ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet newKeys) + public ConcurrentSkipListSet putIfAbsent(String value, ConcurrentSkipListSet newKeys) { return trie.putIfAbsent(value, newKeys); } - public Iterable> search(Op operator, String value) + public Iterable> search(Op operator, String value) { switch (operator) { case EQ: case MATCH: - ConcurrentSkipListSet keys = trie.getValueForExactKey(value); + ConcurrentSkipListSet keys = trie.getValueForExactKey(value); return keys == null ? 
Collections.emptyList() : Collections.singletonList(keys); case SUFFIX: http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java index fa1181f..af4e249 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java @@ -17,15 +17,18 @@ */ package org.apache.cassandra.index.sasi.plan; +import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.DataLimits; -import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sasi.SASIIndex; @@ -42,10 +45,12 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.*; public class QueryController { + private static final Logger logger = LoggerFactory.getLogger(QueryController.class); + private final long executionQuota; private final long executionStart; @@ -94,22 +99,26 @@ public class QueryController return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null; } - - public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController) + public UnfilteredRowIterator getPartition(DecoratedKey key, NavigableSet clusterings, ReadExecutionController executionController) { if (key == null) throw new NullPointerException(); + try { - SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(), - cfs.metadata, + ClusteringIndexFilter filter; + if (clusterings == null) + filter = new ClusteringIndexSliceFilter(Slices.ALL, false); + else + filter = new ClusteringIndexNamesFilter(clusterings, false); + + SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata, command.nowInSec(), command.columnFilter(), - command.rowFilter().withoutExpressions(), + command.rowFilter(), DataLimits.NONE, key, - command.clusteringIndexFilter(key)); - + filter); return partition.queryMemtableAndDisk(cfs, executionController); } finally @@ -135,20 +144,24 @@ public class QueryController RangeIterator.Builder builder = op == OperationType.OR ? 
RangeUnionIterator.builder() - : RangeIntersectionIterator.builder(); + : RangeIntersectionIterator.builder(); List> perIndexUnions = new ArrayList<>(); for (Map.Entry> e : getView(op, expressions).entrySet()) { - @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes - RangeIterator index = TermIterator.build(e.getKey(), e.getValue()); - - if (index == null) - continue; + try (RangeIterator index = TermIterator.build(e.getKey(), e.getValue())) + { + if (index == null) + continue; - builder.add(index); - perIndexUnions.add(index); + builder.add(index); + perIndexUnions.add(index); + } + catch (IOException ex) + { + logger.error("Failed to release index: ", ex); + } } resources.put(expressions, perIndexUnions); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java index 4410756..ccb369c 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java @@ -19,16 +19,19 @@ package org.apache.cassandra.index.sasi.plan; import java.util.*; -import org.apache.cassandra.config.CFMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.Token; -import org.apache.cassandra.index.sasi.plan.Operation.OperationType; -import org.apache.cassandra.exceptions.RequestTimeoutException; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.plan.Operation.*; +import org.apache.cassandra.utils.btree.*; public class QueryPlan { @@ -68,14 +71,16 @@ public class QueryPlan return new ResultIterator(analyze(), controller, executionController); } - private static class ResultIterator extends AbstractIterator implements UnfilteredPartitionIterator + private static class ResultIterator implements UnfilteredPartitionIterator { private final AbstractBounds keyRange; private final Operation operationTree; private final QueryController controller; private final ReadExecutionController executionController; - private Iterator currentKeys = null; + private Iterator currentKeys = null; + private UnfilteredRowIterator nextPartition = null; + private DecoratedKey lastPartitionKey = null; public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController) { @@ -87,53 +92,152 @@ public class QueryPlan operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue()); } - protected UnfilteredRowIterator computeNext() + public boolean hasNext() + { + return prepareNext(); + } + + public UnfilteredRowIterator next() + { + if (nextPartition == null) + prepareNext(); + + UnfilteredRowIterator toReturn = nextPartition; + nextPartition = null; + return toReturn; + } + + private boolean prepareNext() { if (operationTree == null) - return endOfData(); + return false; + + if 
(nextPartition != null) + nextPartition.close(); for (;;) { if (currentKeys == null || !currentKeys.hasNext()) { if (!operationTree.hasNext()) - return endOfData(); + return false; Token token = operationTree.next(); currentKeys = token.iterator(); } - while (currentKeys.hasNext()) + CFMetaData metadata = controller.metadata(); + BTreeSet.Builder clusterings = BTreeSet.builder(metadata.comparator); + // results have static clustering, the whole partition has to be read + boolean fetchWholePartition = false; + + while (true) { - DecoratedKey key = currentKeys.next(); + if (!currentKeys.hasNext()) + { + // No more keys for this token. + // If no clusterings were collected yet, exit this inner loop so the operation + // tree iterator can move on to the next token. + // If some clusterings were collected, build an iterator for those rows + // and return. + if ((clusterings.isEmpty() && !fetchWholePartition) || lastPartitionKey == null) + break; + + UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition); + // Prepare for next partition, reset partition key and clusterings + lastPartitionKey = null; + clusterings = BTreeSet.builder(metadata.comparator); + + if (partition.isEmpty()) + { + partition.close(); + continue; + } + + nextPartition = partition; + return true; + } + + RowKey fullKey = currentKeys.next(); + DecoratedKey key = fullKey.decoratedKey; if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0) - return endOfData(); + return false; - try (UnfilteredRowIterator partition = controller.getPartition(key, executionController)) + if (lastPartitionKey != null && metadata.getKeyValidator().compare(lastPartitionKey.getKey(), key.getKey()) != 0) { - Row staticRow = partition.staticRow(); - List clusters = new ArrayList<>(); + UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition); - while (partition.hasNext()) + if (partition.isEmpty()) + partition.close(); + else { - Unfiltered row = partition.next(); - if (operationTree.satisfiedBy(row, staticRow, true)) - clusters.add(row); + nextPartition = partition; + return true; } - - if (!clusters.isEmpty()) - return new PartitionIterator(partition, clusters); } + + lastPartitionKey = key; + + // We fetch whole partition for versions before AC and in case static column index is queried in AC + if (fullKey.clustering == null || fullKey.clustering.clustering().kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING) + fetchWholePartition = true; + else + clusterings.add(fullKey.clustering); + } } } + private UnfilteredRowIterator fetchPartition(DecoratedKey key, NavigableSet clusterings, boolean fetchWholePartition) + { + if (fetchWholePartition) + clusterings = null; + + try (UnfilteredRowIterator partition = controller.getPartition(key, clusterings, executionController)) + { + Row staticRow = partition.staticRow(); + List clusters = new ArrayList<>(); + + while (partition.hasNext()) + { + Unfiltered row = partition.next(); + if (operationTree.satisfiedBy(row, staticRow, true)) + clusters.add(row); + } + + if (!clusters.isEmpty()) + return new PartitionIterator(partition, clusters); + else + return UnfilteredRowIterators.noRowsIterator(partition.metadata(), + partition.partitionKey(), + Rows.EMPTY_STATIC_ROW, + partition.partitionLevelDeletion(), + partition.isReverseOrder()); + } + } + + public void close() + { + if (nextPartition != null) + nextPartition.close(); + } + + public boolean isForThrift() + { + return 
controller.isForThrift(); + } + + public CFMetaData metadata() + { + return controller.metadata(); + } + private static class PartitionIterator extends AbstractUnfilteredRowIterator { private final Iterator rows; - public PartitionIterator(UnfilteredRowIterator partition, Collection content) + public PartitionIterator(UnfilteredRowIterator partition, Collection filteredRows) { super(partition.metadata(), partition.partitionKey(), @@ -143,7 +247,7 @@ public class QueryPlan partition.isReverseOrder(), partition.stats()); - rows = content.iterator(); + rows = filteredRows.iterator(); } @Override @@ -152,21 +256,5 @@ public class QueryPlan return rows.hasNext() ? rows.next() : endOfData(); } } - - public boolean isForThrift() - { - return controller.isForThrift(); - } - - public CFMetaData metadata() - { - return controller.metadata(); - } - - public void close() - { - FileUtils.closeQuietly(operationTree); - controller.finish(); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java index f0b6bac..35898aa 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java @@ -46,6 +46,11 @@ public interface SSTableFlushObserver * * @param unfilteredCluster The unfiltered cluster being added to SSTable. */ + default void nextUnfilteredCluster(Unfiltered unfilteredCluster, long position) + { + nextUnfilteredCluster(unfilteredCluster); + } + void nextUnfilteredCluster(Unfiltered unfilteredCluster); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 87d2a6e..8442ed7 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -50,8 +50,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.EncodingStats; -import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -63,6 +62,7 @@ import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.net.*; import org.apache.cassandra.schema.CachingParams; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.service.ActiveRepairService; @@ -1780,6 +1780,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted o.startPartition(key, iwriter.indexFile.position())); + observers.forEach((o) -> { + o.startPartition(key, iwriter.indexFile.position()); + }); //Reuse the writer for each row 
columnIndexWriter.reset(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/utils/obs/BitUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/BitUtil.java b/src/java/org/apache/cassandra/utils/obs/BitUtil.java index e04de2b..c438d1b 100644 --- a/src/java/org/apache/cassandra/utils/obs/BitUtil.java +++ b/src/java/org/apache/cassandra/utils/obs/BitUtil.java @@ -20,7 +20,7 @@ package org.apache.cassandra.utils.obs; /** A variety of high efficiency bit twiddling routines. * @lucene.internal */ -final class BitUtil +public final class BitUtil { /** Returns the number of bits set in the long */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/data/legacy-sasi/on-disk-sa-int2.db ---------------------------------------------------------------------- diff --git a/test/data/legacy-sasi/on-disk-sa-int2.db b/test/data/legacy-sasi/on-disk-sa-int2.db new file mode 100644 index 0000000..71f662f Binary files /dev/null and b/test/data/legacy-sasi/on-disk-sa-int2.db differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java index 0b4e9e2..fc5afac 100644 --- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java @@ -76,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import com.google.common.collect.Sets; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; @@ -92,7 +93,7 @@ public class SASIIndexTest PARTITIONER = Murmur3Partitioner.instance; } - private static final String KS_NAME = "sasi"; + private static final String KS_NAME = "sasi_index_test"; private static final String CF_NAME = "test_cf"; private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1"; private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2"; @@ -448,9 +449,15 @@ public class SASIIndexTest if (forceFlush) store.forceBlockingFlush(); - final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"); - Assert.assertNotNull(results); - Assert.assertEquals(3, results.size()); + CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"), + CQLTester.row(UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Lady Gaga", "Poker Face"), + CQLTester.row(UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Lady Pank", "Zamki na piasku"), + CQLTester.row(UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Lady Pank", "Koncertowa")); + + CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT artist, title FROM %s.%s WHERE artist LIKE 'lady%%'"), + CQLTester.row("Lady Gaga", "Poker Face"), + CQLTester.row("Lady Pank", "Zamki na piasku"), + CQLTester.row("Lady Pank", "Koncertowa")); } @Test @@ -664,7 +671,7 @@ public class SASIIndexTest add("key21"); }}; - Assert.assertEquals(expected, convert(uniqueKeys)); + Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); // now let's test a single equals condition @@ -690,7 +697,7 @@ public class 
SASIIndexTest add("key21"); }}; - Assert.assertEquals(expected, convert(uniqueKeys)); + Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); // now let's test something which is smaller than a single page uniqueKeys = getPaged(store, 4, @@ -704,7 +711,7 @@ public class SASIIndexTest add("key07"); }}; - Assert.assertEquals(expected, convert(uniqueKeys)); + Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); // the same but with the page size of 2 to test minimal pagination windows @@ -712,7 +719,7 @@ public class SASIIndexTest buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")), buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36))); - Assert.assertEquals(expected, convert(uniqueKeys)); + Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); // and last but not least, test age range query with pagination uniqueKeys = getPaged(store, 4, @@ -736,7 +743,7 @@ public class SASIIndexTest add("key21"); }}; - Assert.assertEquals(expected, convert(uniqueKeys)); + Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys))); Set rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java new file mode 100644 index 0000000..21ef070 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.index.sasi.disk; + +import org.junit.Assert; +import org.junit.Test; + +public class KeyOffsetsTest +{ + @Test + public void testDuplicates() + { + KeyOffsets offsets = new KeyOffsets(); + long[] arr = new long[]{ 1, 2, 3, 4, 5, 6 }; + offsets.put(1, arr); + Assert.assertArrayEquals(offsets.get(1), arr); + offsets.put(1, arr); + Assert.assertArrayEquals(offsets.get(1), arr); + for (long l : arr) + offsets.put(1, l); + Assert.assertArrayEquals(offsets.get(1), arr); + + for (long l : arr) + offsets.put(2, l); + Assert.assertArrayEquals(offsets.get(2), arr); + offsets.put(2, arr); + Assert.assertArrayEquals(offsets.get(2), arr); + offsets.put(2, arr); + Assert.assertArrayEquals(offsets.get(2), arr); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java index 10dc7a8..b56cb4e 100644 --- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java @@ -24,13 +24,16 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.cassandra.config.DatabaseDescriptor; +import com.carrotsearch.hppc.cursors.LongObjectCursor; import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.index.sasi.plan.Expression; import org.apache.cassandra.index.sasi.utils.CombinedTerm; import org.apache.cassandra.index.sasi.utils.CombinedTermIterator; +import org.apache.cassandra.index.sasi.utils.KeyConverter; import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.db.marshal.AbstractType; @@ -38,13 +41,8 @@ import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.utils.MurmurHash; import org.apache.cassandra.utils.Pair; -import com.carrotsearch.hppc.LongSet; -import com.carrotsearch.hppc.cursors.LongCursor; - -import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -87,7 +85,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); // first check if we can find exact matches for (Map.Entry e : data.entrySet()) @@ -95,11 +93,13 @@ public class OnDiskIndexTest if (UTF8Type.instance.getString(e.getKey()).equals("cat")) continue; // cat is embedded into scat, we'll test it in next section - Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey())))); + Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), + convert(e.getValue()), 
+ convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey())))); } // check that cat returns positions for scat & cat - Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat")))); + Assert.assertEquals(convert(1L, 4L), convert(onDisk.search(expressionFor("cat")))); // random suffix queries Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar")))); @@ -143,7 +143,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); for (Map.Entry e : data.entrySet()) { @@ -224,14 +224,14 @@ public class OnDiskIndexTest OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); for (int i = 0; i < iterCheckNums.size(); i++) - iterTest.add(iterCheckNums.get(i), keyAt((long) i), i); + iterTest.add(iterCheckNums.get(i), keyAt((long) i), i, i + 5); File iterIndex = File.createTempFile("sa-iter", ".db"); iterIndex.deleteOnExit(); iterTest.finish(iterIndex); - onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter()); + onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, KeyConverter.instance); ByteBuffer number = Int32Type.instance.decompose(1); Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false))); @@ -283,7 +283,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz")))); Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a")))); @@ -315,14 +315,14 @@ public class OnDiskIndexTest final int numIterations = 100000; for (long i = 0; i < numIterations; i++) - builder.add(LongType.instance.decompose(start + i), keyAt(i), i); + builder.add(LongType.instance.decompose(start + i), keyAt(i), i, clusteringOffset(i)); File index = File.createTempFile("on-disk-sa-sparse", "db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -343,9 +343,9 @@ public class OnDiskIndexTest if (upperInclusive) upperKey += 1; - Set actual = convert(rows); + Set actual = convert(rows); for (long key = lowerKey; key < upperKey; key++) - Assert.assertTrue("key" + key + " wasn't found", actual.contains(keyAt(key))); + Assert.assertTrue("key" + key + " wasn't found", actual.contains(new RowKey(keyAt(key), ck(clusteringOffset(key)), CLUSTERING_COMPARATOR))); Assert.assertEquals((upperKey - lowerKey), actual.size()); } @@ -353,7 +353,7 @@ public class OnDiskIndexTest // let's also explicitly test whole range search RangeIterator rows = onDisk.search(expressionFor(start, true, start + numIterations, true)); - Set actual = convert(rows); + Set actual = convert(rows); Assert.assertEquals(numIterations, actual.size()); } @@ -380,7 +380,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, 
KeyConverter.instance); // test whole words first Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel")))); @@ -424,7 +424,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1)))); Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8)))); @@ -439,16 +439,16 @@ public class OnDiskIndexTest final long lower = 0; final long upper = 100000; - OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); + OnDiskIndexBuilder builder = new OnDiskIndexBuilder(LongType.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); for (long i = lower; i <= upper; i++) - builder.add(LongType.instance.decompose(i), keyAt(i), i); + builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); File index = File.createTempFile("on-disk-sa-except-long-ranges", "db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -503,10 +503,10 @@ public class OnDiskIndexTest private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set exclusions, boolean checkCount) { int count = 0; - for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions)))) + for (RowKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions)))) { - String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1]; - Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId))); + long keyId = LongType.instance.compose(key.decoratedKey.getKey()); + Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(keyId)); count++; } @@ -519,40 +519,49 @@ public class OnDiskIndexTest @Test public void testDescriptor() throws Exception { - final Map> data = new HashMap>() + final Map> data = new HashMap>() {{ - put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L)); + put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(clusteringOffset(1L)), CLUSTERING_COMPARATOR) , 1L)); }}; OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); - OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX); - for (Map.Entry> e : data.entrySet()) + for (Map.Entry> e : data.entrySet()) { - DecoratedKey key = e.getValue().left; + DecoratedKey key = e.getValue().left.decoratedKey; Long position = e.getValue().right; - builder1.add(e.getKey(), key, position); - builder2.add(e.getKey(), key, position); + builder1.add(e.getKey(), key, position, clusteringOffset(position)); } File index1 = File.createTempFile("on-disk-sa-int", "db"); - File index2 = File.createTempFile("on-disk-sa-int2", "db"); + index1.deleteOnExit(); - index2.deleteOnExit(); builder1.finish(index1); - builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2); + OnDiskIndex onDisk1 = new 
OnDiskIndex(index1, Int32Type.instance, KeyConverter.instance); + ByteBuffer number = Int32Type.instance.decompose(5); + Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); + Assert.assertEquals(onDisk1.descriptor.version, Descriptor.CURRENT_VERSION); + } - OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter()); - OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter()); - ByteBuffer number = Int32Type.instance.decompose(5); + static final String DATA_DIR = "test/data/legacy-sasi/"; - Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); + @Test + public void testLegacyDescriptor() throws Exception + { + final Map> data = new HashMap>() + {{ + put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(KeyOffsets.NO_OFFSET), CLUSTERING_COMPARATOR) , 1L)); + }}; + + File index2 = new File(DATA_DIR + "on-disk-sa-int2.db"); + OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, KeyConverter.instance); + + ByteBuffer number = Int32Type.instance.decompose(5); Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Operator.EQ, Int32Type.instance, number)))); - Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION); - Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA); + Assert.assertEquals(onDisk2.descriptor.version, Descriptor.VERSION_AA); } @Test @@ -574,7 +583,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance); OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0); Iterator iter = superBlock.iterator(); @@ -595,14 +604,14 @@ public class OnDiskIndexTest { OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE); for (long i = 0; i < 100000; i++) - builder.add(LongType.instance.decompose(i), keyAt(i), i); + builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); File index = File.createTempFile("on-disk-sa-multi-superblock-match", ".db"); index.deleteOnExit(); builder.finish(index); - OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter()); + OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, KeyConverter.instance); testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500); testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456); @@ -617,9 +626,9 @@ public class OnDiskIndexTest } } - public void putAll(SortedMap offsets, TokenTreeBuilder ttb) + public void putAll(SortedMap offsets, TokenTreeBuilder ttb) { - for (Pair entry : ttb) + for (Pair entry : ttb) offsets.put(entry.left, entry.right); } @@ -629,26 +638,26 @@ public class OnDiskIndexTest OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); - TreeMap> expected = new TreeMap<>(); + TreeMap> expected = new TreeMap<>(); for (long i = 0; i <= 100; i++) { - TreeMap offsets = expected.get(i); + TreeMap offsets = expected.get(i); if (offsets == null) 
expected.put(i, (offsets = new TreeMap<>())); - builderA.add(LongType.instance.decompose(i), keyAt(i), i); + builderA.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i)); putAll(offsets, keyBuilder(i)); } for (long i = 50; i < 100; i++) { - TreeMap offsets = expected.get(i); + TreeMap offsets = expected.get(i); if (offsets == null) expected.put(i, (offsets = new TreeMap<>())); long position = 100L + i; - builderB.add(LongType.instance.decompose(i), keyAt(position), position); + builderB.add(LongType.instance.decompose(i), keyAt(position), position, clusteringOffset(position)); putAll(offsets, keyBuilder(100L + i)); } @@ -661,19 +670,19 @@ public class OnDiskIndexTest builderA.finish(indexA); builderB.finish(indexB); - OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter()); - OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter()); + OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, KeyConverter.instance); + OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, KeyConverter.instance); RangeIterator union = OnDiskIndexIterator.union(a, b); - TreeMap> actual = new TreeMap<>(); + TreeMap> actual = new TreeMap<>(); while (union.hasNext()) { CombinedTerm term = union.next(); Long composedTerm = LongType.instance.compose(term.getTerm()); - TreeMap offsets = actual.get(composedTerm); + TreeMap offsets = actual.get(composedTerm); if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); @@ -688,7 +697,7 @@ public class OnDiskIndexTest OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX); combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b)); - OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter()); + OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, KeyConverter.instance); union = OnDiskIndexIterator.union(c); actual.clear(); @@ -698,7 +707,7 @@ public class OnDiskIndexTest Long composedTerm = LongType.instance.compose(term.getTerm()); - TreeMap offsets = actual.get(composedTerm); + TreeMap offsets = actual.get(composedTerm); if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); @@ -738,7 +747,7 @@ public class OnDiskIndexTest builder.finish(index); - OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter()); + OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance); // check that lady% return lady gaga (1) and lady pank (3) but not lady of bells(2) Assert.assertEquals(convert(1, 3), convert(onDisk.search(expressionFor("lady", Operator.LIKE_PREFIX)))); @@ -762,7 +771,7 @@ public class OnDiskIndexTest while (tokens.hasNext()) { Token token = tokens.next(); - Iterator keys = token.iterator(); + Iterator keys = token.iterator(); // each of the values should have exactly a single key Assert.assertTrue(keys.hasNext()); @@ -771,7 +780,7 @@ public class OnDiskIndexTest // and it's last should always smaller than current if (lastToken != null) - Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0); + Assert.assertTrue("last should be less than current", lastToken < token.get()); lastToken = token.get(); keyCount++; @@ -780,61 +789,84 @@ public class OnDiskIndexTest Assert.assertEquals(end - start, keyCount); } - private static DecoratedKey keyAt(long rawKey) + private static DecoratedKey keyAt(long partitionOffset) + { + return 
-    private static DecoratedKey keyAt(long rawKey)
+    private static DecoratedKey keyAt(long partitionOffset)
+    {
+        return KeyConverter.dk(partitionOffset);
+    }
+
+    private static Clustering ck(long rowOffset)
+    {
+        return KeyConverter.ck(rowOffset);
+    }
+
+    private TokenTreeBuilder keyBuilder(long... offsets)
     {
-        ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes());
-        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key);
+        TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
+
+        for (final long pkOffset : offsets)
+        {
+            DecoratedKey k = keyAt(pkOffset);
+            builder.add((Long) k.getToken().getTokenValue(), pkOffset, clusteringOffset(pkOffset));
+        }
+
+        return builder.finish();
     }
 
-    private static TokenTreeBuilder keyBuilder(Long... keys)
+    private static long clusteringOffset(long offset)
+    {
+        return offset + 100;
+    }
+
+    private TokenTreeBuilder keyBuilder(Pair<Long, Long>... offsets)
     {
         TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
 
-        for (final Long key : keys)
+        for (final Pair<Long, Long> key : offsets)
         {
-            DecoratedKey dk = keyAt(key);
-            builder.add((Long) dk.getToken().getTokenValue(), key);
+            DecoratedKey k = keyAt(key.left);
+            builder.add((Long) k.getToken().getTokenValue(), key.left, key.right);
         }
 
         return builder.finish();
     }
 
-    private static Set<DecoratedKey> convert(TokenTreeBuilder offsets)
+    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(BytesType.instance);
+
+    private static Set<RowKey> convert(TokenTreeBuilder offsets)
     {
-        Set<DecoratedKey> result = new HashSet<>();
+        Set<RowKey> result = new HashSet<>();
 
-        Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator();
+        Iterator<Pair<Long, KeyOffsets>> offsetIter = offsets.iterator();
         while (offsetIter.hasNext())
         {
-            LongSet v = offsetIter.next().right;
+            Pair<Long, KeyOffsets> pair = offsetIter.next();
 
-            for (LongCursor offset : v)
-                result.add(keyAt(offset.value));
+            for (LongObjectCursor<long[]> cursor : pair.right)
+                for (long l : cursor.value)
+                    result.add(new RowKey(keyAt(cursor.key), ck(l), CLUSTERING_COMPARATOR));
         }
 
         return result;
     }
 
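The helpers above pin down the test's offset convention: keyAt(i) derives a synthetic partition key from partition offset i, ck(...) a clustering from a row offset, and clusteringOffset(i) = i + 100 ties the two together. A tiny worked example of that arithmetic (the string-keyed form is purely illustrative; the real helpers go through KeyConverter and Murmur3 tokens):

// Illustration of the test convention: for raw offset i, the expected row is
// (partition key derived from i, clustering derived from i + 100).
public final class OffsetConventionSketch
{
    static long clusteringOffset(long offset)
    {
        return offset + 100; // mirrors the helper in the diff above
    }

    static String rowKeyFor(long offset)
    {
        return "key" + offset + "/ck" + clusteringOffset(offset);
    }

    public static void main(String[] args)
    {
        // offset 7 -> partition "key7", clustering offset 107
        System.out.println(rowKeyFor(7)); // key7/ck107
    }
}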
-    private static Set<DecoratedKey> convert(long... keyOffsets)
+    private static Set<RowKey> convert(long... keyOffsets)
     {
-        Set<DecoratedKey> result = new HashSet<>();
-        for (long offset : keyOffsets)
-            result.add(keyAt(offset));
+        Set<RowKey> result = new HashSet<>();
+        for (final long offset : keyOffsets)
+            result.add(new RowKey(keyAt(offset), ck(clusteringOffset(offset)), CLUSTERING_COMPARATOR));
         return result;
     }
 
-    private static Set<DecoratedKey> convert(RangeIterator<Long, Token> results)
+    private static Set<RowKey> convert(RangeIterator<Long, Token> results)
     {
         if (results == null)
             return Collections.emptySet();
 
-        Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator);
+        Set<RowKey> keys = new TreeSet<>();
 
         while (results.hasNext())
-        {
-            for (DecoratedKey key : results.next())
+            for (RowKey key : results.next())
                 keys.add(key);
-        }
 
         return keys;
     }
 
@@ -908,19 +940,11 @@ public class OnDiskIndexTest
 
     private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
     {
-        for (Pair<Long, LongSet> token : tokens)
-        {
-            for (long position : token.right.toArray())
-                builder.add(term, keyAt(position), position);
-        }
-    }
-
-    private static class KeyConverter implements Function<Long, DecoratedKey>
-    {
-        @Override
-        public DecoratedKey apply(Long offset)
+        for (Pair<Long, KeyOffsets> token : tokens)
         {
-            return keyAt(offset);
+            for (LongObjectCursor<long[]> cursor : token.right)
+                for (long clusteringOffset : cursor.value)
+                    builder.add(term, keyAt(cursor.key), cursor.key, clusteringOffset);
         }
     }
 }
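One detail worth calling out in convert(RangeIterator) above: new TreeSet<>(DecoratedKey.comparator) became a bare new TreeSet<>(), which only compiles because RowKey carries a natural ordering, partition position first, then clustering. A simplified Comparable sketch of such an ordering (RowKeySketch with two longs is illustrative; the real RowKey compares a DecoratedKey and a Clustering through a ClusteringComparator):

import java.util.Objects;

// Simplified model of a (partition, clustering) key with the natural ordering
// implied by the bare TreeSet above: partition first, then clustering.
public final class RowKeySketch implements Comparable<RowKeySketch>
{
    final long partitionToken;   // stands in for the decorated key's token
    final long clusteringValue;  // stands in for the clustering prefix

    RowKeySketch(long partitionToken, long clusteringValue)
    {
        this.partitionToken = partitionToken;
        this.clusteringValue = clusteringValue;
    }

    @Override
    public int compareTo(RowKeySketch other)
    {
        int cmp = Long.compare(partitionToken, other.partitionToken);
        return cmp != 0 ? cmp : Long.compare(clusteringValue, other.clusteringValue);
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof RowKeySketch))
            return false;
        RowKeySketch other = (RowKeySketch) o;
        return partitionToken == other.partitionToken && clusteringValue == other.clusteringValue;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(partitionToken, clusteringValue);
    }
}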
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index f19d962..61e4d67 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -22,11 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -35,6 +38,9 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.sasi.KeyFetcher;
 import org.apache.cassandra.index.sasi.SASIIndex;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -70,6 +76,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
                                                      Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
     }
 
+    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
+
     @Test
     public void testPartialIndexWrites() throws Exception
     {
@@ -86,19 +94,20 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
         PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH);
 
-        SortedMap<DecoratedKey, Row> expectedKeys = new TreeMap<>(DecoratedKey.comparator);
+        SortedMap<RowKey, Row> expectedKeys = new TreeMap<>();
 
         for (int i = 0; i < maxKeys; i++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
-            expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
-                             BTreeRow.singleCellRow(Clustering.EMPTY,
+            Clustering clustering = Clustering.make(ByteBufferUtil.bytes(i * 1L));
+            expectedKeys.put(new RowKey(cfs.metadata.partitioner.decorateKey(key), clustering, CLUSTERING_COMPARATOR),
+                             BTreeRow.singleCellRow(clustering,
                                                     BufferCell.live(column, timestamp, Int32Type.instance.decompose(i))));
         }
 
         indexWriter.begin();
 
-        Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
+        Iterator<Map.Entry<RowKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
         long position = 0;
 
         Set<String> segments = new HashSet<>();
@@ -110,10 +119,11 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
             if (!keyIterator.hasNext())
                 break outer;
 
-            Map.Entry<DecoratedKey, Row> key = keyIterator.next();
+            Map.Entry<RowKey, Row> key = keyIterator.next();
 
-            indexWriter.startPartition(key.getKey(), position++);
-            indexWriter.nextUnfilteredCluster(key.getValue());
+            indexWriter.startPartition(key.getKey().decoratedKey, position);
+            indexWriter.nextUnfilteredCluster(key.getValue(), position);
+            position++;
         }
 
         PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
@@ -134,15 +144,12 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         for (String segment : segments)
             Assert.assertFalse(new File(segment).exists());
 
-        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> {
-            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
-            return cfs.metadata.partitioner.decorateKey(key);
-        });
+        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, new FakeKeyFetcher(cfs, keyFormat));
 
         Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0))));
         Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1))));
 
-        Set<DecoratedKey> actualKeys = new HashSet<>();
+        Set<RowKey> actualKeys = new HashSet<>();
         int count = 0;
         for (OnDiskIndex.DataTerm term : index)
         {
@@ -150,7 +157,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
 
             while (tokens.hasNext())
             {
-                for (DecoratedKey key : tokens.next())
+                for (RowKey key : tokens.next())
                     actualKeys.add(key);
             }
 
@@ -158,8 +165,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         }
 
         Assert.assertEquals(expectedKeys.size(), actualKeys.size());
-        for (DecoratedKey key : expectedKeys.keySet())
-            Assert.assertTrue(actualKeys.contains(key));
+        for (RowKey key : expectedKeys.keySet())
+            Assert.assertTrue("Key was not present : " + key, actualKeys.contains(key));
 
         FileUtils.closeQuietly(index);
     }
 
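The indexing loop above exercises the widened flush-observer contract: startPartition(key, position) and nextUnfilteredCluster(row, position) now both receive an explicit sstable position, advanced once per (partition, row) pair. A condensed sketch of that driving pattern (FlushObserverSketch and FeedSketch are hypothetical stand-ins for PerSSTableIndexWriter, which works with real keys and rows):

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical minimal observer mirroring the calls made in the test above.
interface FlushObserverSketch
{
    void startPartition(String partitionKey, long position);
    void nextRow(String row, long position);
}

public final class FeedSketch
{
    // Feed rows in key order, sharing one position between a partition and its row,
    // then advancing it once per pair -- the same pattern as the loop in the test.
    static void feed(Map<String, String> rowsByKey, FlushObserverSketch observer)
    {
        long position = 0;
        for (Map.Entry<String, String> e : rowsByKey.entrySet())
        {
            observer.startPartition(e.getKey(), position);
            observer.nextRow(e.getValue(), position);
            position++;
        }
    }

    public static void main(String[] args)
    {
        Map<String, String> rows = new LinkedHashMap<>();
        rows.put("key000000", "row-0");
        rows.put("key000001", "row-1");
        feed(rows, new FlushObserverSketch()
        {
            public void startPartition(String key, long pos) { System.out.println("partition " + key + " @ " + pos); }
            public void nextRow(String row, long pos)        { System.out.println("  row " + row + " @ " + pos); }
        });
    }
}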
@@ -183,11 +190,14 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         indexWriter.begin();
         indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
 
-        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, Set<Integer>>()
+        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, KeyOffsets>()
         {{
-            put(now, new HashSet<>(Arrays.asList(0, 1)));
-            put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
-            put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9)));
+            put(now, new KeyOffsets() {{ put(0, 0); put(1, 1); }});
+            put(now + 1, new KeyOffsets() {{ put(2, 2); put(3, 3); }});
+            put(now + 2, new KeyOffsets() {{
+                put(4, 4); put(5, 5); put(6, 6);
+                put(7, 7); put(8, 8); put(9, 9);
+            }});
         }});
 
         Callable<Boolean> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false);
@@ -197,15 +207,21 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
         Random random = ThreadLocalRandom.current();
 
+        Supplier<KeyOffsets> offsetSupplier = () -> new KeyOffsets() {{
+            put(random.nextInt(), random.nextInt());
+            put(random.nextInt(), random.nextInt());
+            put(random.nextInt(), random.nextInt());
+        }};
+
         Set<String> segments = new HashSet<>();
         // now let's test multiple correct segments which yield an incorrect final segment
         for (int i = 0; i < 3; i++)
         {
-            populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>()
+            populateSegment(cfs.metadata, index, new HashMap<Long, KeyOffsets>()
             {{
-                put(now, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
-                put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
-                put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
+                put(now, offsetSupplier.get());
+                put(now + 1, offsetSupplier.get());
+                put(now + 2, offsetSupplier.get());
             }});
 
             try
@@ -236,16 +252,56 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Assert.assertFalse(new File(index.outputFile).exists());
     }
 
-    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
+    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, KeyOffsets> data)
     {
-        for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
+        for (Map.Entry<Long, KeyOffsets> value : data.entrySet())
         {
             ByteBuffer term = LongType.instance.decompose(value.getKey());
-            for (Integer keyPos : value.getValue())
+            for (LongObjectCursor<long[]> cursor : value.getValue())
             {
-                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos));
-                index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", cursor.key));
+                for (long rowOffset : cursor.value)
+                {
+                    index.add(term,
+                              metadata.partitioner.decorateKey(key),
+                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1),
+                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+                }
             }
         }
     }
+
+    private final class FakeKeyFetcher implements KeyFetcher
+    {
+        private final ColumnFamilyStore cfs;
+        private final String keyFormat;
+
+        public FakeKeyFetcher(ColumnFamilyStore cfs, String keyFormat)
+        {
+            this.cfs = cfs;
+            this.keyFormat = keyFormat;
+        }
+
+        public DecoratedKey getPartitionKey(long keyPosition)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
+            return cfs.metadata.partitioner.decorateKey(key);
+        }
+
+        public Clustering getClustering(long offset)
+        {
+            return Clustering.make(ByteBufferUtil.bytes(offset));
+        }
+
+        public RowKey getRowKey(long partitionOffset, long rowOffset)
+        {
+            return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), CLUSTERING_COMPARATOR);
+        }
+    }
+
+    public IPartitioner getPartitioner()
+    {
+        return Murmur3Partitioner.instance;
+    }
 }
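A final note on the map literals used throughout these tests: double-brace initialization such as new KeyOffsets() {{ put(0, 0); put(1, 1); }} allocates an anonymous subclass per occurrence, which is harmless in tests but easy to avoid. A plain-Java equivalent, simplified to a single row offset per partition offset (buildOffsets is a hypothetical helper, not in the commit; the real KeyOffsets can hold several row offsets per partition):

import java.util.HashMap;
import java.util.Map;

public final class InitSketch
{
    // Hypothetical helper replacing the double-brace literals above:
    // arguments are (partitionOffset, rowOffset) pairs, mirroring put(0, 0); put(1, 1);
    static Map<Long, Long> buildOffsets(long... pairs)
    {
        if (pairs.length % 2 != 0)
            throw new IllegalArgumentException("expected (partition, row) pairs");
        Map<Long, Long> offsets = new HashMap<>();
        for (int i = 0; i < pairs.length; i += 2)
            offsets.put(pairs[i], pairs[i + 1]);
        return offsets;
    }

    public static void main(String[] args)
    {
        System.out.println(buildOffsets(0, 0, 1, 1)); // {0=0, 1=1}
    }
}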