geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [19/50] [abbrv] incubator-geode git commit: GEODE-2091: Do not return false when containsValueForKey call failed in a transaction
Date Tue, 15 Nov 2016 20:49:06 GMT
GEODE-2091: Do not return false when containsValueForKey call failed in a transaction

Correctly handle the exception so that the transaction fails instead of returning false or null.
Add a check for colocated buckets so that the correct TransactionException can be thrown.
Fix the containsKey method call as well.
Add test cases to the DUnit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e01dbe6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e01dbe6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e01dbe6f

Branch: refs/heads/feature/GEODE-1930
Commit: e01dbe6f557a11aa99cd6ec4da349a38a97d678e
Parents: bd229d7
Author: eshu <eshu@pivotal.io>
Authored: Thu Nov 10 14:57:48 2016 -0800
Committer: eshu <eshu@pivotal.io>
Committed: Thu Nov 10 15:02:58 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/TXStateStub.java       |   4 +
 .../cache/tx/PartitionedTXRegionStub.java       |  42 +++-
 .../apache/geode/disttx/PRDistTXDUnitTest.java  |   8 +
 .../disttx/PRDistTXWithVersionsDUnitTest.java   |   8 +
 .../cache/execute/PRTransactionDUnitTest.java   | 200 ++++++++++++-------
 5 files changed, 181 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
index a6c78f2..5dd624b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
@@ -135,6 +135,10 @@ public abstract class TXStateStub implements TXStateInterface {
     return stub;
   }
 
+  public Map<Region<?, ?>, TXRegionStub> getRegionStubs() {
+    return this.regionStubs;
+  }
+
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 0e9c128..10ae7a5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tx;
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
@@ -24,6 +25,7 @@ import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.ColocationHelper;
 import org.apache.geode.internal.cache.DataLocationException;
 import org.apache.geode.internal.cache.DistributedPutAllOperation;
 import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
@@ -46,6 +48,7 @@ import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.offheap.annotations.Released;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -64,6 +67,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
     super(txstate, r);
   }
 
+  public Map<Integer, Boolean> getBuckets() {
+    return buckets;
+  }
 
   public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
       Object expectedOldValue) {
@@ -107,16 +113,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub
{
       }
       ex = ex.getCause();
     }
-    if (keyInfo != null && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId()))
{
-      // for parent region if previous ops were successful and for child colocated regions
-      // where the bucketId was not previously encountered
+
+    if (isKeyInNonColocatedBucket(keyInfo)) {
       return new TransactionDataNotColocatedException(
           LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION
               .toLocalizedString(keyInfo.getKey()));
     }
     ex = cause;
     while (ex != null) {
-      if (ex instanceof PrimaryBucketException) {
+      if (ex instanceof PrimaryBucketException || ex instanceof BucketNotFoundException)
{
         return new TransactionDataRebalancedException(
             LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
                 .toLocalizedString());
@@ -126,6 +131,23 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub
{
     return new TransactionDataNodeHasDepartedException(cause.getLocalizedMessage());
   }
 
+  // is this key in a different bucket from all the existing buckets
+  // of the underlying PR or its colocated PRs touched by the transaction.
+  private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) {
+    Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs();
+    Collection<PartitionedRegion> colcatedRegions = (Collection<PartitionedRegion>)
ColocationHelper
+        .getAllColocationRegions((PartitionedRegion) this.region).values();
+    // get all colocated region buckets touched in the transaction
+    for (PartitionedRegion colcatedRegion : colcatedRegions) {
+      PartitionedTXRegionStub regionStub =
+          (PartitionedTXRegionStub) regionStubs.get(colcatedRegion);
+      if (regionStub != null) {
+        buckets.putAll(regionStub.getBuckets());
+      }
+    }
+    return keyInfo != null && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId());
+  }
+
 
   /**
    * wait to retry after getting a ForceReattemptException
@@ -232,7 +254,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub
{
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return false;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = new TransactionDataNodeHasDepartedException(
@@ -274,7 +298,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub
{
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return false;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = new TransactionDataNodeHasDepartedException(
@@ -306,7 +332,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub
{
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return null;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = getTransactionException(keyInfo, e);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index 150fe33..ed8d3c6 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -43,6 +43,14 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
   @Test
   public void testTxWithGetOnMovedBucket() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply
to disttx.")
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {}
+
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply
to disttx.")
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Test
   public void testBasicPRTransactionRedundancy0() {}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 8ff1d94..4e6f846 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -43,6 +43,14 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
   @Test
   public void testTxWithGetOnMovedBucket() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply
to disttx.")
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {}
+
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply
to disttx.")
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Override
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 2122960..e2ba2b3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -336,7 +336,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
    * 
    * @param bucketRedundancy redundancy for the colocated PRs
    */
-  protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) {
+  protected void basicPRTXWithNonColocatedGet(int bucketRedundancy) {
     dataStore1.invoke(runGetCache);
     dataStore2.invoke(runGetCache);
     redundancy = new Integer(bucketRedundancy);
@@ -454,45 +454,26 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
   }
 
   /**
-   * This method executes a transaction with get on a key in a moved bucket, and expects
transaction
-   * to fail with TransactionDataRebalancedException.
+   * This method executes a transaction with operation on a key in a moved bucket, and expects
+   * transaction to fail with TransactionDataRebalancedException.
    * 
+   * @param op which entry op to be executed
    * @param bucketRedundancy redundancy for the colocated PRs
    */
   @SuppressWarnings("unchecked")
-  protected void baiscPRTXWithGetOnMovedBucket(int bucketRedundancy) {
-    dataStore1.invoke(runGetCache);
-    dataStore2.invoke(runGetCache);
-    redundancy = new Integer(bucketRedundancy);
-    localMaxmemory = new Integer(50);
-    totalNumBuckets = new Integer(1);
-
-    setAttributes(CustomerPartitionedRegionName, null);
-
-    createPRInTwoNodes();
-
-    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
-
-    createPRInTwoNodes();
-
-    // Put the customer 1 in CustomerPartitionedRegion
-    dataStore1.invoke(
-        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName,
1));
-
-    // Put the associated order in colocated OrderPartitionedRegion
-    dataStore1.invoke(
-        () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName,
1));
+  protected void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy) {
+    setupMoveBucket(bucketRedundancy);
 
     DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM());
     DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM());
 
     // First get transaction.
     TransactionId txId = (TransactionId) dataStore1.invoke(beginTx());
