cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [3/6] cassandra git commit: Fix incorrect [2.1 <- 3.0] serialization of counter cells created in 2.0
Date Tue, 01 Aug 2017 14:41:24 GMT
Fix incorrect [2.1 <- 3.0] serialization of counter cells created in 2.0

Also fixes calculation of legacy counter update cells' serialized size.

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-13691


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ba712897
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ba712897
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ba712897

Branch: refs/heads/trunk
Commit: ba71289778369e71d9abbdb93cb6b91ba67f9c85
Parents: 9dc896f
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Thu Jul 13 14:47:18 2017 -0700
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Tue Aug 1 15:24:55 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 .../apache/cassandra/cql3/UpdateParameters.java |  7 ++-
 .../org/apache/cassandra/db/LegacyLayout.java   | 23 ++++++---
 .../cassandra/db/context/CounterContext.java    | 52 ++++++++++++++------
 .../cassandra/thrift/CassandraServer.java       |  4 +-
 .../org/apache/cassandra/utils/CounterId.java   |  3 +-
 .../db/context/CounterContextTest.java          | 34 ++++++++++++-
 7 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3f5fe6..4038ac7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix incorrect [2.1 <- 3.0] serialization of counter cells created in 2.0 (CASSANDRA-13691)
  * Fix invalid writetime for null cells (CASSANDRA-13711)
  * Fix ALTER TABLE statement to atomically propagate changes to the table and its MVs (CASSANDRA-12952)
  * Fixed ambiguous output of nodetool tablestats command (CASSANDRA-13722)
@@ -19,6 +20,7 @@
 Merged from 2.1:
  * Clone HeartBeatState when building gossip messages. Make its generation/version volatile (CASSANDRA-13700)
 
+
 3.0.14
  * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
  * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
@@ -45,6 +47,7 @@ Merged from 2.2:
  * Nodes started with join_ring=False should be able to serve requests when authentication is enabled (CASSANDRA-11381)
  * cqlsh COPY FROM: increment error count only for failures, not for attempts (CASSANDRA-13209)
 
+
 3.0.13
  * Make reading of range tombstones more reliable (CASSANDRA-12811)
  * Fix startup problems due to schema tables not completely flushed (CASSANDRA-12213)
@@ -170,7 +173,6 @@ Merged from 2.1:
  * cqlsh copy-from: sort user type fields in csv (CASSANDRA-12959)
 
 
-
 3.0.10
  * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
  * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index d902dec..8ff5344 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -162,12 +162,15 @@ public class UpdateParameters
         // "counter update", which is a temporary state until we run into 'CounterMutation.updateWithCurrentValue()'
         // which does the read-before-write and sets the proper CounterId, clock and updated value.
         //
-        // We thus create a "fake" local shard here. The CounterId/clock used don't matter as this is just a temporary
+        // We thus create a "fake" local shard here. The clock used doesn't matter as this is just a temporary
         // state that will be replaced when processing the mutation in CounterMutation, but the reason we use a 'local'
         // shard is due to the merging rules: if a user includes multiple updates to the same counter in a batch, those
         // multiple updates will be merged in the PartitionUpdate *before* they even reach CounterMutation. So we need
         // such update to be added together, and that's what a local shard gives us.
-        builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createLocal(increment)));
+        //
+        // We set counterid to a special value to differentiate between regular pre-2.0 local shards from pre-2.1 era
+        // and "counter update" temporary state cells. Please see CounterContext.createUpdate() for further details.
+        builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createUpdate(increment)));
     }
 
     public void setComplexDeletionTime(ColumnDefinition column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 669fb1c..4f7bc22 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -511,12 +511,12 @@ public abstract class LegacyLayout
         {
             size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata()));
             size += 1;  // serialization flags
-            if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+            if (cell.isExpiring())
             {
                 size += TypeSizes.sizeof(cell.ttl);
                 size += TypeSizes.sizeof(cell.localDeletionTime);
             }
-            else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+            else if (cell.isTombstone())
             {
                 size += TypeSizes.sizeof(cell.timestamp);
                 // localDeletionTime replaces cell.value as the body
@@ -524,7 +524,14 @@ public abstract class LegacyLayout
                 size += TypeSizes.sizeof(cell.localDeletionTime);
                 continue;
             }
