cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject cassandra git commit: In mutateMV, if not yet gossiping, write all mutations to batchlog
Date Wed, 07 Oct 2015 19:47:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 e77730179 -> 6c3fa8e30


In mutateMV, if not yet gossiping, write all mutations to batchlog

Patch by Joel Knighton; reviewed by tjake for CASSANDRA-10413


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c3fa8e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c3fa8e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c3fa8e3

Branch: refs/heads/cassandra-3.0
Commit: 6c3fa8e30de21aecce35032762470bfa0fb3cb5e
Parents: e777301
Author: Joel Knighton <joel.knighton@datastax.com>
Authored: Wed Sep 30 04:50:19 2015 +0000
Committer: T Jake Luciani <jake@apache.org>
Committed: Wed Oct 7 15:46:56 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 91 +++++++++++---------
 3 files changed, 52 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba0012e..0bac64e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * If node is not yet gossiping write all MV updates to batchlog only (CASSANDRA-10413)
  * Re-populate token metadata after commit log recovery (CASSANDRA-10293)
  * Provide additional metrics for materialized views (CASSANDRA-10323)
  * Flush system schema tables after local schema changes (CASSANDRA-10429)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 628142d..ebbae65 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -94,7 +94,7 @@ public final class ViewUtils
 
             if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
             {
-                //Since there are pending endpoints we are going to store hints this in the batchlog regardless.
+                //Since there are pending endpoints we are going to write to the batchlog regardless.
                 //So we can pretend we are the views endpoint.
 
                 return FBUtilities.getBroadcastAddress();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c3fa8e3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d1142fc..f210951 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -660,55 +660,64 @@ public class StorageProxy implements StorageProxyMBean
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
-        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+
 
         try
         {
-            Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
-
-            ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
-
-            //Since the base -> view replication is 1:1 we only need to store the BL locally
-            final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
+            // if we haven't joined the ring, write everything to batchlog because paired replicas may be stale
             final UUID batchUUID = UUIDGen.getTimeUUID();
-            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
 
-            // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
-            for (Mutation mutation : mutations)
+            if (!Gossiper.instance.isEnabled())
+            {
+                BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
+                                                        mutations),
+                                      writeCommitLog);
+            }
+            else
             {
-                String keyspaceName = mutation.getKeyspaceName();
-                Token tk = mutation.key().getToken();
-                InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
-                List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
-
-                WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
-                                                                                   consistencyLevel,
-                                                                                   consistencyLevel,
-                                                                                   naturalEndpoints,
-                                                                                   baseComplete,
-                                                                                   WriteType.BATCH,
-                                                                                   cleanup);
-
-                // When local node is the endpoint and there are no pending nodes we can
-                // Just apply the mutation locally.
-                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+                List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+                Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
+
+                ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+
+                //Since the base -> view replication is 1:1 we only need to store the BL locally
+                final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
+                BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                                                              () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+
+                // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
+                for (Mutation mutation : mutations)
                 {
-                    mutation.apply(writeCommitLog);
-                    viewWriteMetrics.viewReplicasSuccess.inc();
+                    String keyspaceName = mutation.getKeyspaceName();
+                    Token tk = mutation.key().getToken();
+                    InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
+
+                    WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
+                                                                                       consistencyLevel,
+                                                                                       consistencyLevel,
+                                                                                       naturalEndpoints,
+                                                                                       baseComplete,
+                                                                                       WriteType.BATCH,
+                                                                                       cleanup);
+
+                    // When local node is the endpoint and there are no pending nodes we can
+                    // Just apply the mutation locally.
+                    if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+                        mutation.apply(writeCommitLog);
+                    else
+                        wrappers.add(wrapper);
                 }
-                else
-                    wrappers.add(wrapper);
-            }
 
-            if (!wrappers.isEmpty())
-            {
-                // Apply to local batchlog memtable in this thread
-                BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
-                                      writeCommitLog);
+                if (!wrappers.isEmpty())
+                {
+                    // Apply to local batchlog memtable in this thread
+                    BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
+                                          writeCommitLog);
 
-                // now actually perform the writes and wait for them to complete
-                asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
+                    // now actually perform the writes and wait for them to complete
+                    asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
+                }
             }
         }
         finally
@@ -1081,7 +1090,7 @@ public class StorageProxy implements StorageProxyMBean
      * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete.
      * }
      * </pre>
-     * 
+     *
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
@@ -2250,7 +2259,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         Set<InetAddress> allEndpoints = Gossiper.instance.getLiveTokenOwners();
-        
+
         int blockFor = allEndpoints.size();
         final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
 


Mime
View raw message