cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] cassandra git commit: Re-apply MV updates on commitlog replay
Date Tue, 01 Sep 2015 19:31:41 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 78dcf79c7 -> 17f8788ef


Re-apply MV updates on commitlog replay

patch by tjake; reviewed by carlyeks for CASSANDRA-10164


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5868685
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5868685
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5868685

Branch: refs/heads/trunk
Commit: b58686858c632ed642ccf355f1f3a588e28b0e8a
Parents: 9c02625
Author: T Jake Luciani <jake@apache.org>
Authored: Thu Aug 27 13:28:04 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Tue Sep 1 15:30:20 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 29 +++++++++++------
 .../db/commitlog/CommitLogReplayer.java         | 13 +++++++-
 .../db/view/MaterializedViewBuilder.java        | 13 +-------
 .../db/view/MaterializedViewManager.java        |  8 ++---
 .../apache/cassandra/service/StorageProxy.java  | 33 +++++++-------------
 .../cassandra/streaming/StreamReceiveTask.java  |  8 +++--
 .../cassandra/cql3/MaterializedViewTest.java    | 26 +++++++++++++++
 8 files changed, 79 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6539792..88b99a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
  * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
  * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)
  * Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f5a047f..981209c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -386,7 +386,17 @@ public class Keyspace
 
     public void apply(Mutation mutation, boolean writeCommitLog)
     {
-        apply(mutation, writeCommitLog, true);
+        apply(mutation, writeCommitLog, true, false);
+    }
+
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    {
+        apply(mutation, writeCommitLog, updateIndexes, false);
+    }
+
+    public void applyFromCommitLog(Mutation mutation)
+    {
+        apply(mutation, false, true, true);
     }
 
     /**
@@ -396,8 +406,9 @@ public class Keyspace
      *                       may happen concurrently, depending on the CL Executor type.
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController
"defragmenting")
+     * @param isClReplay     true if caller is the commitlog replayer
      */
-    public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
+    public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes,
boolean isClReplay)
     {
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
@@ -456,15 +467,15 @@ public class Keyspace
                 {
                     try
                     {
-                        Tracing.trace("Create materialized view mutations from replica");
-                        cfs.materializedViewManager.pushViewReplicaUpdates(upd);
+                        Tracing.trace("Creating materialized view mutations from base table
replica");
+                        cfs.materializedViewManager.pushViewReplicaUpdates(upd, !isClReplay);
                     }
-                    catch (Exception e)
+                    catch (Throwable t)
                     {
-                        if (!(e instanceof WriteTimeoutException))
-                            logger.warn("Encountered exception when creating materialized
view mutations", e);
-
-                        JVMStabilityInspector.inspectThrowable(e);
+                        JVMStabilityInspector.inspectThrowable(t);
+                        logger.error(String.format("Unknown exception caught while attempting
to update MaterializedView! %s.%s",
+                                     upd.metadata().ksName, upd.metadata().cfName), t);
+                        throw t;
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 93c3026..4f50008 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -161,8 +161,19 @@ public class CommitLogReplayer
 
         // flush replayed keyspaces
         futures.clear();
+        boolean flushingSystem = false;
         for (Keyspace keyspace : keyspacesRecovered)
+        {
+            if (keyspace.getName().equals(SystemKeyspace.NAME))
+                flushingSystem = true;
+
             futures.addAll(keyspace.flush());
+        }
+
+        // also flush batchlog incase of any MV updates
+        if (!flushingSystem)
+            futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush());
+
         FBUtilities.waitOnFutures(futures);
         return replayedCount.get();
     }
@@ -594,7 +605,7 @@ public class CommitLogReplayer
                 if (newMutation != null)
                 {
                     assert !newMutation.isEmpty();
-                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+                    Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
index 6083634..e23fd84 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -95,18 +95,7 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder
                    Collection<Mutation> mutations = view.createMutations(partition,
temporalRows, true);
 
                    if (mutations != null)
-                   {
-                       try
-                       {
-                           StorageProxy.mutateMV(key.getKey(), mutations);
-                           break;
-                       }
-                       catch (WriteTimeoutException ex)
-                       {
-                           NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
-                                       .warn("Encountered write timeout when building materialized
view {}, the entries were stored in the batchlog and will be replayed at another time", view.name);
-                       }
-                   }
+                       StorageProxy.mutateMV(key.getKey(), mutations, true);
                }
            }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
index e0cecf5..ac6a256 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -151,12 +151,8 @@ public class MaterializedViewManager
      * Calculates and pushes updates to the views replicas. The replicas are determined by
      * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
      */
-    public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException,
OverloadedException, WriteTimeoutException
+    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
     {
-        // This happens when we are replaying from commitlog. In that case, we have already
sent this commit off to the
-        // view node.
-        if (!StorageService.instance.isJoined()) return;
-
         List<Mutation> mutations = null;
         TemporalRow.Set temporalRows = null;
         for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
@@ -174,7 +170,7 @@ public class MaterializedViewManager
         }
         if (mutations != null)
         {
-            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 25789bb..4952959 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -652,8 +652,7 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @param mutations the mutations to be applied across the replicas
      */
-    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations)
-    throws UnavailableException, OverloadedException, WriteTimeoutException
+    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
boolean writeCommitLog)
     {
         Tracing.trace("Determining replicas for mutation");
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -693,7 +692,10 @@ public class StorageProxy implements StorageProxyMBean
                 if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
                     wrapper.handler.pendingEndpoints.isEmpty())
                 {
-                    mutation.apply();
+                    if (writeCommitLog)
+                        mutation.apply();
+                    else
+                        mutation.applyUnsafe();
                 }
                 else
                 {
@@ -703,31 +705,18 @@ public class StorageProxy implements StorageProxyMBean
 
             if (!wrappers.isEmpty())
             {
+                Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers,
w -> w.mutation), batchUUID, MessagingService.current_version);
+
                 //Apply to local batchlog memtable in this thread
-                BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w ->
w.mutation), batchUUID, MessagingService.current_version).apply();
+                if (writeCommitLog)
+                    blMutation.apply();
+                else
+                    blMutation.applyUnsafe();
 
                 // now actually perform the writes and wait for them to complete
                 asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
             }
         }
