geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aging...@apache.org
Subject [geode] branch develop updated: GEODE-5828: Add dunit test reproducing the issue and incorporate review changes (#2584)
Date Wed, 10 Oct 2018 19:09:17 GMT
This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0e550cc  GEODE-5828: Add dunit test reproducing the issue and incorporate review
changes (#2584)
0e550cc is described below

commit 0e550cce06dbf3c5d0dfddffa20a6bca743179a0
Author: agingade <agingade@pivotal.io>
AuthorDate: Wed Oct 10 12:09:09 2018 -0700

    GEODE-5828: Add dunit test reproducing the issue and incorporate review changes (#2584)
    
    This ticket was already closed by checkin db8ba67.
    As part of this checkin, a dunit test is added to reproduce the issue and verify that the
    previous checkin fixed it.
    Incorporate review comments missed in the previous checkin.
---
 ...ntServerTransactionFailoverDistributedTest.java | 124 ++++++++++++++++++++-
 .../geode/internal/cache/TXCommitMessage.java      |   2 +-
 2 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
index 9f411f4..c8acb15 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -14,8 +14,12 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.test.dunit.VM.getHostName;
 import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.getBlackboard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -26,6 +30,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheTransactionManager;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
@@ -38,11 +43,16 @@ import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
 import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
 import org.apache.geode.test.dunit.rules.ClientCacheRule;
@@ -113,8 +123,13 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa
   }
 
   private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception {
+    return createServerRegion(totalNumBuckets, isAccessor, 0);
+  }
+
+  private int createServerRegion(int totalNumBuckets, boolean isAccessor, int redundancy)
+      throws Exception {
     PartitionAttributesFactory factory = new PartitionAttributesFactory();
-    factory.setTotalNumBuckets(totalNumBuckets);
+    factory.setTotalNumBuckets(totalNumBuckets).setRedundantCopies(redundancy);
     if (isAccessor) {
       factory.setLocalMaxMemory(0);
     }
@@ -338,4 +353,111 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa
       Thread.sleep(1000);
     }
   }
+
+  @Test
+  public void txCommitGetsAppliedOnAllTheReplicasAfterHostIsShutDownAndIfOneOfTheNodeHasCommitted()
+      throws Exception {
+    getBlackboard().initBlackboard();
+    VM client = server4;
+
+    port1 = server1.invoke(() -> createServerRegion(1, false, 2));
+
+    server1.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      region.put("Key-1", "Value-1");
+      region.put("Key-2", "Value-2");
+    });
+
+    port2 = server2.invoke(() -> createServerRegion(1, false, 2));
+
+    server3.invoke(() -> createServerRegion(1, false, 2));
+
+    client.invoke(() -> createClientRegion(true, port1, port2));
+
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeSendMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) {
+                InternalDistributedMember m = message.getRecipients()[0];
+                message.resetRecipients();
+                message.setRecipient(m);
+              }
+            }
+          });
+    });
+
+    server2.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeProcessMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) {
+                getBlackboard().signalGate("bounce");
+              }
+            }
+          });
+    });
+
+    server3.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeProcessMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) {
+                getBlackboard().signalGate("bounce");
+              }
+            }
+          });
+    });
+
+    AsyncInvocation clientAsync = client.invokeAsync(() -> {
+      {
+        CacheTransactionManager transactionManager =
+            clientCacheRule.getClientCache().getCacheTransactionManager();
+        Region region = clientCacheRule.getClientCache().getRegion(regionName);
+        transactionManager.begin();
+        region.put("TxKey-1", "TxValue-1");
+        region.put("TxKey-2", "TxValue-2");
+        transactionManager.commit();
+      }
+    });
+
+    await().atMost(60, SECONDS).until(() -> getBlackboard().isGateSignaled("bounce"));
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+    server1.bounceForcibly();
+
+    clientAsync.join();
+
+    server2.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    server3.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    client.invoke(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    server2.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+    server3.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index c88a086..4e0b896 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1985,7 +1985,7 @@ public class TXCommitMessage extends PooledDistributionMessage
     if (!getSender().equals(id)) {
       return;
     }
-    getDistributionManager().removeMembershipListener(this);
+    distributionManager.removeMembershipListener(this);
 
     synchronized (this) {
       if (isProcessing() || this.departureNoticed) {


Mime
View raw message