cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] cassandra git commit: Fix LWW bug affecting MaterializedViews
Date Thu, 27 Aug 2015 18:38:51 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 307c196e6 -> 0174ad406


Fix LWW bug affecting MaterializedViews

Patch by tjake; reviewed by carl for CASSANDRA-10197


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f00a3da8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f00a3da8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f00a3da8

Branch: refs/heads/trunk
Commit: f00a3da835a43a43d6606e0a2544c12554a6a425
Parents: b5b32a4
Author: T Jake Luciani <jake@apache.org>
Authored: Wed Aug 26 16:26:06 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Thu Aug 27 14:36:11 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/view/TemporalRow.java   | 188 +++++++++++++------
 .../cql3/MaterializedViewLongTest.java          |   8 +-
 .../cassandra/cql3/MaterializedViewTest.java    |  26 +++
 4 files changed, 156 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f00a3da8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca2ded8..eef7974 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Fix LWW bug affecting Materialized Views (CASSANDRA-10197)
  * Ensures frozen sets and maps are always sorted (CASSANDRA-10162)
  * Don't deadlock when flushing CFS backed custom indexes (CASSANDRA-10181)
  * Fix double flushing of secondary index tables (CASSANDRA-10180)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f00a3da8/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 3ba91ee..be1c584 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -26,8 +26,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import com.google.common.collect.Iterables;
 
@@ -68,51 +66,16 @@ public class TemporalRow
     public interface Resolver
     {
         /**
-         * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp
sorted order
+         * @param cellVersions  all cells for a certain TemporalRow's Cell
          * @return      A single TemporalCell from the iterable which satisfies the resolution
criteria, or null if
          *              there is no cell which qualifies
          */
-        TemporalCell resolve(Iterable<TemporalCell> cells);
+        TemporalCell resolve(TemporalCell.Versions cellVersions);
     }
 
-    /**
-     * Returns the first value in the iterable if it is from the set of persisted cells,
and the cell which results from
-     * reconciliation of the remaining cells does not have the same value.
-     */
-    public static final Resolver oldValueIfUpdated = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-
-        TemporalCell initial = iterator.next();
-        if (initial.isNew || !iterator.hasNext())
-            return null;
-
-        TemporalCell value = initial;
-        while (iterator.hasNext())
-            value = value.reconcile(iterator.next());
-
-        return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial
: null;
-    };
-
-    public static final Resolver earliest = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-        return iterator.next();
-    };
-
-    public static final Resolver latest = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-
-        TemporalCell value = iterator.next();
-        while (iterator.hasNext())
-            value = value.reconcile(iterator.next());
-
-        return value;
-    };
+    public static final Resolver oldValueIfUpdated = TemporalCell.Versions::getOldCellIfUpdated;
+    public static final Resolver earliest = TemporalCell.Versions::getEarliestCell;
+    public static final Resolver latest = TemporalCell.Versions::getLatestCell;
 
     private static class TemporalCell
     {
@@ -157,6 +120,117 @@ public class TemporalRow
         {
             return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath);
         }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TemporalCell that = (TemporalCell) o;
