Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4200818575 for ; Tue, 11 Aug 2015 15:46:33 +0000 (UTC) Received: (qmail 92037 invoked by uid 500); 11 Aug 2015 15:46:33 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 91893 invoked by uid 500); 11 Aug 2015 15:46:32 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 91783 invoked by uid 99); 11 Aug 2015 15:46:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Aug 2015 15:46:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AE844DFFA9; Tue, 11 Aug 2015 15:46:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benedict@apache.org To: commits@cassandra.apache.org Date: Tue, 11 Aug 2015 15:46:33 -0000 Message-Id: <281497a9d35d44eb8d14aeb51e85a684@git.apache.org> In-Reply-To: <97950913fe444609b9a6b3a8744d2c87@git.apache.org> References: <97950913fe444609b9a6b3a8744d2c87@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] cassandra git commit: Improve SerializationHeader response serialization Improve SerializationHeader response serialization Replicas serving requests now retain the columns that were requested, and use these to efficiently encode the subset of columns it responds with. The expectation is that a majority of those requested will be present in the response. For fewer than 64 requested columns, or where all requested are returned, a bitmap of missing columns is sent (i.e. a 0 when all are present), encoded as a vint. Otherwise a count is sent, followed by a sequence of either present or missing columns, whichever is more efficient. patch by benedict; reviewed by ariel for CASSANDRA-9894 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe388d40 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe388d40 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe388d40 Branch: refs/heads/trunk Commit: fe388d40c0b019c55c63c914ef5708f245af6cdd Parents: dea1509 Author: Benedict Elliott Smith Authored: Wed Jul 29 18:51:51 2015 +0100 Committer: Benedict Elliott Smith Committed: Tue Aug 11 17:45:49 2015 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 2 +- src/java/org/apache/cassandra/db/Columns.java | 230 +++++++++++++++- .../org/apache/cassandra/db/ReadCommand.java | 4 +- .../cassandra/db/ReadCommandVerbHandler.java | 2 +- .../org/apache/cassandra/db/ReadResponse.java | 87 ++++-- .../cassandra/db/SerializationHeader.java | 50 +++- .../partitions/ArrayBackedCachedPartition.java | 6 +- .../db/partitions/PartitionUpdate.java | 6 +- .../UnfilteredPartitionIterators.java | 8 +- .../rows/UnfilteredRowIteratorSerializer.java | 25 +- .../apache/cassandra/service/StorageProxy.java | 2 +- .../org/apache/cassandra/db/ColumnsTest.java | 275 +++++++++++++++---- .../cassandra/service/DataResolverTest.java | 2 +- 13 files changed, 582 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 7719587..0db3814 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -106,7 +106,7 @@ public final class CFMetaData private final Map columnMetadata = new ConcurrentHashMap<>(); // not on any hot path private volatile List partitionKeyColumns; // Always of size keyValidator.componentsCount, null padded if necessary private volatile List clusteringColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary - private volatile PartitionColumns partitionColumns; + private volatile PartitionColumns partitionColumns; // Always non-PK, non-clustering columns // For dense tables, this alias the single non-PK column the table contains (since it can only have one). We keep // that as convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next() http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 231b529..c584b4c 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -26,6 +26,7 @@ import java.security.MessageDigest; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.ColumnIdentifier; @@ -295,9 +296,9 @@ public class Columns implements Iterable * * @return an iterator over all the columns of this object. */ - public Iterator iterator() + public BTreeSearchIterator iterator() { - return BTree.iterator(columns); + return BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC); } /** @@ -311,8 +312,13 @@ public class Columns implements Iterable { // In wildcard selection, we want to return all columns in alphabetical order, // irregarding of whether they are complex or not - return Iterators.mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()), - (s, c) -> s.name.compareTo(c.name)); + return Iterators. + mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()), + (s, c) -> + { + assert !s.kind.isPrimaryKeyKind(); + return s.name.bytes.compareTo(c.name.bytes); + }); } /** @@ -420,5 +426,221 @@ public class Columns implements Iterable } return new Columns(builder.build()); } + + /** + * If both ends have a pre-shared superset of the columns we are serializing, we can send them much + * more efficiently. Both ends must provide the identically same set of columns. + */ + public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException + { + /** + * We weight this towards small sets, and sets where the majority of items are present, since + * we expect this to mostly be used for serializing result sets. + * + * For supersets with fewer than 64 columns, we encode a bitmap of *missing* columns, + * which equates to a zero (single byte) when all columns are present, and otherwise + * a positive integer that can typically be vint encoded efficiently. + * + * If we have 64 or more columns, we cannot neatly perform a bitmap encoding, so we just switch + * to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient). + * We indicate this switch by sending our bitmap with every bit set, i.e. -1L + */ + int columnCount = columns.columnCount(); + int supersetCount = superset.columnCount(); + if (columnCount == supersetCount) + { + out.writeUnsignedVInt(0); + } + else if (supersetCount < 64) + { + out.writeUnsignedVInt(encodeBitmap(columns, superset, supersetCount)); + } + else + { + serializeLargeSubset(columns, columnCount, superset, supersetCount, out); + } + } + + public long serializedSubsetSize(Columns columns, Columns superset) + { + int columnCount = columns.columnCount(); + int supersetCount = superset.columnCount(); + if (columnCount == supersetCount) + { + return TypeSizes.sizeofUnsignedVInt(0); + } + else if (supersetCount < 64) + { + return TypeSizes.sizeofUnsignedVInt(encodeBitmap(columns, superset, supersetCount)); + } + else + { + return serializeLargeSubsetSize(columns, columnCount, superset, supersetCount); + } + } + + public Columns deserializeSubset(Columns superset, DataInputPlus in) throws IOException + { + long encoded = in.readUnsignedVInt(); + if (encoded == -1L) + { + return deserializeLargeSubset(in, superset); + } + else if (encoded == 0L) + { + return superset; + } + else + { + BTree.Builder builder = BTree.builder(Comparator.naturalOrder()); + int firstComplexIdx = 0; + for (ColumnDefinition column : superset) + { + if ((encoded & 1) == 0) + { + builder.add(column); + if (column.isSimple()) + ++firstComplexIdx; + } + encoded >>>= 1; + } + return new Columns(builder.build(), firstComplexIdx); + } + } + + // encodes a 1 bit for every *missing* column, on the assumption presence is more common, + // and because this is consistent with encoding 0 to represent all present + private static long encodeBitmap(Columns columns, Columns superset, int supersetCount) + { + long bitmap = 0L; + BTreeSearchIterator iter = superset.iterator(); + // the index we would encounter next if all columns are present + int expectIndex = 0; + for (ColumnDefinition column : columns) + { + if (iter.next(column) == null) + throw new IllegalStateException(); + + int currentIndex = iter.indexOfCurrent(); + int count = currentIndex - expectIndex; + // (1L << count) - 1 gives us count bits set at the bottom of the register + // so << expectIndex moves these bits to start at expectIndex, which is where our missing portion + // begins (assuming count > 0; if not, we're adding 0 bits, so it's a no-op) + bitmap |= ((1L << count) - 1) << expectIndex; + expectIndex = currentIndex + 1; + } + int count = supersetCount - expectIndex; + bitmap |= ((1L << count) - 1) << expectIndex; + return bitmap; + } + + @DontInline + private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException + { + // write flag indicating we're in lengthy mode + out.writeUnsignedVInt(-1L); + out.writeUnsignedVInt(supersetCount - columnCount); + BTreeSearchIterator iter = superset.iterator(); + if (columnCount < supersetCount / 2) + { + // write present columns + for (ColumnDefinition column : columns) + { + if (iter.next(column) == null) + throw new IllegalStateException(); + out.writeUnsignedVInt(iter.indexOfCurrent()); + } + } + else + { + // write missing columns + int prev = -1; + for (ColumnDefinition column : columns) + { + if (iter.next(column) == null) + throw new IllegalStateException(); + int cur = iter.indexOfCurrent(); + while (++prev != cur) + out.writeUnsignedVInt(prev); + } + while (++prev != supersetCount) + out.writeUnsignedVInt(prev); + } + } + + @DontInline + private Columns deserializeLargeSubset(DataInputPlus in, Columns superset) throws IOException + { + int supersetCount = superset.columnCount(); + int delta = (int) in.readUnsignedVInt(); + int columnCount = supersetCount - delta; + + BTree.Builder builder = BTree.builder(Comparator.naturalOrder()); + if (columnCount < supersetCount / 2) + { + for (int i = 0 ; i < columnCount ; i++) + { + int idx = (int) in.readUnsignedVInt(); + builder.add(BTree.findByIndex(superset.columns, idx)); + } + } + else + { + Iterator iter = superset.iterator(); + int idx = 0; + int skipped = 0; + while (true) + { + int nextMissingIndex = skipped < delta ? (int)in.readUnsignedVInt() : supersetCount; + while (idx < nextMissingIndex) + { + ColumnDefinition def = iter.next(); + builder.add(def); + idx++; + } + if (idx == supersetCount) + break; + iter.next(); + idx++; + skipped++; + } + } + return new Columns(builder.build()); + } + + @DontInline + private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount) + { + // write flag indicating we're in lengthy mode + int size = TypeSizes.sizeofUnsignedVInt(-1L) + TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount); + BTreeSearchIterator iter = superset.iterator(); + if (columnCount < supersetCount / 2) + { + // write present columns + for (ColumnDefinition column : columns) + { + if (iter.next(column) == null) + throw new IllegalStateException(); + size += TypeSizes.sizeofUnsignedVInt(iter.indexOfCurrent()); + } + } + else + { + // write missing columns + int prev = -1; + for (ColumnDefinition column : columns) + { + if (iter.next(column) == null) + throw new IllegalStateException(); + int cur = iter.indexOfCurrent(); + while (++prev != cur) + size += TypeSizes.sizeofUnsignedVInt(prev); + } + while (++prev != supersetCount) + size += TypeSizes.sizeofUnsignedVInt(prev); + } + return size; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 5c40492..4830124 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -251,11 +251,11 @@ public abstract class ReadCommand implements ReadQuery protected abstract int oldestUnrepairedTombstone(); - public ReadResponse createResponse(UnfilteredPartitionIterator iterator) + public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection) { return isDigestQuery() ? ReadResponse.createDigestResponse(iterator) - : ReadResponse.createDataResponse(iterator); + : ReadResponse.createDataResponse(iterator, selection); } protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 9cde8dc..72a6fa8 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler ReadResponse response; try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup)) { - response = command.createResponse(iterator); + response = command.createResponse(iterator, command.columnFilter()); } MessageOut reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index b3cc725..6f418f9 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -24,8 +24,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; @@ -34,7 +37,6 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -52,9 +54,15 @@ public abstract class ReadResponse this.metadata = metadata; } - public static ReadResponse createDataResponse(UnfilteredPartitionIterator data) + public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) + { + return new LocalDataResponse(data, selection); + } + + @VisibleForTesting + public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) { - return new DataResponse(data); + return new RemoteDataResponse(LocalDataResponse.build(data, selection)); } public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data) @@ -63,7 +71,6 @@ public abstract class ReadResponse } public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command); - public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command); public abstract boolean isDigestQuery(); @@ -102,27 +109,22 @@ public abstract class ReadResponse } } - private static class DataResponse extends ReadResponse + // built on the owning node responding to a query + private static class LocalDataResponse extends DataResponse { - // The response, serialized in the current messaging version - private final ByteBuffer data; - private final SerializationHelper.Flag flag; - - private DataResponse(ByteBuffer data) + private final ColumnFilter received; + private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received) { - super(null); // This is never call on the serialization side, where we actually care of the metadata. - this.data = data; - this.flag = SerializationHelper.Flag.FROM_REMOTE; + super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL); + this.received = received; } - private DataResponse(UnfilteredPartitionIterator iter) + private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) { - super(iter.metadata()); try (DataOutputBuffer buffer = new DataOutputBuffer()) { - UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version); - this.data = buffer.buffer(); - this.flag = SerializationHelper.Flag.LOCAL; + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version); + return buffer.buffer(); } catch (IOException e) { @@ -131,12 +133,57 @@ public abstract class ReadResponse } } + protected ColumnFilter selection(ColumnFilter sent) + { + // we didn't send anything, so we don't provide it in the serializer methods, but use the + // object's reference to the original column filter we received + assert sent == null | sent == received; + return received; + } + } + + // built on the coordinator node receiving a response + private static class RemoteDataResponse extends DataResponse + { + protected RemoteDataResponse(ByteBuffer data) + { + super(null, data, SerializationHelper.Flag.FROM_REMOTE); + } + + protected ColumnFilter selection(ColumnFilter sent) + { + // we should always know what we sent, and should provide it in digest() and makeIterator() + assert sent != null; + return sent; + } + } + + static abstract class DataResponse extends ReadResponse + { + // TODO: can the digest be calculated over the raw bytes now? + // The response, serialized in the current messaging version + private final ByteBuffer data; + private final SerializationHelper.Flag flag; + + protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag) + { + super(metadata); + this.data = data; + this.flag = flag; + } + + protected abstract ColumnFilter selection(ColumnFilter filter); + public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command) { try { DataInputPlus in = new DataInputBuffer(data, true); - return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, metadata, flag); + return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, + MessagingService.current_version, + metadata, + selection(command.columnFilter()), + flag); } catch (IOException e) { @@ -307,7 +354,7 @@ public abstract class ReadResponse assert version == MessagingService.VERSION_30; ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); - return new DataResponse(data); + return new RemoteDataResponse(data); } public long serializedSize(ReadResponse response, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 2326f1e..88f6832 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -27,6 +27,7 @@ import com.google.common.base.Function; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; @@ -372,35 +373,62 @@ public class SerializationHeader public static class Serializer implements IMetadataComponentSerializer { - public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException + public void serializeForMessaging(SerializationHeader header, ColumnFilter selection, DataOutputPlus out, boolean hasStatic) throws IOException { EncodingStats.serializer.serialize(header.stats, out); - if (hasStatic) - Columns.serializer.serialize(header.columns.statics, out); - Columns.serializer.serialize(header.columns.regulars, out); + if (selection == null) + { + if (hasStatic) + Columns.serializer.serialize(header.columns.statics, out); + Columns.serializer.serialize(header.columns.regulars, out); + } + else + { + if (hasStatic) + Columns.serializer.serializeSubset(header.columns.statics, selection.fetchedColumns().statics, out); + Columns.serializer.serializeSubset(header.columns.regulars, selection.fetchedColumns().regulars, out); + } } - public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException + public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, ColumnFilter selection, boolean hasStatic) throws IOException { EncodingStats stats = EncodingStats.serializer.deserialize(in); AbstractType keyType = metadata.getKeyValidator(); List> clusteringTypes = typesOf(metadata.clusteringColumns()); - Columns statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE; - Columns regulars = Columns.serializer.deserialize(in, metadata); + Columns statics, regulars; + if (selection == null) + { + statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE; + regulars = Columns.serializer.deserialize(in, metadata); + } + else + { + statics = hasStatic ? Columns.serializer.deserializeSubset(selection.fetchedColumns().statics, in) : Columns.NONE; + regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in); + } return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null); } - public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic) + public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic) { long size = EncodingStats.serializer.serializedSize(header.stats); - if (hasStatic) - size += Columns.serializer.serializedSize(header.columns.statics); - size += Columns.serializer.serializedSize(header.columns.regulars); + if (selection == null) + { + if (hasStatic) + size += Columns.serializer.serializedSize(header.columns.statics); + size += Columns.serializer.serializedSize(header.columns.regulars); + } + else + { + if (hasStatic) + size += Columns.serializer.serializedSubsetSize(header.columns.statics, selection.fetchedColumns().statics); + size += Columns.serializer.serializedSubsetSize(header.columns.regulars, selection.fetchedColumns().regulars); + } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java index a3c8768..fab8591 100644 --- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java @@ -216,7 +216,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements CFMetaData.serializer.serialize(partition.metadata(), out, version); try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) { - UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, p.rowCount()); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount()); } } @@ -238,7 +238,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements int nonExpiringLiveCells = in.readInt(); CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, SerializationHelper.Flag.LOCAL); + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL); assert !header.isReversed && header.rowEstimate >= 0; MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false); @@ -286,7 +286,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements + TypeSizes.sizeof(p.nonTombstoneCellCount) + TypeSizes.sizeof(p.nonExpiringLiveCells) + CFMetaData.serializer.serializedSize(partition.metadata(), version) - + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount()); + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index bb73929..e6d51e5 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -716,7 +716,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition else { CFMetaData.serializer.serialize(update.metadata(), out, version); - UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size()); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rows.size()); } } } @@ -752,7 +752,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag); + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag); if (header.isEmpty) return emptyUpdate(metadata, header.key); @@ -802,7 +802,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition return LegacyLayout.serializedSizeAsLegacyPartition(iter, version); return CFMetaData.serializer.serializedSize(update.metadata(), version) - + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size()); + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rows.size()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 0418e7f..f7ee5ee 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -361,7 +361,7 @@ public abstract class UnfilteredPartitionIterators */ public static class Serializer { - public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException + public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException { assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer @@ -371,13 +371,13 @@ public abstract class UnfilteredPartitionIterators out.writeBoolean(true); try (UnfilteredRowIterator partition = iter.next()) { - UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version); + UnfilteredRowIteratorSerializer.serializer.serialize(partition, selection, out, version); } } out.writeBoolean(false); } - public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException + public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException { assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer final boolean isForThrift = in.readBoolean(); @@ -428,7 +428,7 @@ public abstract class UnfilteredPartitionIterators try { nextReturned = true; - next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag); + next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, selection, flag); return next; } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 531bd26..f17ccca 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -71,22 +72,22 @@ public class UnfilteredRowIteratorSerializer public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer(); // Should only be used for the on-wire format. - public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException + public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException { - serialize(iterator, out, version, -1); + serialize(iterator, selection, out, version, -1); } // Should only be used for the on-wire format. - public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException + public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { SerializationHeader header = new SerializationHeader(iterator.metadata(), iterator.columns(), iterator.stats()); - serialize(iterator, out, header, version, rowEstimate); + serialize(iterator, header, selection, out, version, rowEstimate); } // Should only be used for the on-wire format. - public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException + public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); @@ -113,7 +114,7 @@ public class UnfilteredRowIteratorSerializer out.writeByte((byte)flags); - SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic); + SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic); if (!partitionDeletion.isLive()) header.writeDeletionTime(partitionDeletion, out); @@ -131,7 +132,7 @@ public class UnfilteredRowIteratorSerializer // Please note that this consume the iterator, and as such should not be called unless we have a simple way to // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition. - public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate) + public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate) { SerializationHeader header = new SerializationHeader(iterator.metadata(), iterator.columns(), @@ -149,7 +150,7 @@ public class UnfilteredRowIteratorSerializer Row staticRow = iterator.staticRow(); boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; - size += SerializationHeader.serializer.serializedSizeForMessaging(header, hasStatic); + size += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, hasStatic); if (!partitionDeletion.isLive()) size += header.deletionTimeSerializedSize(partitionDeletion); @@ -167,7 +168,7 @@ public class UnfilteredRowIteratorSerializer return size; } - public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException + public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); @@ -182,7 +183,7 @@ public class UnfilteredRowIteratorSerializer boolean hasStatic = (flags & HAS_STATIC_ROW) != 0; boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0; - SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic); + SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, hasStatic); DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE; @@ -220,9 +221,9 @@ public class UnfilteredRowIteratorSerializer }; } - public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException + public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException { - return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag)); + return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag)); } public static class Header http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0846ef1..fc917f0 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1689,7 +1689,7 @@ public class StorageProxy implements StorageProxyMBean { try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup)) { - handler.response(command.createResponse(iterator)); + handler.response(command.createResponse(iterator, command.columnFilter())); } MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/db/ColumnsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java index 5447fcc..a0ade96 100644 --- a/test/unit/org/apache/cassandra/db/ColumnsTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java @@ -18,13 +18,12 @@ */ package org.apache.cassandra.db; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.junit.AfterClass; @@ -36,6 +35,8 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.btree.BTreeSet; @@ -44,42 +45,39 @@ public class ColumnsTest private static CFMetaData cfMetaData = MockSchema.newCFS().metadata; + // this tests most of our functionality, since each subset we perform + // reasonably comprehensive tests of basic functionality against @Test public void testContainsWithoutAndMergeTo() { - for (RandomColumns randomColumns : random()) - testContainsWithoutAndMergeTo(randomColumns.columns, randomColumns.definitions); + for (ColumnsCheck randomColumns : randomSmall(true)) + testContainsWithoutAndMergeTo(randomColumns); } - private void testContainsWithoutAndMergeTo(Columns columns, List definitions) + private void testContainsWithoutAndMergeTo(ColumnsCheck input) { // pick some arbitrary groupings of columns to remove at-once (to avoid factorial complexity) // whatever is left after each removal, we perform this logic on again, recursively - List> removeGroups = shuffleAndGroup(Lists.newArrayList(definitions)); + List> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions)); for (List defs : removeGroups) { - Columns subset = columns; - for (ColumnDefinition def : defs) - subset = subset.without(def); - Assert.assertEquals(columns.columnCount() - defs.size(), subset.columnCount()); - List remainingDefs = Lists.newArrayList(columns); - remainingDefs.removeAll(defs); + ColumnsCheck subset = input.remove(defs); // test contents after .without - assertContents(subset, remainingDefs); + subset.assertContents(); // test .contains - assertSubset(columns, subset); + assertSubset(input.columns, subset.columns); // test .mergeTo - Columns otherSubset = columns; - for (ColumnDefinition def : remainingDefs) + Columns otherSubset = input.columns; + for (ColumnDefinition def : subset.definitions) { otherSubset = otherSubset.without(def); - assertContents(otherSubset.mergeTo(subset), definitions); + assertContents(otherSubset.mergeTo(subset.columns), input.definitions); } - testContainsWithoutAndMergeTo(subset, remainingDefs); + testContainsWithoutAndMergeTo(subset); } } @@ -90,6 +88,67 @@ public class ColumnsTest Assert.assertFalse(subset.contains(superset)); } + @Test + public void testSerialize() throws IOException + { + testSerialize(Columns.NONE, Collections.emptyList()); + for (ColumnsCheck randomColumns : randomSmall(false)) + testSerialize(randomColumns.columns, randomColumns.definitions); + } + + private void testSerialize(Columns columns, List definitions) throws IOException + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + Columns.serializer.serialize(columns, out); + Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining()); + Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns)); + Assert.assertEquals(columns, deserialized); + Assert.assertEquals(columns.hashCode(), deserialized.hashCode()); + assertContents(deserialized, definitions); + } + } + + @Test + public void testSerializeSmallSubset() throws IOException + { + for (ColumnsCheck randomColumns : randomSmall(true)) + testSerializeSubset(randomColumns); + } + + @Test + public void testSerializeHugeSubset() throws IOException + { + for (ColumnsCheck randomColumns : randomHuge()) + testSerializeSubset(randomColumns); + } + + private void testSerializeSubset(ColumnsCheck input) throws IOException + { + testSerializeSubset(input.columns, input.columns, input.definitions); + testSerializeSubset(input.columns, Columns.NONE, Collections.emptyList()); + List> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions)); + for (List defs : removeGroups) + { + Collections.sort(defs); + ColumnsCheck subset = input.remove(defs); + testSerializeSubset(input.columns, subset.columns, subset.definitions); + } + } + + private void testSerializeSubset(Columns superset, Columns subset, List subsetDefinitions) throws IOException + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + Columns.serializer.serializeSubset(subset, superset, out); + Assert.assertEquals(Columns.serializer.serializedSubsetSize(subset, superset), out.buffer().remaining()); + Columns deserialized = Columns.serializer.deserializeSubset(superset, new DataInputBuffer(out.buffer(), false)); + Assert.assertEquals(subset, deserialized); + Assert.assertEquals(subset.hashCode(), deserialized.hashCode()); + assertContents(deserialized, subsetDefinitions); + } + } + private static void assertContents(Columns columns, List defs) { Assert.assertEquals(defs, Lists.newArrayList(columns)); @@ -109,6 +168,7 @@ public class ColumnsTest { hasSimple = true; Assert.assertEquals(i, columns.simpleIdx(def)); + Assert.assertEquals(def, columns.getSimple(i)); Assert.assertEquals(def, simple.next()); ++firstComplexIdx; } @@ -117,6 +177,7 @@ public class ColumnsTest Assert.assertFalse(simple.hasNext()); hasComplex = true; Assert.assertEquals(i - firstComplexIdx, columns.complexIdx(def)); + Assert.assertEquals(def, columns.getComplex(i - firstComplexIdx)); Assert.assertEquals(def, complex.next()); } i++; @@ -127,6 +188,16 @@ public class ColumnsTest Assert.assertFalse(all.hasNext()); Assert.assertEquals(hasSimple, columns.hasSimple()); Assert.assertEquals(hasComplex, columns.hasComplex()); + + // check select order + if (!columns.hasSimple() || !columns.getSimple(0).kind.isPrimaryKeyKind()) + { + List selectOrderDefs = new ArrayList<>(defs); + Collections.sort(selectOrderDefs, (a, b) -> a.name.bytes.compareTo(b.name.bytes)); + List selectOrderColumns = new ArrayList<>(); + Iterators.addAll(selectOrderColumns, columns.selectOrderIterator()); + Assert.assertEquals(selectOrderDefs, selectOrderColumns); + } } private static List> shuffleAndGroup(List list) @@ -141,7 +212,7 @@ public class ColumnsTest list.set(j, v); } - // then group + // then group (logarithmically, to ensure our recursive functions don't explode the state space) List> result = new ArrayList<>(); for (int i = 0 ; i < list.size() ;) { @@ -162,83 +233,179 @@ public class ColumnsTest MockSchema.cleanup(); } - private static class RandomColumns + private static class ColumnsCheck { final Columns columns; final List definitions; - private RandomColumns(List definitions) + private ColumnsCheck(Columns columns, List definitions) + { + this.columns = columns; + this.definitions = definitions; + } + + private ColumnsCheck(List definitions) { this.columns = Columns.from(BTreeSet.of(definitions)); this.definitions = definitions; } + + ColumnsCheck remove(List remove) + { + Columns subset = columns; + for (ColumnDefinition def : remove) + subset = subset.without(def); + Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount()); + List remainingDefs = Lists.newArrayList(columns); + remainingDefs.removeAll(remove); + return new ColumnsCheck(subset, remainingDefs); + } + + void assertContents() + { + ColumnsTest.assertContents(columns, definitions); + } + } + + private static List randomHuge() + { + List result = new ArrayList<>(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + result.add(randomHuge(random.nextInt(64, 128), 0, 0, 0)); + result.add(randomHuge(0, random.nextInt(64, 128), 0, 0)); + result.add(randomHuge(0, 0, random.nextInt(64, 128), 0)); + result.add(randomHuge(0, 0, 0, random.nextInt(64, 128))); + result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), 0, 0)); + result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), 0)); + result.add(randomHuge(0, 0, random.nextInt(64, 128), random.nextInt(64, 128))); + result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), 0)); + result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128))); + result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128))); + return result; } - private static List random() + private static List randomSmall(boolean permitMultiplePartitionKeys) { - List random = new ArrayList<>(); + List random = new ArrayList<>(); for (int i = 1 ; i <= 3 ; i++) { - random.add(random(i, i - 1, i - 1, i - 1)); - random.add(random(i - 1, i, i - 1, i - 1)); - random.add(random(i - 1, i - 1, i, i - 1)); - random.add(random(i - 1, i - 1, i - 1, i)); + int pkCount = permitMultiplePartitionKeys ? i - 1 : 1; + if (permitMultiplePartitionKeys) + random.add(randomSmall(i, i - 1, i - 1, i - 1)); + random.add(randomSmall(0, 0, i, i)); // both kinds of regular, no PK + random.add(randomSmall(pkCount, i, i - 1, i - 1)); // PK + clustering, few or none regular + random.add(randomSmall(pkCount, i - 1, i, i - 1)); // PK + few or none clustering, some regular, few or none complex + random.add(randomSmall(pkCount, i - 1, i - 1, i)); // PK + few or none clustering or regular, some complex } return random; } - private static RandomColumns random(int pkCount, int clCount, int regularCount, int complexCount) + private static ColumnsCheck randomSmall(int pkCount, int clCount, int regularCount, int complexCount) { - List chars = new ArrayList<>(); + List names = new ArrayList<>(); for (char c = 'a' ; c <= 'z' ; c++) - chars.add(c); + names .add(Character.toString(c)); List result = new ArrayList<>(); - addPartition(select(chars, pkCount), result); - addClustering(select(chars, clCount), result); - addRegular(select(chars, regularCount), result); - addComplex(select(chars, complexCount), result); + addPartition(select(names, pkCount), result); + addClustering(select(names, clCount), result); + addRegular(select(names, regularCount), result); + addComplex(select(names, complexCount), result); Collections.sort(result); - return new RandomColumns(result); + return new ColumnsCheck(result); } - private static List select(List chars, int count) + private static List select(List names, int count) { - List result = new ArrayList<>(); + List result = new ArrayList<>(); ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = 0 ; i < count ; i++) { - int v = random.nextInt(chars.size()); - result.add(chars.get(v)); - chars.remove(v); + int v = random.nextInt(names.size()); + result.add(names.get(v)); + names.remove(v); } return result; } - private static void addPartition(List chars, List results) + private static ColumnsCheck randomHuge(int pkCount, int clCount, int regularCount, int complexCount) { - addSimple(ColumnDefinition.Kind.PARTITION_KEY, chars, results); + List result = new ArrayList<>(); + Set usedNames = new HashSet<>(); + addPartition(names(pkCount, usedNames), result); + addClustering(names(clCount, usedNames), result); + addRegular(names(regularCount, usedNames), result); + addComplex(names(complexCount, usedNames), result); + Collections.sort(result); + return new ColumnsCheck(result); } - private static void addClustering(List chars, List results) + private static List names(int count, Set usedNames) { - addSimple(ColumnDefinition.Kind.CLUSTERING, chars, results); + List names = new ArrayList<>(); + StringBuilder builder = new StringBuilder(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0 ; i < count ; i++) + { + builder.setLength(0); + for (int j = 0 ; j < 3 || usedNames.contains(builder.toString()) ; j++) + builder.append((char) random.nextInt('a', 'z' + 1)); + String name = builder.toString(); + names.add(name); + usedNames.add(name); + } + return names; } - private static void addRegular(List chars, List results) + private static void addPartition(List names, List results) { - addSimple(ColumnDefinition.Kind.REGULAR, chars, results); + for (String name : names) + results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.PARTITION_KEY)); } - private static void addSimple(ColumnDefinition.Kind kind, List chars, List results) + private static void addClustering(List names, List results) { - for (Character c : chars) - results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), UTF8Type.instance, null, kind)); + int i = 0; + for (String name : names) + results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, i++, ColumnDefinition.Kind.CLUSTERING)); } - private static void addComplex(List chars, List results) + private static void addRegular(List names, List results) { - for (Character c : chars) - results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR)); + for (String name : names) + results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.REGULAR)); + } + + private static void addComplex(List names, List results) + { + for (String name : names) + results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR)); + } + + private static CFMetaData mock(Columns columns) + { + if (columns.isEmpty()) + return cfMetaData; + CFMetaData.Builder builder = CFMetaData.Builder.create(cfMetaData.ksName, cfMetaData.cfName); + boolean hasPartitionKey = false; + for (ColumnDefinition def : columns) + { + switch (def.kind) + { + case PARTITION_KEY: + builder.addPartitionKey(def.name, def.type); + hasPartitionKey = true; + break; + case CLUSTERING: + builder.addClusteringColumn(def.name, def.type); + break; + case REGULAR: + builder.addRegularColumn(def.name, def.type); + break; + } + } + if (!hasPartitionKey) + builder.addPartitionKey("219894021498309239rufejsfjdksfjheiwfhjes", UTF8Type.instance); + return builder.build(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index efd3504..0804bfb 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -491,7 +491,7 @@ public class DataResolverTest public MessageIn readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator) { return MessageIn.create(from, - ReadResponse.createDataResponse(partitionIterator), + ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()), Collections.EMPTY_MAP, MessagingService.Verb.REQUEST_RESPONSE, MessagingService.current_version);