cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [6/6] cassandra git commit: Merge commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2' into trunk
Date Fri, 27 Jan 2017 08:32:49 GMT
Merge commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2' into trunk

* commit 'b234ca3ee4101a34b761948d633ef4a12aa70ca2':
  Revert CASSANDRA-12768 (and update fix of CASSANDRA-12694) due to upgrade regression


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26eab454
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26eab454
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26eab454

Branch: refs/heads/trunk
Commit: 26eab45465a140be5ef5fae45dc8d7cbb33192b9
Parents: 2987a70 b234ca3
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Jan 27 09:27:44 2017 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Jan 27 09:28:41 2017 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadResponse.java   | 26 ++++++------
 .../cassandra/db/filter/ColumnFilter.java       | 44 ++++++++++++++++++--
 .../apache/cassandra/net/MessagingService.java  |  3 +-
 3 files changed, 55 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadResponse.java
index 107bcc4,cca21f8..7cf04a4
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@@ -52,7 -79,7 +52,7 @@@ public abstract class ReadRespons
      @VisibleForTesting
      public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data,
ReadCommand command)
      {
--        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()));
++        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()),
MessagingService.current_version);
      }
  
      public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand
command)
@@@ -108,7 -135,7 +108,7 @@@
      {
          private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
          {
-             super(build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL);
 -            super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL);
++            super(build(iter, command.columnFilter()), MessagingService.current_version,
SerializationHelper.Flag.LOCAL);
          }
  
          private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@@ -129,9 -156,9 +129,9 @@@
      // built on the coordinator node receiving a response
      private static class RemoteDataResponse extends DataResponse
      {
--        protected RemoteDataResponse(ByteBuffer data)
++        protected RemoteDataResponse(ByteBuffer data, int version)
          {
-             super(data, SerializationHelper.Flag.FROM_REMOTE);
 -            super(null, data, SerializationHelper.Flag.FROM_REMOTE);
++            super(data, version, SerializationHelper.Flag.FROM_REMOTE);
          }
      }
  
@@@ -140,12 -167,12 +140,14 @@@
          // TODO: can the digest be calculated over the raw bytes now?
          // The response, serialized in the current messaging version
          private final ByteBuffer data;
++        private final int dataSerializationVersion;
          private final SerializationHelper.Flag flag;
  
-         protected DataResponse(ByteBuffer data, SerializationHelper.Flag flag)
 -        protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag
flag)
++        protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag
flag)
          {
 -            super(command);
 +            super();
              this.data = data;
++            this.dataSerializationVersion = dataSerializationVersion;
              this.flag = flag;
          }
  
@@@ -157,7 -184,7 +159,7 @@@
                  // the later can be null (for RemoteDataResponse as those are created in
the serializers and
                  // those don't have easy access to the command). This is also why we need
the command as parameter here.
                  return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
--                                                                                       
 MessagingService.current_version,
++                                                                                       
 dataSerializationVersion,
                                                                                         
 command.metadata(),
                                                                                         
 command.columnFilter(),
                                                                                         
 flag);
@@@ -204,11 -377,9 +206,8 @@@
              if (digest.hasRemaining())
                  return new DigestResponse(digest);
  
-             // Note that we can only get there if version == 3.0, which is the current_version.
When we'll change the
-             // version, we'll have to deserialize/re-serialize the data to be in the proper
version.
--            assert version == MessagingService.VERSION_30;
              ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
--            return new RemoteDataResponse(data);
++            return new RemoteDataResponse(data, version);
          }
  
          public long serializedSize(ReadResponse response, int version)