+
+            if (timestamp != that.timestamp) return false;
+            if (ttl != that.ttl) return false;
+            if (localDeletionTime != that.localDeletionTime) return false;
+            if (isNew != that.isNew) return false;
+            return !(value != null ? !value.equals(that.value) : that.value != null);
+        }
+
+        public int hashCode()
+        {
+            int result = value != null ? value.hashCode() : 0;
+            result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+            result = 31 * result + ttl;
+            result = 31 * result + localDeletionTime;
+            result = 31 * result + (isNew ? 1 : 0);
+            return result;
+        }
+
+        /**
+         * Tracks the versions of a cell for a given TemporalRow.
+         * There are only two possible versions, existing and new.
+         *
+         */
+        static class Versions
+        {
+            private TemporalCell existingCell = null;
+            private TemporalCell newCell = null;
+            private int numSet = 0;
+
+
+            /**
+             * @return the cell that is earliest
+             * Or would be overwritten in the case of a timestamp conflict
+             */
+            public TemporalCell getEarliestCell()
+            {
+                assert numSet > 0;
+
+                if (numSet == 1)
+                    return existingCell == null ? newCell : existingCell;
+
+                TemporalCell latest = existingCell.reconcile(newCell);
+
+                return latest == newCell ? existingCell : newCell;
+            }
+
+            /**
+             * @return the cell that is latest
+             * Or would be the winner in the case of a timestamp conflict
+             */
+            public TemporalCell getLatestCell()
+            {
+                assert numSet > 0;
+
+                if (numSet == 1)
+                    return existingCell == null ? newCell : existingCell;
+
+                return existingCell.reconcile(newCell);
+            }
+
+            /**
+             * @return the new cell if it updates the existing cell
+             */
+            public TemporalCell getOldCellIfUpdated()
+            {
+                assert numSet > 0;
+
+                if (numSet == 1)
+                   return null;
+
+                TemporalCell value = existingCell.reconcile(newCell);
+
+                return ByteBufferUtil.compareUnsigned(existingCell.value, value.value) !=
0 ? existingCell : null;
+            }
+
+            void setVersion(TemporalCell cell)
+            {
+                assert cell != null;
+
+                if (cell.isNew)
+                {
+                    assert newCell == null || newCell.equals(cell) : "Only one cell version
can be marked New";
+                    newCell = cell;
+                    numSet = existingCell == null ? 1 : 2;
+                }
+                else
+                {
+                    assert existingCell == null || existingCell.equals(cell) : "Only one
cell version can be marked Existing";
+                    existingCell = cell;
+                    numSet = newCell == null ? 1 : 2;
+                }
+            }
+
+            public void addToRow(TemporalRow row, ColumnIdentifier column, CellPath path)
+            {
+                if (existingCell != null)
+                    row.addColumnValue(column, path, existingCell.timestamp, existingCell.ttl,
+                                       existingCell.localDeletionTime, existingCell.value,
existingCell.isNew);
+
+                if (newCell != null)
+                    row.addColumnValue(column, path, newCell.timestamp, newCell.ttl,
+                                       newCell.localDeletionTime, newCell.value, newCell.isNew);
+            }
+        }
     }
 
     private final ColumnFamilyStore baseCfs;
@@ -167,7 +241,7 @@ public class TemporalRow
     private final boolean startIsNew;
 
     public final int nowInSec;
-    private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>>
columnValues = new HashMap<>();
+    private final Map<ColumnIdentifier, Map<CellPath, TemporalCell.Versions>>
columnValues = new HashMap<>();
     private int viewClusteringTtl = NO_TTL;
     private long viewClusteringTimestamp = NO_TIMESTAMP;
     private int viewClusteringLocalDeletionTime = NO_DELETION_TIME;
@@ -230,10 +304,10 @@ public class TemporalRow
         if (!columnValues.containsKey(identifier))
             columnValues.put(identifier, new HashMap<>());
 
-        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier);
+        Map<CellPath, TemporalCell.Versions> innerMap = columnValues.get(identifier);
 
         if (!innerMap.containsKey(cellPath))
-            innerMap.put(cellPath, new TreeMap<>());
+            innerMap.put(cellPath, new TemporalCell.Versions());
 
         // If this column is part of the view's primary keys
         if (viewPrimaryKey.contains(identifier))
@@ -243,7 +317,7 @@ public class TemporalRow
             this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime,
localDeletionTime, NO_DELETION_TIME);
         }
 
-        innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime,
isNew));
+        innerMap.get(cellPath).setVersion(new TemporalCell(value, timestamp, ttl, localDeletionTime,
isNew));
     }
 
     private static int minValueIfSet(int existing, int update, int defaultValue)
@@ -333,16 +407,16 @@ public class TemporalRow
 
     public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition,
Resolver resolver)
     {
-        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name);
+        Map<CellPath, TemporalCell.Versions> innerMap = columnValues.get(definition.name);
         if (innerMap == null)
         {
             return Collections.emptyList();
         }
 
         Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>();
