geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject incubator-geode git commit: GEODE-2077: Throw appropriate exceptions when get op in a transaction failed instead of return null.
Date Mon, 07 Nov 2016 22:56:37 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 02d962c20 -> 89c522ad6


GEODE-2077: Throw appropriate exceptions when a get op in a transaction fails instead of
returning null.

Also added a dunit test which fails without the fix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/89c522ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/89c522ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/89c522ad

Branch: refs/heads/develop
Commit: 89c522ad6ff75766c6ee80d0c45e995d571a57f2
Parents: 02d962c
Author: eshu <eshu@pivotal.io>
Authored: Mon Nov 7 14:45:50 2016 -0800
Committer: eshu <eshu@pivotal.io>
Committed: Mon Nov 7 14:45:50 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/PartitionedRegion.java |  24 ++-
 .../apache/geode/disttx/PRDistTXDUnitTest.java  |   4 +
 .../disttx/PRDistTXWithVersionsDUnitTest.java   |   4 +
 .../cache/execute/PRColocationDUnitTest.java    |   6 +-
 .../cache/execute/PRTransactionDUnitTest.java   | 171 ++++++++++++++++++-
 5 files changed, 199 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 8f67e25..96c58d5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -3965,7 +3965,16 @@ public class PartitionedRegion extends LocalRegion
         if (allowRetry) {
           retryNode = getNodeForBucketReadOrLoad(bucketId);
         } else {
-          return null;
+          // Only transactions set allowRetry to false,
+          // fail the transaction here as region is destroyed.
+          Throwable cause = pde.getCause();
+          if (cause != null && cause instanceof RegionDestroyedException) {
+            throw (RegionDestroyedException) cause;
+          } else {
+            // Should not see it currently, all current constructors of PRLocallyDestroyedException
+            // set the cause to RegionDestroyedException.
+            throw new RegionDestroyedException(toString(), getFullPath());
+          }
         }
       } catch (ForceReattemptException prce) {
         prce.checkKey(key);
@@ -3990,6 +3999,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime.waitToRetryNode();
           }
         } else {
+          // with transaction
           if (prce instanceof BucketNotFoundException) {
             TransactionException ex = new TransactionDataNotColocatedException(
                 LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION
@@ -4002,8 +4012,18 @@ public class PartitionedRegion extends LocalRegion
             throw (PrimaryBucketException) cause;
           } else if (cause instanceof TransactionDataRebalancedException) {
             throw (TransactionDataRebalancedException) cause;
+          } else if (cause instanceof RegionDestroyedException) {
+            TransactionException ex = new TransactionDataRebalancedException(
+                LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
+                    .toLocalizedString(key));
+            ex.initCause(cause);
+            throw ex;
           } else {
-            return null;
+            // Make transaction fail so client could retry
+            // instead of returning null if ForceReattemptException is thrown.
+            // Should not see it currently, added to be protected against future changes.
+            TransactionException ex = new TransactionException("Failed to get key: " + key, prce);
+            throw ex;
           }
         }
       } catch (PrimaryBucketException notPrimary) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index 024776a..150fe33 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -39,6 +39,10 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
   @Test
   public void testTxWithNonColocatedGet() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+  @Test
+  public void testTxWithGetOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Test
   public void testBasicPRTransactionRedundancy0() {}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index aefb581..8ff1d94 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -39,6 +39,10 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
   @Test
   public void testTxWithNonColocatedGet() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+  @Test
