geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [geode] branch develop updated: GEODE-4928 DistributedLockService doesn't work as expected while the dlock grantor is initialized
Date Fri, 23 Mar 2018 23:57:33 GMT
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f3b47a5  GEODE-4928 DistributedLockService doesn't work as expected while the dlock grantor is initialized
f3b47a5 is described below

commit f3b47a5a8ba65fbcf0915e94da3ef3683962c43a
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
AuthorDate: Fri Mar 23 16:56:35 2018 -0700

    GEODE-4928 DistributedLockService doesn't work as expected while the dlock grantor is initialized
    
    The lock service cleans up transaction locks in a background thread but
    cleans up regular dlocks in its membership listener.  This allows you to
    get a dlock as soon as a member holding that lock leaves the distributed
    system, but its transaction locks stick around for a while.
    
    The fix for this is to wait for the background processing to complete before
    acquiring locks and checking for conflicts.
---
 .../distributed/internal/locks/DLockGrantor.java   |   6 +
 .../locks/DLockLessorDepartureHandler.java         |   2 +
 .../geode/internal/cache/TXFarSideCMTracker.java   |  11 +-
 .../cache/locks/TXLessorDepartureHandler.java      |  34 +++-
 .../internal/DlockAndTxlockRegressionTest.java     | 213 +++++++++++++++++++++
 .../apache/geode/test/dunit/DUnitBlackboard.java   |   2 +-
 .../test/dunit/internal/InternalBlackboard.java    |   2 +-
 .../geode/test/dunit/standalone/DUnitLauncher.java |   2 +-
 8 files changed, 261 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
index d49cd62..84eab2a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
@@ -482,6 +482,12 @@ public class DLockGrantor {
           logger.trace(LogMarker.DLS, "[DLockGrantor.handleLockBatch] request: {}", request);
         }
 
+        DLockLessorDepartureHandler handler = this.dlock.getDLockLessorDepartureHandler();
+        // make sure the tx locks of departed members have been cleared so we don't have
+        // conflicts with non-existent members. This is done in a waiting-pool thread launched
+        // when the member-departure is announced.
+        handler.waitForInProcessDepartures();
+
         DLockBatch batch = (DLockBatch) request.getObjectName();
         this.resMgr.makeReservation((IdentityArrayList) batch.getReqs());
         if (isDebugEnabled_DLS) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockLessorDepartureHandler.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockLessorDepartureHandler.java