-            else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+            else if (cell.isCounterUpdate())
+            {
+                size += TypeSizes.sizeof(cell.timestamp);
+                long count = CounterContext.instance().getLocalCount(cell.value);
+                size += ByteBufferUtil.serializedSizeWithLength(ByteBufferUtil.bytes(count));
+                continue;
+            }
+            else if (cell.isCounter())
             {
                 size += TypeSizes.sizeof(Long.MIN_VALUE);  // timestampOfLastDelete
             }
@@ -1073,7 +1080,7 @@ public abstract class LegacyLayout
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
             LegacyCellName name = decodeCellName(metadata, cellname, readAllAsDynamic);
             return (mask & COUNTER_UPDATE_MASK) != 0
-                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
+                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createUpdate(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
                 : ((mask & DELETION_MASK) == 0
                         ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
                         : new LegacyCell(LegacyCell.Kind.DELETED, name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), Cell.NO_TTL));
@@ -1489,11 +1496,11 @@ public abstract class LegacyLayout
             return new LegacyCell(Kind.DELETED, decodeCellName(metadata, superColumnName, name), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, nowInSec, LivenessInfo.NO_TTL);
         }
 
-        public static LegacyCell counter(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long value)
+        public static LegacyCell counterUpdate(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long value)
         throws UnknownColumnException
         {
             // See UpdateParameters.addCounter() for more details on this
-            ByteBuffer counterValue = CounterContext.instance().createLocal(value);
+            ByteBuffer counterValue = CounterContext.instance().createUpdate(value);
             return counter(decodeCellName(metadata, superColumnName, name), counterValue);
         }
 
@@ -1515,10 +1522,10 @@ public abstract class LegacyLayout
             return 0;
         }
 
-        private boolean isCounterUpdate()
+        public boolean isCounterUpdate()
         {
             // See UpdateParameters.addCounter() for more details on this
-            return isCounter() && CounterContext.instance().isLocal(value);
+            return isCounter() && CounterContext.instance().isUpdate(value);
         }
 
         public ClusteringPrefix clustering()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index e3f4dfd..29e5cfc 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -81,6 +81,14 @@ public class CounterContext
     private static final int COUNT_LENGTH = TypeSizes.sizeof(Long.MAX_VALUE);
     private static final int STEP_LENGTH = CounterId.LENGTH + CLOCK_LENGTH + COUNT_LENGTH;
 
+    /*
+     * A special hard-coded value we use for clock ids to differentiate between regular local shards
+     * and 'fake' local shards used to emulate pre-3.0 CounterUpdateCell-s in UpdateParameters.
+     *
+     * Important for handling counter writes and reads during rolling 2.1/2.2 -> 3.0 upgrades.
+     */
+    static final CounterId UPDATE_CLOCK_ID = CounterId.fromInt(0);
+
     private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);
 
     public static enum Relationship
@@ -100,6 +108,35 @@ public class CounterContext
     }
 
     /**
+     * Creates a counter context with a single local shard with clock id of UPDATE_CLOCK_ID.
+     *
+     * This is only used in a PartitionUpdate until the update has gone through
+     * CounterMutation.apply(), at which point this special local shard will be replaced by a regular global one.
+     * It should never hit commitlog / memtable / disk, but can hit network.
+     *
+     * We use this so that if an update statement has multiple increments of the same counter we properly
+     * add them rather than keeping only one of them.
+     *
+     * NOTE: Before CASSANDRA-13691 we used a regular local shard without a hard-coded clock id value here.
+     * It was problematic, because it was possible to return a false positive, and on read path encode an old counter
+     * cell from 2.0 era with a regular local shard as a counter update, and to break the 2.1 coordinator.
+     */
+    public ByteBuffer createUpdate(long count)
+    {
+        ContextState state = ContextState.allocate(0, 1, 0);
+        state.writeLocal(UPDATE_CLOCK_ID, 1L, count);
+        return state.context;
+    }
+
+    /**
+     * Checks if a context is an update (see createUpdate() for justification).
+     */
+    public boolean isUpdate(ByteBuffer context)
+    {
+        return ContextState.wrap(context).getCounterId().equals(UPDATE_CLOCK_ID);
+    }
+
+    /**
      * Creates a counter context with a single global, 2.1+ shard (a result of increment).
      */
     public ByteBuffer createGlobal(CounterId id, long clock, long count)