-    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
 
     // Second one. Will go through different path (using TXState or TXStateStub)
     txId = (TransactionId) dataStore1.invoke(beginTx());
-    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
   }
 
   @SuppressWarnings({"rawtypes", "serial"})
@@ -523,7 +504,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
   }
 
   @SuppressWarnings("serial")
-  private SerializableRunnable resumeTx(TransactionId txId, DistributedMember dm1,
+  private SerializableRunnable resumeTx(Op op, TransactionId txId, DistributedMember dm1,
       DistributedMember dm2) {
     return new SerializableRunnable("resume tx") {
       @Override
@@ -532,57 +513,96 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
             .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
         CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
 
-        moveBucket(dm1, dm2);
+        moveBucket(op, dm1, dm2);
 
-        Assertions.assertThatThrownBy(() -> _resumeTx(txId, pr, mgr))
+        Assertions.assertThatThrownBy(() -> _resumeTx(op, txId, pr, mgr))
             .isInstanceOf(TransactionDataRebalancedException.class);
       }
+    };
+  }
 
-      private void _resumeTx(TransactionId txId, PartitionedRegion pr,
-          CacheTransactionManager mgr) {
-        CustId cust1 = new CustId(1);
-        OrderId order1 = new OrderId(11, cust1);
-        mgr.resume(txId);
-        try {
+  enum Op {
+    GET, CONTAINSVALUEFORKEY, CONTAINSKEY;
+  }
+
+  private void _resumeTx(Op op, TransactionId txId, PartitionedRegion pr,
+      CacheTransactionManager mgr) {
+    CustId cust1 = new CustId(1);
+    OrderId order1 = new OrderId(11, cust1);
+    mgr.resume(txId);
+    try {
+      switch (op) {
+        case GET:
           pr.get(order1);
-        } finally {
-          mgr.rollback();
-        }
+          break;
+        case CONTAINSVALUEFORKEY:
+          pr.containsValueForKey(order1);
+          break;
+        case CONTAINSKEY:
+          pr.containsKey(order1);
+          break;
+        default:
+          throw new AssertionError("Unknown operations " + op);
       }
 
-      @SuppressWarnings("unchecked")
-      private void moveBucket(DistributedMember dm1, DistributedMember dm2) {
-        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
-            .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
-        CustId cust1 = new CustId(1);
-        OrderId order1 = new OrderId(11, cust1);
-        boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
-        DistributedMember source = isCust1Local ? dm1 : dm2;
-        DistributedMember destination = isCust1Local ? dm2 : dm1;
-        PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
-            .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+    } finally {
+      mgr.rollback();
+    }
+  }
 
-        LogService.getLogger().info("source ={}, destination ={}", source, destination);
-        if (isCust1Local) {
-          // Use TXState
-          setBucketReadHook(order1, source, destination, prOrder);
-        } else {
-          // Use TXStateStub -- transaction data on remote node
-          PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
-        }
-      }
+  @SuppressWarnings("unchecked")
+  private void moveBucket(Op op, DistributedMember dm1, DistributedMember dm2) {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CustId cust1 = new CustId(1);
+    OrderId order1 = new OrderId(11, cust1);
+    boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
+    DistributedMember source = isCust1Local ? dm1 : dm2;
+    DistributedMember destination = isCust1Local ? dm2 : dm1;
+    PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+
+    LogService.getLogger().info("source ={}, destination ={}", source, destination);
+
+    switch (op) {
+      case GET:
+        moveBucketForGet(order1, isCust1Local, source, destination, prOrder);
+        break;
+      case CONTAINSVALUEFORKEY:
+      case CONTAINSKEY:
+        PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
+        break;
+      default:
+        throw new AssertionError("Unknown operations " + op);
+    }
+  }
 
-      private void setBucketReadHook(OrderId order1, DistributedMember source,
-          DistributedMember destination, PartitionedRegion prOrder) {
-        prOrder.getDataStore().setBucketReadHook(new Runnable() {
-          @SuppressWarnings("unchecked")
-          public void run() {
-            LogService.getLogger().info("In bucketReadHook");
-            PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
-          }
-        });
+  @SuppressWarnings("unchecked")
+  private void moveBucketForGet(OrderId order1, boolean isCust1Local, DistributedMember source,
+      DistributedMember destination, PartitionedRegion prOrder) {
+    if (isCust1Local) {
+      // Use TXState
+      setBucketReadHook(order1, source, destination, prOrder);
+    } else {
+      // Use TXStateStub -- transaction data on remote node
+      PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
+    }
+  }
+
+  private void setBucketReadHook(OrderId order1, DistributedMember source,
+      DistributedMember destination, PartitionedRegion prOrder) {
+    prOrder.getDataStore().setBucketReadHook(new Runnable() {
+      @SuppressWarnings("unchecked")
+      public void run() {
+        LogService.getLogger().info("In bucketReadHook");
+        PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1);
       }
-    };
+    });
+  }
+
+  private void createPRInTwoNodes() {
+    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
   }
 
   @SuppressWarnings("unchecked")