@@@ -219,9 -390,31 +218,10 @@@
              long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
              if (!isDigest)
              {
--                // Note that we can only get there if version == 3.0, which is the current_version.
When we'll change the
--                // version, we'll have to deserialize/re-serialize the data to be in the
proper version.
--                assert version == MessagingService.VERSION_30;
++                // In theory, we should deserialize/re-serialize if the version asked is
different from the current
++                // version as the content could have a different serialization format. So
far though, we haven't made
++                // any change to partition iterator serialization since 3.0, so we skip this.
++                assert version >= MessagingService.VERSION_30;
                  ByteBuffer data = ((DataResponse)response).data;
                  size += ByteBufferUtil.serializedSizeWithVIntLength(data);
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index d320fc3,93a848e..b3ae505
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -20,6 -20,6 +20,7 @@@ package org.apache.cassandra.db.filter
  import java.io.IOException;
  import java.util.*;
  
++import com.google.common.collect.Iterables;
  import com.google.common.collect.SortedSetMultimap;
  import com.google.common.collect.TreeMultimap;
  
@@@ -30,6 -30,6 +31,7 @@@ import org.apache.cassandra.db.rows.Cel
  import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
++import org.apache.cassandra.net.MessagingService;
  
  /**
   * Represents which (non-PK) columns (and optionally which sub-part of a column for complex
columns) are selected
@@@ -62,17 -62,16 +64,17 @@@ public class ColumnFilte
  {
      public static final Serializer serializer = new Serializer();
  
-     // True if _fetched_ includes all regular columns (an any static in _queried_), in which
case metadata must not be
 -    // True if _fetched_ is all the columns, in which case metadata must not be null. If
false,
 -    // then _fetched_ == _queried_ and we only store _queried_.
 -    private final boolean isFetchAll;
++    // True if _fetched_ includes all regular columns (and any static in _queried_), in
which case metadata must not be
 +    // null. If false, then _fetched_ == _queried_ and we only store _queried_.
 +    private final boolean fetchAllRegulars;
  
 -    private final CFMetaData metadata; // can be null if !isFetchAll
 +    private final CFMetaData metadata; // can be null if !fetchAllRegulars
  
 -    private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ ==
_queried_
 +    private final PartitionColumns queried; // can be null if fetchAllRegulars, to represent
a wildcard query (all
 +                                            // static and regular columns are both _fetched_
and _queried_).
      private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections;
// can be null
  
 -    private ColumnFilter(boolean isFetchAll,
 +    private ColumnFilter(boolean fetchAllRegulars,
                           CFMetaData metadata,
                           PartitionColumns queried,
                           SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
@@@ -429,19 -401,19 +431,42 @@@
  
      public static class Serializer
      {
--        private static final int IS_FETCH_ALL_MASK       = 0x01;
++        private static final int FETCH_ALL_MASK       = 0x01;
          private static final int HAS_QUERIED_MASK      = 0x02;
          private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
  
          private static int makeHeaderByte(ColumnFilter selection)
          {
-             return (selection.fetchAllRegulars ? IS_FETCH_ALL_MASK : 0)
 -            return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
++            return (selection.fetchAllRegulars ? FETCH_ALL_MASK : 0)
                   | (selection.queried != null ? HAS_QUERIED_MASK : 0)
                   | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
          }
  
++        private static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection,
int version)
++        {
++            if (version > MessagingService.VERSION_30 || !selection.fetchAllRegulars
|| selection.queried == null)
++                return selection;
++
++            // The meaning of fetchAllRegulars changed (at least when queried != null) due
to CASSANDRA-12768: in
++            // pre-4.0 it means that *all* columns are fetched, not just the regular ones,
and so 3.0/3.X nodes
++            // would send us more than we'd like. So instead we recreate a filter that corresponds
to what we
++            // actually want (it's a tiny bit less efficient as we include all columns manually
and will mark as
++            // queried some columns that are actually only fetched, but it's fine during
upgrade).
++            // More concretely, we replace our filter by a non-fetch-all one that queries
every column that our
++            // current filter fetches.
++            Columns allRegulars = selection.metadata.partitionColumns().regulars;
++            Set<ColumnDefinition> queriedStatic = new HashSet<>();
++            Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnDefinition::isStatic));
++            return new ColumnFilter(false,
++                                    null,
++                                    new PartitionColumns(Columns.from(queriedStatic), allRegulars),
++                                    selection.subSelections);
++        }
++
          public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws
IOException
          {
++            selection = maybeUpdateForBackwardCompatility(selection, version);
++
              out.writeByte(makeHeaderByte(selection));
  
              if (selection.queried != null)
@@@ -461,7 -433,7 +486,7 @@@
          public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata)
throws IOException
          {
              int header = in.readUnsignedByte();
--            boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
++            boolean isFetchAll = (header & FETCH_ALL_MASK) != 0;
              boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
              boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
  
@@@ -485,11 -457,11 +510,22 @@@
                  }
              }
  
++            // Same concern as in serialize/serializedSize: we should be wary of the change
in meaning for isFetchAll.
++            // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all
static columns to be fetched,
++            // make sure we do that (note that if queried == null, that's already what we
do).
++            // Note that here again this will make us do a bit more work than necessary,
namely we'll _query_ all
++            // statics even though we only care about _fetching_ them all, but that's a
minor inefficiency, so fine
++            // during upgrade.
++            if (version <= MessagingService.VERSION_30 && isFetchAll &&
queried != null)
++                queried = new PartitionColumns(metadata.partitionColumns().statics, queried.regulars);
++
              return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
          }
  
          public long serializedSize(ColumnFilter selection, int version)
          {
++            selection = maybeUpdateForBackwardCompatility(selection, version);
++
              long size = 1; // header byte
  
              if (selection.queried != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26eab454/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 38c1cd2,f82e80b..7215397
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -88,8 -88,12 +88,9 @@@ public final class MessagingService imp
      public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
  
      // 8 bits version, so don't waste versions
 -    public static final int VERSION_12 = 6;
 -    public static final int VERSION_20 = 7;
 -    public static final int VERSION_21 = 8;
 -    public static final int VERSION_22 = 9;
      public static final int VERSION_30 = 10;
--    public static final int current_version = VERSION_30;
++    public static final int VERSION_40 = 11;
++    public static final int current_version = VERSION_40;
  
      public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
      public static final byte[] ONE_BYTE = new byte[1];


Mime
View raw message