-        for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells
: innerMap.entrySet())
+        for (Map.Entry<CellPath, TemporalCell.Versions> pathAndCells : innerMap.entrySet())
         {
-            TemporalCell cell = resolver.resolve(pathAndCells.getValue().values());
+            TemporalCell cell = resolver.resolve(pathAndCells.getValue());
 
             if (cell != null)
                 value.add(cell.cell(definition, pathAndCells.getKey()));
@@ -421,19 +495,13 @@ public class TemporalRow
             TemporalRow existing = clusteringToRow.put(row.startRow.clustering(), newRow);
             assert existing == null;
 
-
-            for (Map.Entry<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>>
entry : row.columnValues.entrySet())
+            for (Map.Entry<ColumnIdentifier, Map<CellPath, TemporalCell.Versions>>
entry : row.columnValues.entrySet())
             {
-                for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> cellPathEntry
: entry.getValue().entrySet())
+                for (Map.Entry<CellPath, TemporalCell.Versions> cellPathEntry : entry.getValue().entrySet())
                 {
-                    SortedMap<Long, TemporalCell> oldCells = cellPathEntry.getValue();
-
-                    for (Map.Entry<Long, TemporalCell> cellEntry : oldCells.entrySet())
-                    {
-                        newRow.addColumnValue(entry.getKey(), cellPathEntry.getKey(), cellEntry.getKey(),
-                                              cellEntry.getValue().ttl, cellEntry.getValue().localDeletionTime,
-                                              cellEntry.getValue().value, cellEntry.getValue().isNew);
-                    }
+                    TemporalCell.Versions cellVersions = cellPathEntry.getValue();
+
+                    cellVersions.addToRow(newRow, entry.getKey(), cellPathEntry.getKey());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f00a3da8/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
index 70ec451..9738103 100644
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
@@ -104,7 +104,7 @@ public class MaterializedViewLongTest extends CQLTester
                         {
                             try
                             {
-                                executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES
(?, ?, ?)",
+                                executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES
(?, ?, ?) USING TIMESTAMP 1",
                                            1,
                                            1,
                                            i + writerOffset);
@@ -144,12 +144,6 @@ public class MaterializedViewLongTest extends CQLTester
             }
         }
 
-        while (!(((SEPExecutor) StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION)).getPendingTasks()
== 0
-                 && ((SEPExecutor) StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION)).getActiveCount()
== 0))
-        {
-            Thread.sleep(1);
-        }
-
         int value = executeNet(protocolVersion, "SELECT c FROM %s WHERE a = 1 AND b = 1").one().getInt("c");
 
         List<Row> rows = executeNet(protocolVersion, "SELECT c FROM " + keyspace()
+ ".mv").all();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f00a3da8/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
index faff229..daa68e9 100644
--- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
@@ -35,6 +35,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import junit.framework.Assert;
 import org.apache.cassandra.concurrent.SEPExecutor;
@@ -995,4 +996,29 @@ public class MaterializedViewTest extends CQLTester
         Assert.assertEquals(1, results.size());
         Assert.assertTrue("There should be a null result given back due to ttl expiry", results.get(0).isNull(0));
     }
+
+    @Test
+    public void conflictingTimestampTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "a int," +
+                    "b int," +
+                    "c int," +
+                    "PRIMARY KEY (a, b))");
+
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT
NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
+
+        for (int i = 0; i < 50; i++)
+        {
+            updateMV("INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1", 1, 1,
i);
+        }
+
+        ResultSet mvRows = executeNet(protocolVersion, "SELECT c FROM mv");
+        List<Row> rows = executeNet(protocolVersion, "SELECT c FROM %s").all();
+        Assert.assertEquals("There should be exactly one row in base", 1, rows.size());
+        int expected = rows.get(0).getInt("c");
+        assertRowsNet(protocolVersion, mvRows, row(expected));
+    }
 }


Mime
View raw message