@@ -111,12 +148,7 @@ public class CounterContext
 
     /**
      * Creates a counter context with a single local shard.
-     * This is only used in a PartitionUpdate until the update has gone through
-     * CounterMutation.apply(), at which point all the local shard are replaced by
-     * global ones. In other words, local shards should never hit the disk or
-     * memtables. And we use this so that if an update statement has multiple increment
-     * of the same counter we properly add them rather than keeping only one of them.
-     * (this is also used for tests of compatibility with pre-2.1 counters)
+     * For use by tests of compatibility with pre-2.1 counters only.
      */
     public ByteBuffer createLocal(long count)
     {
@@ -682,14 +714,6 @@ public class CounterContext
     }
 
     /**
-     * Checks if a context is local
-     */
-    public boolean isLocal(ByteBuffer context)
-    {
-        return ContextState.wrap(context).isLocal();
-    }
-
-    /**
      * Returns the clock and the count associated with the given counter id, or (0, 0) if no such shard is present.
      */
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 0dec94e..86caac3 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1016,7 +1016,7 @@ public class CassandraServer implements Cassandra.Iface
     private LegacyLayout.LegacyCell toCounterLegacyCell(CFMetaData metadata, ByteBuffer superColumnName, CounterColumn column)
     throws UnknownColumnException
     {
-        return LegacyLayout.LegacyCell.counter(metadata, superColumnName, column.name, column.value);
+        return LegacyLayout.LegacyCell.counterUpdate(metadata, superColumnName, column.name, column.value);
     }
 
     private void sortAndMerge(CFMetaData metadata, List<LegacyLayout.LegacyCell> cells, int nowInSec)
@@ -2169,7 +2169,7 @@ public class CassandraServer implements Cassandra.Iface
                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
 
                 // See UpdateParameters.addCounter() for more details on this
-                ByteBuffer value = CounterContext.instance().createLocal(column.value);
+                ByteBuffer value = CounterContext.instance().createUpdate(column.value);
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
                 Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/utils/CounterId.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CounterId.java b/src/java/org/apache/cassandra/utils/CounterId.java
index 2552178..690d4aa 100644
--- a/src/java/org/apache/cassandra/utils/CounterId.java
+++ b/src/java/org/apache/cassandra/utils/CounterId.java
@@ -46,10 +46,11 @@ public class CounterId implements Comparable<CounterId>
     }
 
     /**
-     * Function for test purposes, do not use otherwise.
      * Pack an int in a valid CounterId so that the resulting ids respects the
      * numerical ordering. Used for creating handcrafted but easy to
      * understand contexts in unit tests (see CounterContextTest).
+     *
+     * Also used to generate a special ID for special-case update contexts (see CounterContext.createUpdate()).
      */
     public static CounterId fromInt(int n)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
index 4f587c6..a8852f7 100644
--- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
+++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
@@ -28,17 +28,19 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ClockAndCount;
+import org.apache.cassandra.db.LegacyLayout.LegacyCell;
 import org.apache.cassandra.db.context.CounterContext.Relationship;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.db.context.CounterContext.ContextState;
+
 public class CounterContextTest
 {
     private static final CounterContext cc = new CounterContext();
@@ -542,4 +544,34 @@ public class CounterContextTest
         assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(15)));
         assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(20)));
     }
+
+    @Test // see CASSANDRA-13691
+    public void testCounterUpdate()
+    {
+        /*
+         * a context with just one 'update' shard - a local shard with a hardcoded value of CounterContext.UPDATE_CLOCK_ID
+         */
+
+        ByteBuffer updateContext = CounterContext.instance().createUpdate(10L);
+
+        assertEquals(ClockAndCount.create(1L, 10L), cc.getClockAndCountOf(updateContext, CounterContext.UPDATE_CLOCK_ID));
+        assertTrue(cc.isUpdate(updateContext));
+        LegacyCell updateCell = LegacyCell.counter(null, updateContext);
+        assertTrue(updateCell.isCounterUpdate());
+
+
+        /*
+         * a context with a regular local shard sorting first and a couple others in it - should *not* be identified as an update
+         */
+
+        ContextState notUpdateContextState = ContextState.allocate(1, 1, 1);
+        notUpdateContextState.writeLocal( CounterId.fromInt(1), 1L, 10L);
+        notUpdateContextState.writeRemote(CounterId.fromInt(2), 1L, 10L);
+        notUpdateContextState.writeGlobal(CounterId.fromInt(3), 1L, 10L);
+        ByteBuffer notUpdateContext = notUpdateContextState.context;
+
+        assertFalse(cc.isUpdate(notUpdateContext));
+        LegacyCell notUpdateCell = LegacyCell.counter(null, notUpdateContext);
+        assertFalse(notUpdateCell.isCounterUpdate());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message