@@ -591,20 +611,52 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
     return (Integer) localPrimaryBucketList.size() == 1;
   }
 
+  @SuppressWarnings("unchecked")
+  private void setupMoveBucket(int bucketRedundancy) {
+    dataStore1.invoke(runGetCache);
+    dataStore2.invoke(runGetCache);
+    redundancy = new Integer(bucketRedundancy);
+    localMaxmemory = new Integer(50);
+    totalNumBuckets = new Integer(1);
 
-  private void createPRInTwoNodes() {
-    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
-    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+    setAttributes(CustomerPartitionedRegionName, null);
+
+    createPRInTwoNodes();
+
+    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
+
+    createPRInTwoNodes();
+
+    // Put the customer 1 in CustomerPartitionedRegion
+    dataStore1.invoke(
+        () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName,
1));
+
+    // Put the associated order in colocated OrderPartitionedRegion
+    dataStore1.invoke(
+        () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName,
1));
   }
 
   @Test
   public void testTxWithNonColocatedGet() {
-    baiscPRTXWithNonColocatedGet(0);
+    basicPRTXWithNonColocatedGet(0);
   }
 
   @Test
   public void testTxWithGetOnMovedBucket() {
-    baiscPRTXWithGetOnMovedBucket(0);
+    Op op = Op.GET;
+    basicPRTXWithOpOnMovedBucket(op, 0);
+  }
+
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {
+    Op op = Op.CONTAINSVALUEFORKEY;
+    basicPRTXWithOpOnMovedBucket(op, 0);
+  }
+
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {
+    Op op = Op.CONTAINSKEY;
+    basicPRTXWithOpOnMovedBucket(op, 0);
   }
 
   @Test


Mime
View raw message