+  public void testTxWithGetOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Override
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
index bedb1d4..b018bfd 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
@@ -2371,10 +2371,14 @@ public class PRColocationDUnitTest extends JUnit4CacheTestCase {
   }
 
   public static void putOrderPartitionedRegion(String partitionedRegionName) {
+    putOrderPartitionedRegion(partitionedRegionName, 10);
+  }
+
+  public static void putOrderPartitionedRegion(String partitionedRegionName, int numOfCust) {
     assertNotNull(basicGetCache());
     Region partitionedregion = basicGetCache().getRegion(Region.SEPARATOR + partitionedRegionName);
     assertNotNull(partitionedregion);
-    for (int i = 1; i <= 10; i++) {
+    for (int i = 1; i <= numOfCust; i++) {
       CustId custid = new CustId(i);
       for (int j = 1; j <= 10; j++) {
         int oid = (i * 10) + j;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 37ea4e5..2122960 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -39,13 +39,17 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.execute.data.CustId;
@@ -341,8 +345,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
 
     setAttributes(CustomerPartitionedRegionName, null);
 
-    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
-    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+    createPRInTwoNodes();
 
     // Put the customer 1-2 in CustomerPartitionedRegion
     dataStore1.invoke(
@@ -404,17 +407,14 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
     }
   }
 
-  @SuppressWarnings("unchecked")
+
   private void performGetTx() {
     PartitionedRegion pr = (PartitionedRegion) basicGetCache()
         .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
     CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
     CustId cust1 = new CustId(1);
     CustId cust2 = new CustId(2);
-    int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
-    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
-    assertTrue(localPrimaryBucketList.size() == 1);
-    boolean isCust1Local = (Integer) localPrimaryBucketList.get(0) == bucketId1;
+    boolean isCust1Local = isCust1Local(pr, cust1);
 
     // touch first get on remote node -- using TXStateStub
     Assertions.assertThatThrownBy(() -> getTx(!isCust1Local, mgr, pr, cust1, cust2))
@@ -425,6 +425,14 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         .isInstanceOf(TransactionDataNotColocatedException.class);
   }
 
+  @SuppressWarnings("unchecked")
+  private boolean isCust1Local(PartitionedRegion pr, CustId cust1) {
+    int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
+    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+    assertTrue(localPrimaryBucketList.size() == 1);
+    return (Integer) localPrimaryBucketList.get(0) == bucketId1;
+  }
+
   private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr,
       CustId cust1, CustId cust2) {
     CustId first = doCust1First ? cust1 : cust2;
@@ -445,12 +453,161 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
     }
   }
 
+  /**
+   * This method executes a transaction with get on a key in a moved bucket, and expects transaction
+   * to fail with TransactionDataRebalancedException.
+   * 
+   * @param bucketRedundancy redundancy for the colocated PRs
+   */
+  @SuppressWarnings("unchecked")
+  protected void baiscPRTXWithGetOnMovedBucket(int bucketRedundancy) {
+    dataStore1.invoke(runGetCache);
+    dataStore2.invoke(runGetCache);
+    redundancy = new Integer(bucketRedundancy);
+    localMaxmemory = new Integer(50);
+    totalNumBuckets = new Integer(1);
+
+    setAttributes(CustomerPartitionedRegionName, null);
+
+    createPRInTwoNodes();
+
+    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
+
+    createPRInTwoNodes();
+
+    // Put the customer 1 in CustomerPartitionedRegion
+    dataStore1.invoke(
+        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 1));
+
+    // Put the associated order in colocated OrderPartitionedRegion
+    dataStore1.invoke(
+        () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1));
+
+    DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM());
+    DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM());
+
+    // First get transaction.
+    TransactionId txId = (TransactionId) dataStore1.invoke(beginTx());
+    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+
+    // Second one. Will go through different path (using TXState or TXStateStub)
+    txId = (TransactionId) dataStore1.invoke(beginTx());
+    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+  }
+
+  @SuppressWarnings({"rawtypes", "serial"})
+  private SerializableCallable getDM() {
+    return new SerializableCallable("getDM") {
+      @Override
+      public Object call() {
+        return ((GemFireCacheImpl) basicGetCache()).getMyId();
+      }
+    };
+  }
+
+  @SuppressWarnings({"rawtypes", "serial"})
+  private SerializableCallable beginTx() {
+    return new SerializableCallable("begin tx") {
+      @Override
+      public Object call() {
+        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+            .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+        CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+        CustId cust1 = new CustId(1);
+        mgr.begin();
+        Object value = pr.get(cust1);
+        assertNotNull(value);
+        return mgr.suspend();
+      }
+    };
+  }
+
+  @SuppressWarnings("serial")
+  private SerializableRunnable resumeTx(TransactionId txId, DistributedMember dm1,
+      DistributedMember dm2) {
+    return new SerializableRunnable("resume tx") {
+      @Override
+      public void run() {
+        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+            .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+        CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+
+        moveBucket(dm1, dm2);
+
+        Assertions.assertThatThrownBy(() -> _resumeTx(txId, pr, mgr))
+            .isInstanceOf(TransactionDataRebalancedException.class);
+      }
+
+      private void _resumeTx(TransactionId txId, PartitionedRegion pr,
+          CacheTransactionManager mgr) {
+        CustId cust1 = new CustId(1);
+        OrderId order1 = new OrderId(11, cust1);
+        mgr.resume(txId);
+        try {
+          pr.get(order1);
+        } finally {
+          mgr.rollback();
+        }
+      }
+
+      @SuppressWarnings("unchecked")
+      private void moveBucket(DistributedMember dm1, DistributedMember dm2) {
+        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+            .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+        CustId cust1 = new CustId(1);
+        OrderId order1 = new OrderId(11, cust1);
+        boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
+        DistributedMember source = isCust1Local ? dm1 : dm2;
+        DistributedMember destination = isCust1Local ? dm2 : dm1;
+        PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
+            .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+
+        LogService.getLogger().info("source ={}, destination ={}", source, destination);
+        if (isCust1Local) {
+          // Use TXState
+          setBucketReadHook(order1, source, destination, prOrder);
+        } else {
+          // Use TXStateStub -- transaction data on remote node
+          PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
+        }
+      }
+
+      private void setBucketReadHook(OrderId order1, DistributedMember source,
+          DistributedMember destination, PartitionedRegion prOrder) {
+        prOrder.getDataStore().setBucketReadHook(new Runnable() {
+          @SuppressWarnings("unchecked")
+          public void run() {
+            LogService.getLogger().info("In bucketReadHook");
+            PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
+          }
+        });
+      }
+    };
+  }
+
+  @SuppressWarnings("unchecked")
+  private boolean isCust1LocalSingleBucket(PartitionedRegion pr, CustId cust1) {
+    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+    return (Integer) localPrimaryBucketList.size() == 1;
+  }
+
+
+  private void createPRInTwoNodes() {
+    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+  }
+
   @Test
   public void testTxWithNonColocatedGet() {
     baiscPRTXWithNonColocatedGet(0);
   }
 
   @Test
+  public void testTxWithGetOnMovedBucket() {
+    baiscPRTXWithGetOnMovedBucket(0);
+  }
+
+  @Test
   public void testPRTXInCacheListenerRedundancy0() {
     basicPRTXInCacheListener(0);
   }


Mime
View raw message