cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] cassandra git commit: Only use batchlog when paired materialized view replica is remote
Date Mon, 24 Aug 2015 17:50:34 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk d200822e6 -> 5b4393694


Only use batchlog when paired materialized view replica is remote

Patch by tjake; reviewed by carl for CASSANDRA-10061


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7693242
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7693242
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7693242

Branch: refs/heads/trunk
Commit: e769324220ccfb2e48063d639e378a8a34814651
Parents: 1fc3121
Author: T Jake Luciani <jake@apache.org>
Authored: Wed Aug 12 15:55:29 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Mon Aug 24 13:49:11 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 29 ++++++++++++++++----
 2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9cfbd64..2695233 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Only use batchlog when paired materialized view replica is remote (CASSANDRA-10061)
  * Reuse TemporalRow when updating multiple MaterializedViews (CASSANDRA-10060)
  * Validate gc_grace_seconds for batchlog writes and MVs (CASSANDRA-9917)
  * Fix sstablerepairedset (CASSANDRA-10132)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7693242/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f58ac56..25789bb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
@@ -675,7 +678,8 @@ public class StorageProxy implements StorageProxyMBean
             {
                 String keyspaceName = mutation.getKeyspaceName();
                 Token tk = mutation.key().getToken();
-                List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
+                InetAddress pairedEndpoint = MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
 
                 WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
                                                                                  consistencyLevel,
@@ -684,14 +688,27 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  WriteType.BATCH,
                                                                                  cleanup);
 
-                wrappers.add(wrapper);
+                //When local node is the endpoint and there are no pending nodes we can
+                // Just apply the mutation locally.
+                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
+                    wrapper.handler.pendingEndpoints.isEmpty())
+                {
+                    mutation.apply();
+                }
+                else
+                {
+                    wrappers.add(wrapper);
+                }
             }
 
-            //Apply to local batchlog memtable in this thread
-            BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+            if (!wrappers.isEmpty())
+            {
+                //Apply to local batchlog memtable in this thread
+                BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version).apply();
 
-            // now actually perform the writes and wait for them to complete
-            asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+                // now actually perform the writes and wait for them to complete
+                asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+            }
         }
         catch (WriteTimeoutException ex)
         {


Mime
View raw message