-        catch (WriteTimeoutException ex)
-        {
-            mvWriteMetrics.timeouts.mark();
-            Tracing.trace("Write timeout; received {} of {} required replies", ex.received,
ex.blockFor);
-            throw ex;
-        }
-        catch (UnavailableException e)
-        {
-            mvWriteMetrics.unavailables.mark();
-            Tracing.trace("Unavailable");
-            throw e;
-        }
-        catch (OverloadedException e)
-        {
-            mvWriteMetrics.unavailables.mark();
-            Tracing.trace("Overloaded");
-            throw e;
-        }
         finally
         {
             mvWriteMetrics.addNano(System.nanoTime() - startTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 52c8884..cb99654 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -146,7 +146,7 @@ public class StreamReceiveTask extends StreamTask
                     //We have a special path for Materialized view.
                     //Since the MV requires cleaning up any pre-existing state, we must put
                     //all partitions through the same write path as normal mutations.
-                    //This also ensures any 2is are also updated
+                    //This also ensures any 2i's are also updated
                     if (hasMaterializedViews)
                     {
                         for (SSTableReader reader : readers)
@@ -157,7 +157,8 @@ public class StreamReceiveTask extends StreamTask
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).apply();
+                                        //Apply unsafe (we will flush below before transaction
is done)
+                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
                                     }
                                 }
                             }
@@ -183,7 +184,10 @@ public class StreamReceiveTask extends StreamTask
                     //We don't keep the streamed sstables since we've applied them manually
                     //So we abort the txn and delete the streamed sstables
                     if (hasMaterializedViews)
+                    {
+                        cfs.forceBlockingFlush();
                         task.txn.abort();
+                    }
                 }
             }
             finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
index daa68e9..7d08a8b 100644
--- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
@@ -292,6 +292,32 @@ public class MaterializedViewTest extends CQLTester
     }
 
     @Test
+    public void testBuilderWidePartition() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "intval int, " +
+                    "PRIMARY KEY (k, c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+
+        for(int i = 0; i < 1024; i++)
+            execute("INSERT INTO %s (k, c, intval) VALUES (?, ?, ?)", 0, i, 0);
+
+        createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT
NULL AND c IS NOT NULL AND intval IS NOT NULL PRIMARY KEY (intval, c, k)");
+
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv"))
+            Thread.sleep(1000);
+
+        assertRows(execute("SELECT count(*) from %s WHERE k = ?", 0), row(1024L));
+        assertRows(execute("SELECT count(*) from mv WHERE intval = ?", 0), row(1024L));
+    }
+
+    @Test
     public void testRangeTombstone() throws Throwable
     {
         createTable("CREATE TABLE %s (" +


Mime
View raw message