index 64280b2..2178a38 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockLessorDepartureHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockLessorDepartureHandler.java
@@ -28,4 +28,6 @@ public interface DLockLessorDepartureHandler {
 
   void handleDepartureOf(InternalDistributedMember owner, DLockGrantor grantor);
 
+  public void waitForInProcessDepartures() throws InterruptedException;
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
index 1876973..8f2093f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
@@ -140,14 +140,21 @@ public class TXFarSideCMTracker {
   public void waitForAllToProcess() throws InterruptedException {
     if (Thread.interrupted())
       throw new InterruptedException(); // wisest to do this before the synchronize below
-    // Assume that a thread interrupt is only sent in the
+    // Assume that a thread interrupt is only set in the
     // case of a shutdown, in that case we don't need to wait
-    // around any longer, propigating the interrupt is reasonable behavior
+    // around any longer, propagating the interrupt is reasonable behavior
+    boolean messageWritten = false;
     synchronized (this.txInProgress) {
       while (!this.txInProgress.isEmpty()) {
+        logger.info("Lock grantor recovery is waiting for transactions to complete: {}",
+            txInProgress);
+        messageWritten = true;
         this.txInProgress.wait();
       }
     }
+    if (messageWritten) {
+      logger.info("Wait for transactions completed");
+    }
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLessorDepartureHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLessorDepartureHandler.java
index c34df64..53f998e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLessorDepartureHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLessorDepartureHandler.java
@@ -36,6 +36,18 @@ import org.apache.geode.internal.logging.LogService;
 public class TXLessorDepartureHandler implements DLockLessorDepartureHandler {
   private static final Logger logger = LogService.getLogger();
 
+  private final Object stateLock = new Object();
+  private boolean processingDepartures;
+
+  @Override
+  public void waitForInProcessDepartures() throws InterruptedException {
+    synchronized (stateLock) {
+      while (processingDepartures) {
+        stateLock.wait();
+      }
+    }
+  }
+
   public void handleDepartureOf(InternalDistributedMember owner, DLockGrantor grantor) {
     // get DTLS
     TXLockService dtls = TXLockService.getDTLS();
@@ -74,12 +86,22 @@ public class TXLessorDepartureHandler implements DLockLessorDepartureHandler {
     try {
       dm.getWaitingThreadPool().execute(new Runnable() {
         public void run() {
-          for (int i = 0; i < batches.length; i++) {
-            TXLockBatch batch = (TXLockBatch) batches[i];
-            // send TXOriginatorDepartureMessage
-            Set participants = batch.getParticipants();
-            TXOriginatorRecoveryProcessor.sendMessage(participants, owner, batch.getTXLockId(),
-                grantor, dm);
+          synchronized (stateLock) {
+            processingDepartures = true;
+          }
+          try {
+            for (int i = 0; i < batches.length; i++) {
+              TXLockBatch batch = (TXLockBatch) batches[i];
+              // send TXOriginatorDepartureMessage
+              Set participants = batch.getParticipants();
+              TXOriginatorRecoveryProcessor.sendMessage(participants, owner, batch.getTXLockId(),
+                  grantor, dm);
+            }
+          } finally {
+            synchronized (stateLock) {
+              processingDepartures = false;
+              stateLock.notifyAll();
+            }
           }
         }
       });
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
new file mode 100644
index 0000000..27d3903
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.LockServiceDestroyedException;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.standalone.DUnitLauncher;
+import org.apache.geode.test.junit.categories.DLockTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category({DLockTest.class, DistributedTest.class})
+public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
+  private static final Logger logger = LogService.getLogger();
+  public static final String TRANSACTION_COUNT = "transactionCount";
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Override
+  public Properties getDistributedSystemProperties() {
+    Properties properties = super.getDistributedSystemProperties();
+    properties.setProperty(ConfigurationProperties.DISABLE_AUTO_RECONNECT, "true");
+    properties.setProperty(ConfigurationProperties.MEMBER_TIMEOUT, "1000");
+    properties.setProperty(ConfigurationProperties.NAME,
+        "vm" + Integer.getInteger(DUnitLauncher.VM_NUM_PARAM));
+    System.getProperties().remove("gemfire.member-timeout");
+    System.getProperties().remove("gemfire.log-level");
+    return properties;
+  }
+
+  /**
+   * Distributed locks are released quickly when a server crashes but transaction locks are
+   * released in a background "pooled waiting" thread because the release involves communicating
+   * with participants of the transaction. This makes the pattern of<br>
+   * 1. get dlock,<br>
+   * 2. perform transaction<br>
+   * sometimes fail if the background cleanup takes too long. You may get the dlock but then get a
+   * CommitConflictException when committing the transaction due to lingering tx locks from the
+   * crashed server. The fix makes tx lock acquisition wait for the cleanup to finish.
+   */
+  @Test
+  public void testDLockProtectsAgainstTransactionConflict() throws Exception {
+    IgnoredException
+        .addIgnoredException("DistributedSystemDisconnectedException|ForcedDisconnectException");
+    // create four nodes to perform dlock & transactions and then
+    // kill & restart each one using a forced disconnect.
+    Host host = Host.getHost(0);
+    VM[] servers = new VM[] {host.getVM(0), host.getVM(1), host.getVM(2)};
+    for (VM vm : servers) {
+      vm.invoke(() -> createCacheAndRegion());
+    }
+
+    servers[0].invoke(new SerializableRunnable() {
+      public void run() {
+        becomeLockGrantor();
+      }
+    });
+
+    AsyncInvocation[] asyncInvocations = new AsyncInvocation[servers.length];
+    for (int i = 0; i < servers.length; i++) {
+      asyncInvocations[i] = servers[i].invokeAsync(() -> performOps());
+    }
+
+    // this test uses the DUnit blackboard to coordinate actions between JVMs
+    getBlackboard().initBlackboard();
+    getBlackboard().setMailbox(TRANSACTION_COUNT, 0);
+
+    try {
+
+      for (int i = 0; i < servers.length; i++) {
+        checkAsyncInvocations(asyncInvocations);
+
+        // clobber the current lock grantor
+        VM vm = servers[i];
+        vm.invoke("force disconnect", () -> forceDisconnect());
+        asyncInvocations[i].join();
+        vm.invoke("create cache", () -> createCacheAndRegion());
+        asyncInvocations[i] = vm.invokeAsync(() -> performOps());
+
+        // move the grantor into the next VM to be clobbered
+        int nextServer = (i + 1) % (servers.length - 1);
+        logger.info("moving the lock grantor to vm " + nextServer);
+        servers[nextServer].invoke("become lock grantor", () -> becomeLockGrantor());
+
+        int txCount = getBlackboard().getMailbox(TRANSACTION_COUNT);
+        int newTxCount = txCount + 10;
+        Awaitility.await("check for new transactions").atMost(10, TimeUnit.SECONDS).until(() -> {
+          checkAsyncInvocations(asyncInvocations);
+          int newCount = getBlackboard().getMailbox(TRANSACTION_COUNT);
+          return newCount >= newTxCount;
+        });
+      }
+
+    } finally {
+
+      for (VM vm : servers) {
+        vm.invoke(() -> closeCache());
+      }
+
+      Throwable failure = null;
+      for (AsyncInvocation asyncInvocation : asyncInvocations) {
+        asyncInvocation.join();
+        if (asyncInvocation.exceptionOccurred()) {
+          failure = asyncInvocation.getException();
+        }
+      }
+      if (failure != null) {
+        throw new RuntimeException("test failed", failure);
+      }
+    }
+  }
+
+  private void checkAsyncInvocations(AsyncInvocation[] asyncInvocations) {
+    for (AsyncInvocation asyncInvocation : asyncInvocations) {
+      if (!asyncInvocation.isAlive() && asyncInvocation.exceptionOccurred()) {
+        throw new RuntimeException("", asyncInvocation.getException());
+      }
+    }
+  }
+
+  public void forceDisconnect() {
+    DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
+  }
+
+  public void createCacheAndRegion() {
+    Cache cache = getCache();
+    cache.createRegionFactory(RegionShortcut.REPLICATE).setConcurrencyChecksEnabled(false)
+        .create("TestRegion");
+    DistributedLockService dlockService =
+        DistributedLockService.create("Bulldog", cache.getDistributedSystem());
+  }
+
+  public void becomeLockGrantor() {
+    DistributedLockService dlockService = DistributedLockService.getServiceNamed("Bulldog");
+    dlockService.becomeLockGrantor();
+  }
+
+  public void performOps() {
+    Cache cache = getCache();
+    Region region = cache.getRegion("TestRegion");
+    DistributedLockService dlockService = DistributedLockService.getServiceNamed("Bulldog");
+    Random random = new Random();
+
+    while (!cache.isClosed()) {
+      boolean locked = false;
+      try {
+        locked = dlockService.lock("testDLock", 30_000, -1);
+        if (!locked) {
+          // this could happen if we're starved out for 30sec by other VMs
+          continue;
+        }
+
+        cache.getCacheTransactionManager().begin();
+
+        region.put("TestKey", "TestValue" + random.nextInt(100000));
+
+        try {
+          cache.getCacheTransactionManager().commit();
+        } catch (CommitConflictException e) {
+          throw new RuntimeException("dlock failed to prevent a transaction conflict", e);
+        }
+
+        int txCount = getBlackboard().getMailbox(TRANSACTION_COUNT);
+        getBlackboard().setMailbox(TRANSACTION_COUNT, txCount + 1);
+
+      } catch (CancelException | LockServiceDestroyedException e) {
+        // okay to ignore
+      } finally {
+        if (locked) {
+          dlockService.unlock("testDLock");
+        }
+      }
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/DUnitBlackboard.java b/geode-core/src/test/java/org/apache/geode/test/dunit/DUnitBlackboard.java
index ec02d4b..4bdddfe 100755
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/DUnitBlackboard.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/DUnitBlackboard.java
@@ -118,7 +118,7 @@ public class DUnitBlackboard {
   /**
    * retrieve an object from a mailbox slot
    */
-  public Object getMailbox(String boxName) {
+  public <T> T getMailbox(String boxName) {
     try {
       return blackboard.getMailbox(boxName);
     } catch (RemoteException e) {
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
index bc5b9b7..24abf4f 100755
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/InternalBlackboard.java
@@ -67,7 +67,7 @@ public interface InternalBlackboard extends Remote, Serializable {
   /**
    * retrieve an object from a mailbox slot
    */
-  Object getMailbox(String boxName) throws RemoteException;
+  <T> T getMailbox(String boxName) throws RemoteException;
 
   /**
    * ping the blackboard to make sure it's there
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
index ca6a623..fa723d5 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
@@ -124,7 +124,7 @@ public class DUnitLauncher {
   static final String MASTER_PARAM = "DUNIT_MASTER";
 
   public static final String RMI_PORT_PARAM = GEMFIRE_PREFIX + "DUnitLauncher.RMI_PORT";
-  static final String VM_NUM_PARAM = GEMFIRE_PREFIX + "DUnitLauncher.VM_NUM";
+  public static final String VM_NUM_PARAM = GEMFIRE_PREFIX + "DUnitLauncher.VM_NUM";
   static final String VM_VERSION_PARAM = GEMFIRE_PREFIX + "DUnitLauncher.VM_VERSION";
 
   private static final String LAUNCHED_PROPERTY = GEMFIRE_PREFIX + "DUnitLauncher.LAUNCHED";

-- 
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.

Mime
View raw message