Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 69627200C18 for ; Fri, 27 Jan 2017 09:32:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 669AD160B47; Fri, 27 Jan 2017 08:32:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5B5E9160B5B for ; Fri, 27 Jan 2017 09:32:46 +0100 (CET) Received: (qmail 12255 invoked by uid 500); 27 Jan 2017 08:32:45 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 11962 invoked by uid 99); 27 Jan 2017 08:32:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jan 2017 08:32:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03CD6DF989; Fri, 27 Jan 2017 08:32:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Fri, 27 Jan 2017 08:32:49 -0000 Message-Id: <84e1e99661b54e21aebc3f62865695d8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/6] cassandra git commit: Merge commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2' into trunk archived-at: Fri, 27 Jan 2017 08:32:47 -0000 Merge commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2' into trunk * commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2': Revert CASSANDRA-12768 (and update fix of CASSANDRA-12694) due to upgrade regression Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26eab454 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26eab454 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26eab454 Branch: refs/heads/trunk Commit: 26eab45465a140be5ef5fae45dc8d7cbb33192b9 Parents: 2987a70 b234ca3 Author: Sylvain Lebresne Authored: Fri Jan 27 09:27:44 2017 +0100 Committer: Sylvain Lebresne Committed: Fri Jan 27 09:28:41 2017 +0100 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ReadResponse.java | 26 ++++++------ .../cassandra/db/filter/ColumnFilter.java | 44 ++++++++++++++++++-- .../apache/cassandra/net/MessagingService.java | 3 +- 3 files changed, 55 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadResponse.java index 107bcc4,cca21f8..7cf04a4 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@@ -52,7 -79,7 +52,7 @@@ public abstract class ReadRespons @VisibleForTesting public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command) { -- return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter())); ++ return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()), MessagingService.current_version); } public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command) @@@ -108,7 -135,7 +108,7 @@@ { private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command) { - super(build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL); - super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL); ++ super(build(iter, command.columnFilter()), MessagingService.current_version, SerializationHelper.Flag.LOCAL); } private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) @@@ -129,9 -156,9 +129,9 @@@ // built on the coordinator node receiving a response private static class RemoteDataResponse extends DataResponse { -- protected RemoteDataResponse(ByteBuffer data) ++ protected RemoteDataResponse(ByteBuffer data, int version) { - super(data, SerializationHelper.Flag.FROM_REMOTE); - super(null, data, SerializationHelper.Flag.FROM_REMOTE); ++ super(data, version, SerializationHelper.Flag.FROM_REMOTE); } } @@@ -140,12 -167,12 +140,14 @@@ // TODO: can the digest be calculated over the raw bytes now? // The response, serialized in the current messaging version private final ByteBuffer data; ++ private final int dataSerializationVersion; private final SerializationHelper.Flag flag; - protected DataResponse(ByteBuffer data, SerializationHelper.Flag flag) - protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag flag) ++ protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag flag) { - super(command); + super(); this.data = data; ++ this.dataSerializationVersion = dataSerializationVersion; this.flag = flag; } @@@ -157,7 -184,7 +159,7 @@@ // the later can be null (for RemoteDataResponse as those are created in the serializers and // those don't have easy access to the command). This is also why we need the command as parameter here. return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, -- MessagingService.current_version, ++ dataSerializationVersion, command.metadata(), command.columnFilter(), flag); @@@ -204,11 -377,9 +206,8 @@@ if (digest.hasRemaining()) return new DigestResponse(digest); - // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the - // version, we'll have to deserialize/re-serialize the data to be in the proper version. -- assert version == MessagingService.VERSION_30; ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); -- return new RemoteDataResponse(data); ++ return new RemoteDataResponse(data, version); } public long serializedSize(ReadResponse response, int version) @@@ -219,9 -390,31 +218,10 @@@ long size = ByteBufferUtil.serializedSizeWithVIntLength(digest); if (!isDigest) { -- // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the -- // version, we'll have to deserialize/re-serialize the data to be in the proper version. -- assert version == MessagingService.VERSION_30; ++ // In theory, we should deserialize/re-serialize if the version asked is different from the current ++ // version as the content could have a different serialization format. So far though, we haven't made ++ // change to partition iterators serialization since 3.0 so we skip this. ++ assert version >= MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; size += ByteBufferUtil.serializedSizeWithVIntLength(data); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java index d320fc3,93a848e..b3ae505 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@@ -20,6 -20,6 +20,7 @@@ package org.apache.cassandra.db.filter import java.io.IOException; import java.util.*; ++import com.google.common.collect.Iterables; import com.google.common.collect.SortedSetMultimap; import com.google.common.collect.TreeMultimap; @@@ -30,6 -30,6 +31,7 @@@ import org.apache.cassandra.db.rows.Cel import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; ++import org.apache.cassandra.net.MessagingService; /** * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected @@@ -62,17 -62,16 +64,17 @@@ public class ColumnFilte { public static final Serializer serializer = new Serializer(); - // True if _fetched_ includes all regular columns (an any static in _queried_), in which case metadata must not be - // True if _fetched_ is all the columns, in which case metadata must not be null. If false, - // then _fetched_ == _queried_ and we only store _queried_. - private final boolean isFetchAll; ++ // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be + // null. If false, then _fetched_ == _queried_ and we only store _queried_. + private final boolean fetchAllRegulars; - private final CFMetaData metadata; // can be null if !isFetchAll + private final CFMetaData metadata; // can be null if !fetchAllRegulars - private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_ + private final PartitionColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all + // static and regular columns are both _fetched_ and _queried_). private final SortedSetMultimap subSelections; // can be null - private ColumnFilter(boolean isFetchAll, + private ColumnFilter(boolean fetchAllRegulars, CFMetaData metadata, PartitionColumns queried, SortedSetMultimap subSelections) @@@ -429,19 -401,19 +431,42 @@@ public static class Serializer { -- private static final int IS_FETCH_ALL_MASK = 0x01; ++ private static final int FETCH_ALL_MASK = 0x01; private static final int HAS_QUERIED_MASK = 0x02; private static final int HAS_SUB_SELECTIONS_MASK = 0x04; private static int makeHeaderByte(ColumnFilter selection) { - return (selection.fetchAllRegulars ? IS_FETCH_ALL_MASK : 0) - return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) ++ return (selection.fetchAllRegulars ? FETCH_ALL_MASK : 0) | (selection.queried != null ? HAS_QUERIED_MASK : 0) | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0); } ++ private static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version) ++ { ++ if (version > MessagingService.VERSION_30 || !selection.fetchAllRegulars || selection.queried == null) ++ return selection; ++ ++ // The meaning of fetchAllRegulars changed (at least when queried != null) due to CASSANDRA-12768: in ++ // pre-4.0 it means that *all* columns are fetched, not just the regular ones, and so 3.0/3.X nodes ++ // would send us more than we'd like. So instead recreating a filter that correspond to what we ++ // actually want (it's a tiny bit less efficient as we include all columns manually and will mark as ++ // queried some columns that are actually only fetched, but it's fine during upgrade). ++ // More concretely, we replace our filter by a non-fetch-all one that queries every columns that our ++ // current filter fetches. ++ Columns allRegulars = selection.metadata.partitionColumns().regulars; ++ Set queriedStatic = new HashSet<>(); ++ Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnDefinition::isStatic)); ++ return new ColumnFilter(false, ++ null, ++ new PartitionColumns(Columns.from(queriedStatic), allRegulars), ++ selection.subSelections); ++ } ++ public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException { ++ selection = maybeUpdateForBackwardCompatility(selection, version); ++ out.writeByte(makeHeaderByte(selection)); if (selection.queried != null) @@@ -461,7 -433,7 +486,7 @@@ public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { int header = in.readUnsignedByte(); -- boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0; ++ boolean isFetchAll = (header & FETCH_ALL_MASK) != 0; boolean hasQueried = (header & HAS_QUERIED_MASK) != 0; boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0; @@@ -485,11 -457,11 +510,22 @@@ } } ++ // Same concern than in serialize/serializedSize: we should be wary of the change in meaning for isFetchAll. ++ // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all static columns to be fetched, ++ // make sure we do that (note that if queried == null, that's already what we do). ++ // Note that here again this will make us do a bit more work that necessary, namely we'll _query_ all ++ // statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine ++ // during upgrade. ++ if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null) ++ queried = new PartitionColumns(metadata.partitionColumns().statics, queried.regulars); ++ return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections); } public long serializedSize(ColumnFilter selection, int version) { ++ selection = maybeUpdateForBackwardCompatility(selection, version); ++ long size = 1; // header byte if (selection.queried != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index 38c1cd2,f82e80b..7215397 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -88,8 -88,12 +88,9 @@@ public final class MessagingService imp public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions - public static final int VERSION_12 = 6; - public static final int VERSION_20 = 7; - public static final int VERSION_21 = 8; - public static final int VERSION_22 = 9; public static final int VERSION_30 = 10; -- public static final int current_version = VERSION_30; ++ public static final int VERSION_40 = 11; ++ public static final int current_version = VERSION_40; public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; public static final byte[] ONE_BYTE = new byte[1];