geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [02/14] incubator-geode git commit: GEODE-249: Retry on local node does not overwrite previously retrieved results
Date Fri, 18 Mar 2016 21:43:27 GMT
GEODE-249: Retry on local node does not overwrite previously retrieved results

If a query on a remote node fails and buckets are determined to have moved to
the local node, a retry of the query is executed on the local node.  However,
before this change the retry would overwrite any results that the local node
had already correctly retrieved.

Added new junit test for PartitionedRegionQueryEvaluator and renamed existing
test to PartitionedRegionQueryEvaluatorIntegrationTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/aab357f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/aab357f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/aab357f6

Branch: refs/heads/feature/GEODE-1050
Commit: aab357f6925f88324987e2246879faeb33ea7962
Parents: 54931a5
Author: Jason Huynh <huynhja@gmail.com>
Authored: Fri Mar 11 15:27:33 2016 -0800
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Fri Mar 18 13:35:03 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/AbstractRegion.java  |   2 +-
 .../cache/PartitionedRegionDataStore.java       |  13 +-
 .../cache/PartitionedRegionQueryEvaluator.java  |  40 +-
 ...onedRegionQueryEvaluatorIntegrationTest.java | 307 +++++++++++++++
 ...artitionedRegionQueryEvaluatorJUnitTest.java | 307 ---------------
 .../PartitionedRegionQueryEvaluatorTest.java    | 373 +++++++++++++++++++
 6 files changed, 725 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index 4711f80..9a55195 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -1969,7 +1969,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     return this.serialNumber;
   }
 
-  public final GemFireCacheImpl getCache() {
+  public /*final*/ GemFireCacheImpl getCache() {
     return this.cache;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index 5f28c93..010812d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -183,6 +183,17 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
   private static final boolean UPDATE_ACCESS_TIME_ON_INTEREST = Boolean
       .getBoolean("gemfire.updateAccessTimeOnClientInterest");
 
+  
+  //Only for testing
+  PartitionedRegionDataStore() {
+    this.bucketCreationLock = null;
+    bucketStats = null;
+    partitionedRegion = null;
+    maximumLocalBytes = -1;
+    this.localBucket2RegionMap = new ConcurrentHashMap<Integer, BucketRegion>();
+    keysOfInterest = null;
+  }
+  
   /**
    * Creates PartitionedRegionDataStore for dataStorage of PR and starts a
    * PartitionService to handle remote operations on this DataStore from other
@@ -261,7 +272,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    *          the id of the bucket
    * @return true if the provided bucket is being managed
    */
-  public final boolean isManagingBucket(int bucketId)
+  public boolean isManagingBucket(int bucketId)
   {
     BucketRegion buk = this.localBucket2RegionMap.get(Integer.valueOf(bucketId));    
     if (buk != null && !buk.isDestroyed()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
index 1d6cf0e..9dfa94c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -322,13 +322,13 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     } else {
         // send separate message to each recipient since each one has a
         // different list of bucket ids
-        processor = new StreamingQueryPartitionResponse(this.sys, n2b.keySet());
+        processor = createStreamingQueryPartitionResponse(this.sys, n2b);
         for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = n2b.entrySet().iterator(); itr.hasNext();) {
           Map.Entry<InternalDistributedMember , List<Integer>> me =  itr.next();
           final InternalDistributedMember rcp =  me.getKey();
           final List<Integer> bucketIds =  me.getValue();
           DistributionMessage m = createRequestMessage(rcp, processor, bucketIds);
-          Set notReceivedMembers = this.sys.getDistributionManager().putOutgoing(m);
+          Set notReceivedMembers = sendMessage(m);
           if (th != null) {
             th.hook(4);
           }
@@ -425,6 +425,14 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return requiresRetry | localNeedsRetry;
   }
 
+  protected Set sendMessage(DistributionMessage m) {
+    return this.sys.getDistributionManager().putOutgoing(m);
+  }
+
+  protected StreamingQueryPartitionResponse createStreamingQueryPartitionResponse(InternalDistributedSystem system,HashMap<InternalDistributedMember, List<Integer>> n2b) {
+    return new StreamingQueryPartitionResponse(system, n2b.keySet());
+  }
+
   
   /**
    * Executes a query over the provided buckets in a <code>PartitionedRegion</code>.
@@ -820,7 +828,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    * @return the node-to-bucket map
    */
   
-  // (package access, and returns map for unit test purposes)
+  // (package access for unit test purposes)
   Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMap() throws QueryException
   {
     return buildNodeToBucketMapForBuckets(this.bucketsToQuery);
@@ -858,7 +866,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       }
     }
     
-    final List allNodes = new ArrayList(this.pr.getRegionAdvisor().adviseDataStore());
+    final List allNodes = getAllNodes();
     /*
     for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry : resultsPerMember.entrySet()) {
       InternalDistributedMember member = entry.getKey();
@@ -880,7 +888,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       final List<Integer> buckets = new ArrayList<Integer>();
       for (Integer bid : bucketIdsToConsider) {
         if (!bucketIds.contains(bid)) {
-          final Set owners = pr.getRegionAdvisor().getBucketOwners(bid.intValue());
+          final Set owners = getBucketOwners(bid);
           if (owners.contains(nd)) {
             buckets.add(bid);
             bucketIds.add(bid);
@@ -904,6 +912,14 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return ret;
   }
 
+  protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
+    return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
+  }
+
+  protected ArrayList getAllNodes() {
+    return new ArrayList(this.pr.getRegionAdvisor().adviseDataStore());
+  }
+
   /**
    * Executes query on local data store. 
    * 
@@ -926,7 +942,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
 
       List<Integer> bucketList = this.node2bucketIds.get(me);
       try {
-        PRQueryProcessor qp = new PRQueryProcessor(this.pr, query, parameters, bucketList);
+        PRQueryProcessor qp = createLocalPRQueryProcessor(bucketList);
         MemberResultsList resultCollector = new MemberResultsList();
         
         // Execute Query.
@@ -975,8 +991,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
         
         resultCollector.setLastChunkReceived(true);
-        // Add results to the results-list.
-        this.resultsPerMember.put(me, resultCollector);
+        // Add results to the results-list.  If prior successfully completed
+        //results exist from previous executions on different buckets, add (to) those results as well.
+        MemberResultsList otherResults = (MemberResultsList)this.resultsPerMember.put(me, resultCollector);
+        if (otherResults != null) {
+          resultCollector.addAll(otherResults);
+        } 
         
       } catch (ForceReattemptException retryRequired) {
         if (logger.isDebugEnabled()) {
@@ -988,6 +1008,10 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return false;
   }
 
+  protected PRQueryProcessor createLocalPRQueryProcessor(List<Integer> bucketList) {
+    return new PRQueryProcessor(this.pr, query, parameters, bucketList);
+  }
+
   protected void memberStreamCorrupted(InternalDistributedMember sender) {
     this.resultsPerMember.remove(sender);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
new file mode 100644
index 0000000..8f9b392
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
+import com.gemstone.gemfire.internal.util.VersionedArrayList;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * This class is an integration test for <code>PartitionedRegionQueryEvaluator</code> class.
+ * @author rreja 
+ */
+@Category(IntegrationTest.class)
+public class PartitionedRegionQueryEvaluatorIntegrationTest
+{
+  @Rule public TestName name = new TestName();
+  LogWriter logger = null;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    if (logger == null) {
+      logger = PartitionedRegionTestHelper.getLogger();
+    }
+  }
+
+  /**
+   * Test for the helper method getNodeToBucketMap.
+   * 
+   */
+  @Test
+  public void testGetNodeToBucketMap()
+  {
+    int totalNodes = 100;
+    String prPrefix = name.getMethodName();
+    String localMaxMemory = "0";
+    final int redundancy = 1;
+    final int totalNoOfBuckets = 5;
+    PartitionedRegion pr = (PartitionedRegion)PartitionedRegionTestHelper
+        .createPartitionedRegion(prPrefix, localMaxMemory, redundancy);
+
+    HashSet<Integer> bucketsToQuery = new HashSet<Integer>();
+    for (int i = 0; i < totalNoOfBuckets; i++) {
+      bucketsToQuery.add(i);
+    }
+    final String expectedUnknownHostException = UnknownHostException.class
+    .getName();
+    pr.getCache().getLogger().info(
+    "<ExpectedException action=add>" + expectedUnknownHostException
+        + "</ExpectedException>");
+    final ArrayList nodes = createNodeList(totalNodes);    
+    pr.getCache().getLogger().info(
+        "<ExpectedException action=remove>" + expectedUnknownHostException
+            + "</ExpectedException>");
+    // populating bucket2Node of the partition region
+      // ArrayList<InternalDistributedMember>
+    final ArrayList dses = createDataStoreList(totalNodes);
+    populateBucket2Node(pr, dses, totalNoOfBuckets);  
+  
+    populateAllPartitionedRegion(pr, nodes);
+
+    // running the algorithm and getting the list of bucktes to grab
+    PartitionedRegionQueryEvaluator evalr = new PartitionedRegionQueryEvaluator(pr.getSystem(), pr, null, null, null, bucketsToQuery);
+    Map n2bMap = null;
+    try {
+      n2bMap = evalr.buildNodeToBucketMap();
+    } catch (Exception ex) {
+      
+    }
+    ArrayList buckList = new ArrayList();
+    for (Iterator itr = n2bMap.entrySet().iterator(); itr.hasNext();) {
+      Map.Entry entry = (Map.Entry)itr.next();
+      if (entry.getValue() != null)
+        buckList.addAll((List)entry.getValue());
+    }
+    // checking size of the two lists
+    assertEquals("Unexpected number of buckets", totalNoOfBuckets, buckList.size());
+    for (int i = 0; i < totalNoOfBuckets; i++) {
+      assertTrue(" Bucket with Id = " + i + " not present in bucketList.",
+          buckList.contains(new Integer(i)));
+    }
+    clearAllPartitionedRegion(pr);
+    logger.info("************test ended successfully **********");
+    
+  }
+
+  /**
+   * This function populates bucket2Node region of the partition region
+   * 
+   * @param pr
+   */
+  public void populateBucket2Node(PartitionedRegion pr, List nodes,
+      int numOfBuckets)
+  {
+    assertEquals(0, pr.getRegionAdvisor().getCreatedBucketsCount());
+    final RegionAdvisor ra = pr.getRegionAdvisor();
+    int nodeListCnt = 0;
+    Random ran = new Random();
+    HashMap verMap = new HashMap(); // Map tracking version for profile insertion purposes
+    for (int i = 0; i < numOfBuckets; i++) {
+      nodeListCnt = setNodeListCnt(nodeListCnt);  
+      for (int j = 0; j < nodeListCnt; j++) {
+        BucketProfile bp = new BucketProfile();
+        bp.peerMemberId = (InternalDistributedMember) 
+            nodes.get(ran.nextInt(nodes.size()));
+        Integer v;
+        if ((v = (Integer) verMap.get(bp.getDistributedMember())) != null) {
+          bp.version = v.intValue() + 1;
+          verMap.put(bp.getDistributedMember(), new Integer(bp.version));
+        } else {
+          verMap.put(bp.getDistributedMember(), new Integer(0));
+          bp.version = 0;
+        }
+
+        bp.isHosting = true;
+        if (j == 0) {
+          bp.isPrimary = true;
+        }
+        bp.scope = Scope.DISTRIBUTED_ACK;
+        boolean forceBadProfile = true;
+        assertTrue(ra.getBucket(i).getBucketAdvisor().putProfile(bp, forceBadProfile));
+      }
+    }
+  }
+  
+  private void clearAllPartitionedRegion(PartitionedRegion pr) {
+    Cache cache = pr.getCache();
+    Region allPR = PartitionedRegionHelper.getPRRoot(cache);
+    allPR.clear();    
+  }
+  
+  /**
+   * This function decides number of the nodes in the list of bucket2Node region
+   * 
+   * @param i
+   * @return
+   */
+  private int setNodeListCnt(int i)
+  {
+    int nListcnt = 0;
+    switch (i) {
+    case 0:
+      nListcnt = 1;
+      break;
+    case 1:
+      nListcnt = 4;
+      break;
+    case 2:
+      nListcnt = 1;
+      break;
+    case 3:
+      nListcnt = 2;
+      break;
+    case 4:
+      nListcnt = 1;
+      break;
+    case 5:
+      nListcnt = 3;
+      break;
+    case 6:
+      nListcnt = 3;
+      break;
+    case 7:
+      nListcnt = 1;
+      break;
+    case 8:
+      nListcnt = 1;
+      break;
+    case 9:
+      nListcnt = 2;
+      break;
+
+    }
+    return nListcnt;
+  }
+
+  /**
+   * This functions number of new nodes specified by nCount.
+   * 
+   * @param nCount
+   * @return
+   */
+  private ArrayList createNodeList(int nCount)
+  {
+    ArrayList nodeList = new ArrayList(nCount);
+    for (int i = 0; i < nCount; i++) {
+      nodeList.add(createNode(i));
+    }
+    return nodeList;
+  }
+
+  private ArrayList createDataStoreList(int nCount)
+  {
+    // ArrayList<InternalDistributedMember>
+    ArrayList nodeList = new ArrayList(nCount);
+    for (int i = 0; i < nCount; i++) {
+      nodeList.add(createDataStoreMember(i));
+    }
+    return nodeList;
+  }
+
+  private VersionedArrayList getVersionedNodeList(int nCount, List<Node> nodes)
+  {
+    VersionedArrayList nodeList = new VersionedArrayList(nCount);
+    Random ran = new Random();
+    for (int i = 0; i < nCount; i++) {
+      nodeList.add(nodes.get(ran.nextInt(nodes.size())));
+    }
+    return nodeList;
+  }
+
+  private InternalDistributedMember createDataStoreMember(int i)
+  {
+    String hostname = null;
+    InternalDistributedMember mem = null;
+    try {
+      mem = new InternalDistributedMember("host" + i, 3033);
+    }
+    catch (java.net.UnknownHostException uhe) {
+      logger.severe("PartitionedRegion: initalizeNode() Unknown host = "
+          + hostname + " servicePort = " + 0, uhe);
+      throw new PartitionedRegionException(
+          "PartitionedRegionDataStore: initalizeNode() Unknown host = "
+              + hostname + " servicePort = " + 0, uhe);
+    }
+    return mem;
+  }
+
+  /**
+   * this function creates new node.
+   * 
+   * @return
+   */
+  public Node createNode(int i)
+  {
+    Node node = null;
+    try {
+      node = new Node(new InternalDistributedMember("host" + i, 3033), i);
+      node.setPRType(Node.DATASTORE);
+    }
+    catch (java.net.UnknownHostException uhe) {
+      logger.severe("PartitionedRegion: initalizeNode() threw exception", uhe);
+      throw new PartitionedRegionException("", uhe);
+    }
+    return node;
+  }
+
+  private void populateAllPartitionedRegion(PartitionedRegion pr, List nodes)
+  {
+    // int totalNodes = 4;
+    Region rootReg = PartitionedRegionHelper.getPRRoot(pr.getCache());
+//    Region allPRs = PartitionedRegionHelper.getPRConfigRegion(rootReg, pr
+//        .getCache());
+    PartitionRegionConfig prConf = new PartitionRegionConfig(pr.getPRId(), pr
+        .getFullPath(), pr.getPartitionAttributes(), pr.getScope());
+    RegionAdvisor ra = pr.getRegionAdvisor();
+    for (Iterator itr = nodes.iterator(); itr.hasNext();) {
+      Node n = (Node)itr.next();
+      prConf.addNode(n);
+      PartitionProfile pp = (PartitionProfile) ra.createProfile();
+      pp.peerMemberId = n.getMemberId();
+      pp.isDataStore = true;
+      final boolean forceFakeProfile = true;
+      pr.getRegionAdvisor().putProfile(pp, forceFakeProfile);
+    }
+    rootReg.put(pr.getRegionIdentifier(), prConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorJUnitTest.java
deleted file mode 100644
index 2d30073..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorJUnitTest.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
-import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
-import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
-import com.gemstone.gemfire.internal.util.VersionedArrayList;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * This class is junit test for <code>PartitionedRegionQueryEvaluator</code> class.
- * @author rreja 
- */
-@Category(IntegrationTest.class)
-public class PartitionedRegionQueryEvaluatorJUnitTest
-{
-  @Rule public TestName name = new TestName();
-  LogWriter logger = null;
-
-  @Before
-  public void setUp() throws Exception
-  {
-    if (logger == null) {
-      logger = PartitionedRegionTestHelper.getLogger();
-    }
-  }
-
-  /**
-   * Test for the helper method getNodeToBucketMap.
-   * 
-   */
-  @Test
-  public void testGetNodeToBucketMap()
-  {
-    int totalNodes = 100;
-    String prPrefix = name.getMethodName();
-    String localMaxMemory = "0";
-    final int redundancy = 1;
-    final int totalNoOfBuckets = 5;
-    PartitionedRegion pr = (PartitionedRegion)PartitionedRegionTestHelper
-        .createPartitionedRegion(prPrefix, localMaxMemory, redundancy);
-
-    HashSet<Integer> bucketsToQuery = new HashSet<Integer>();
-    for (int i = 0; i < totalNoOfBuckets; i++) {
-      bucketsToQuery.add(i);
-    }
-    final String expectedUnknownHostException = UnknownHostException.class
-    .getName();
-    pr.getCache().getLogger().info(
-    "<ExpectedException action=add>" + expectedUnknownHostException
-        + "</ExpectedException>");
-    final ArrayList nodes = createNodeList(totalNodes);    
-    pr.getCache().getLogger().info(
-        "<ExpectedException action=remove>" + expectedUnknownHostException
-            + "</ExpectedException>");
-    // populating bucket2Node of the partition region
-      // ArrayList<InternalDistributedMember>
-    final ArrayList dses = createDataStoreList(totalNodes);
-    populateBucket2Node(pr, dses, totalNoOfBuckets);  
-  
-    populateAllPartitionedRegion(pr, nodes);
-
-    // running the algorithm and getting the list of bucktes to grab
-    PartitionedRegionQueryEvaluator evalr = new PartitionedRegionQueryEvaluator(pr.getSystem(), pr, null, null, null, bucketsToQuery);
-    Map n2bMap = null;
-    try {
-      n2bMap = evalr.buildNodeToBucketMap();
-    } catch (Exception ex) {
-      
-    }
-    ArrayList buckList = new ArrayList();
-    for (Iterator itr = n2bMap.entrySet().iterator(); itr.hasNext();) {
-      Map.Entry entry = (Map.Entry)itr.next();
-      if (entry.getValue() != null)
-        buckList.addAll((List)entry.getValue());
-    }
-    // checking size of the two lists
-    assertEquals("Unexpected number of buckets", totalNoOfBuckets, buckList.size());
-    for (int i = 0; i < totalNoOfBuckets; i++) {
-      assertTrue(" Bucket with Id = " + i + " not present in bucketList.",
-          buckList.contains(new Integer(i)));
-    }
-    clearAllPartitionedRegion(pr);
-    logger.info("************test ended successfully **********");
-    
-  }
-
-  /**
-   * This function populates bucket2Node region of the partition region
-   * 
-   * @param pr
-   */
-  public void populateBucket2Node(PartitionedRegion pr, List nodes,
-      int numOfBuckets)
-  {
-    assertEquals(0, pr.getRegionAdvisor().getCreatedBucketsCount());
-    final RegionAdvisor ra = pr.getRegionAdvisor();
-    int nodeListCnt = 0;
-    Random ran = new Random();
-    HashMap verMap = new HashMap(); // Map tracking version for profile insertion purposes
-    for (int i = 0; i < numOfBuckets; i++) {
-      nodeListCnt = setNodeListCnt(nodeListCnt);  
-      for (int j = 0; j < nodeListCnt; j++) {
-        BucketProfile bp = new BucketProfile();
-        bp.peerMemberId = (InternalDistributedMember) 
-            nodes.get(ran.nextInt(nodes.size()));
-        Integer v;
-        if ((v = (Integer) verMap.get(bp.getDistributedMember())) != null) {
-          bp.version = v.intValue() + 1;
-          verMap.put(bp.getDistributedMember(), new Integer(bp.version));
-        } else {
-          verMap.put(bp.getDistributedMember(), new Integer(0));
-          bp.version = 0;
-        }
-
-        bp.isHosting = true;
-        if (j == 0) {
-          bp.isPrimary = true;
-        }
-        bp.scope = Scope.DISTRIBUTED_ACK;
-        boolean forceBadProfile = true;
-        assertTrue(ra.getBucket(i).getBucketAdvisor().putProfile(bp, forceBadProfile));
-      }
-    }
-  }
-  
-  private void clearAllPartitionedRegion(PartitionedRegion pr) {
-    Cache cache = pr.getCache();
-    Region allPR = PartitionedRegionHelper.getPRRoot(cache);
-    allPR.clear();    
-  }
-  
-  /**
-   * This function decides number of the nodes in the list of bucket2Node region
-   * 
-   * @param i
-   * @return
-   */
-  private int setNodeListCnt(int i)
-  {
-    int nListcnt = 0;
-    switch (i) {
-    case 0:
-      nListcnt = 1;
-      break;
-    case 1:
-      nListcnt = 4;
-      break;
-    case 2:
-      nListcnt = 1;
-      break;
-    case 3:
-      nListcnt = 2;
-      break;
-    case 4:
-      nListcnt = 1;
-      break;
-    case 5:
-      nListcnt = 3;
-      break;
-    case 6:
-      nListcnt = 3;
-      break;
-    case 7:
-      nListcnt = 1;
-      break;
-    case 8:
-      nListcnt = 1;
-      break;
-    case 9:
-      nListcnt = 2;
-      break;
-
-    }
-    return nListcnt;
-  }
-
-  /**
-   * This functions number of new nodes specified by nCount.
-   * 
-   * @param nCount
-   * @return
-   */
-  private ArrayList createNodeList(int nCount)
-  {
-    ArrayList nodeList = new ArrayList(nCount);
-    for (int i = 0; i < nCount; i++) {
-      nodeList.add(createNode(i));
-    }
-    return nodeList;
-  }
-
-  private ArrayList createDataStoreList(int nCount)
-  {
-    // ArrayList<InternalDistributedMember>
-    ArrayList nodeList = new ArrayList(nCount);
-    for (int i = 0; i < nCount; i++) {
-      nodeList.add(createDataStoreMember(i));
-    }
-    return nodeList;
-  }
-
-  private VersionedArrayList getVersionedNodeList(int nCount, List<Node> nodes)
-  {
-    VersionedArrayList nodeList = new VersionedArrayList(nCount);
-    Random ran = new Random();
-    for (int i = 0; i < nCount; i++) {
-      nodeList.add(nodes.get(ran.nextInt(nodes.size())));
-    }
-    return nodeList;
-  }
-
-  private InternalDistributedMember createDataStoreMember(int i)
-  {
-    String hostname = null;
-    InternalDistributedMember mem = null;
-    try {
-      mem = new InternalDistributedMember("host" + i, 3033);
-    }
-    catch (java.net.UnknownHostException uhe) {
-      logger.severe("PartitionedRegion: initalizeNode() Unknown host = "
-          + hostname + " servicePort = " + 0, uhe);
-      throw new PartitionedRegionException(
-          "PartitionedRegionDataStore: initalizeNode() Unknown host = "
-              + hostname + " servicePort = " + 0, uhe);
-    }
-    return mem;
-  }
-
-  /**
-   * this function creates new node.
-   * 
-   * @return
-   */
-  public Node createNode(int i)
-  {
-    Node node = null;
-    try {
-      node = new Node(new InternalDistributedMember("host" + i, 3033), i);
-      node.setPRType(Node.DATASTORE);
-    }
-    catch (java.net.UnknownHostException uhe) {
-      logger.severe("PartitionedRegion: initalizeNode() threw exception", uhe);
-      throw new PartitionedRegionException("", uhe);
-    }
-    return node;
-  }
-
-  private void populateAllPartitionedRegion(PartitionedRegion pr, List nodes)
-  {
-    // int totalNodes = 4;
-    Region rootReg = PartitionedRegionHelper.getPRRoot(pr.getCache());
-//    Region allPRs = PartitionedRegionHelper.getPRConfigRegion(rootReg, pr
-//        .getCache());
-    PartitionRegionConfig prConf = new PartitionRegionConfig(pr.getPRId(), pr
-        .getFullPath(), pr.getPartitionAttributes(), pr.getScope());
-    RegionAdvisor ra = pr.getRegionAdvisor();
-    for (Iterator itr = nodes.iterator(); itr.hasNext();) {
-      Node n = (Node)itr.next();
-      prConf.addNode(n);
-      PartitionProfile pp = (PartitionProfile) ra.createProfile();
-      pp.peerMemberId = n.getMemberId();
-      pp.isDataStore = true;
-      final boolean forceFakeProfile = true;
-      pr.getRegionAdvisor().putProfile(pp, forceFakeProfile);
-    }
-    rootReg.put(pr.getRegionIdentifier(), prConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/aab357f6/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorTest.java
new file mode 100644
index 0000000..ac9be94
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluatorTest.java
@@ -0,0 +1,373 @@
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
+import com.gemstone.gemfire.cache.query.internal.CompiledValue;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.LinkedResultSet;
+import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionedRegionQueryEvaluatorTest {
+
+  InternalDistributedMember localNode;
+  InternalDistributedMember remoteNodeA;
+  InternalDistributedMember remoteNodeB;
+  GemFireCacheImpl cache;
+  InternalDistributedSystem system;
+  PartitionedRegion pr;
+  DefaultQuery query;
+  //Needed to help mock out certain scenarios
+  ExtendedPartitionedRegionDataStore dataStore;
+  //This is the set of nodes that remain after a failure
+  ArrayList allNodes = new ArrayList();
+  //convenience list for empty set
+  Set<InternalDistributedMember> noFailingMembers = new HashSet<>();
+    
+  @Before
+  public void setup() throws Exception {
+    localNode = new InternalDistributedMember("localhost",8888);
+    remoteNodeA = new InternalDistributedMember("localhost",8889);
+    remoteNodeB = new InternalDistributedMember("localhost",8890);
+    cache = Fakes.cache();
+    system = (InternalDistributedSystem) cache.getDistributedSystem();
+    
+    allNodes.add(localNode);
+    allNodes.add(remoteNodeA);
+    allNodes.add(remoteNodeB);
+    
+    pr = mock(PartitionedRegion.class);
+    dataStore = new ExtendedPartitionedRegionDataStore();
+    CompiledSelect select = mock(CompiledSelect.class);
+    when(select.getType()).thenReturn(CompiledValue.COMPARISON);
+    when(select.getElementTypeForOrderByQueries()).thenReturn(new ObjectTypeImpl(String.class));
+    query = mock(DefaultQuery.class);
+    when(query.getSimpleSelect()).thenReturn(select);
+    when(query.getLimit(any())).thenReturn(-1);
+    when (pr.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
+    when(pr.getMyId()).thenReturn(localNode);
+    when(pr.getDataStore()).thenReturn(dataStore);
+    when(pr.getCache()).thenReturn(cache);
+    
+  }
+
+  @Test
+  public void testLocalQueryReturnsResultsToPartitionedQueryEvaluator() throws Exception {
+    List resultsForMember1 = createResultObjects("1", "2", "3");
+
+    PartitionedQueryScenario scenario = new PartitionedQueryScenario(localNode, allNodes, noFailingMembers, createFakeBucketMap(), new ProcessDataFaker() {
+      public void processData(PartitionedRegionQueryEvaluator prqe) {
+        //this test won't have any remote nodes responding
+      }
+      
+      public void executeQueryLocally(Collection resultsCollector) {
+        resultsCollector.add(resultsForMember1);
+      }
+    });
+    
+    Set allBucketsToQuery = scenario.getAllBucketsToQuery();
+    Queue<PartitionedQueryScenario> scenarios = createScenariosQueue(scenario);
+    dataStore.setScenarios(scenarios);
+    
+    PartitionedRegionQueryEvaluator prqe = new ExtendedPartitionedRegionQueryEvaluator(system, pr, query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+    Collection results = prqe.queryBuckets(null).asList();
+    assertNotNull(results);
+    assertEquals(resultsForMember1.size(), results.size());
+    results.removeAll(resultsForMember1);
+    assertTrue(results.isEmpty());
+  }
+  
+  @Test
+  public void testRemoteAndLocalQueryReturnsResultsToPartitionedQueryEvaluator() throws Exception {
+    List resultsForMember1 = createResultObjects("1", "2", "3");
+    List resultsForMember2 = createResultObjects("4", "5", "6");
+
+    PartitionedQueryScenario scenario = new PartitionedQueryScenario(localNode, allNodes, noFailingMembers, createFakeBucketMap(), new ProcessDataFaker() {
+      public void processData(PartitionedRegionQueryEvaluator prqe) {
+        prqe.processData(resultsForMember2, remoteNodeA, 0, true);
+      }
+      
+      public void executeQueryLocally(Collection resultsCollector) {
+        resultsCollector.add(resultsForMember1);
+      }
+    });
+    
+    Set allBucketsToQuery = scenario.getAllBucketsToQuery();
+    Queue<PartitionedQueryScenario> scenarios = createScenariosQueue(scenario);
+    dataStore.setScenarios(scenarios);
+    
+    PartitionedRegionQueryEvaluator prqe = new ExtendedPartitionedRegionQueryEvaluator(system, pr, query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+    Collection results = prqe.queryBuckets(null).asList();
+    List expectedResults = new LinkedList();
+    expectedResults.addAll(resultsForMember1);
+    expectedResults.addAll(resultsForMember2);
+    assertNotNull(results);
+    assertEquals(expectedResults.size(), results.size());
+    results.removeAll(expectedResults);
+    assertTrue(results.isEmpty());
+  }
+  
+  
+  @Test
+  public void testFailingRemoteNodeAndRetryOnLocalNodeDoesNotSquashResultsOfOriginalQueryOnLocalNode() throws Exception {
+    List resultsForMember1 = createResultObjects("1", "2", "3");
+    List resultsForMember2 = createResultObjects("A", "B", "C");
+    List resultsForMember1ForRetry = createResultObjects("&", "$", "!");
+    
+    Set<InternalDistributedMember> failingMembers = new HashSet<>();
+    failingMembers.add(remoteNodeB);
+    PartitionedQueryScenario allNodesUpAtBeginning = new PartitionedQueryScenario(localNode, allNodes, failingMembers, createFakeBucketMap(), new ProcessDataFaker() {
+      public void processData(PartitionedRegionQueryEvaluator prqe) {
+        prqe.processData(resultsForMember2, remoteNodeA, 0, true);
+      }
+      
+      public void executeQueryLocally(Collection resultsCollector) {
+        resultsCollector.add(resultsForMember1);
+      }
+    });
+    
+    PartitionedQueryScenario afterFailureScenario = new PartitionedQueryScenario(localNode, allNodes, noFailingMembers, createFakeBucketMapFailedNodesToLocalMember(), new ProcessDataFaker() {
+      public void processData(PartitionedRegionQueryEvaluator prqe) {
+        //on retry we do not need to fake a retry on a remote node for this test
+      }
+      public void executeQueryLocally(Collection resultsCollector) {
+        resultsCollector.add(resultsForMember1);
+      }
+    });
+    Set allBucketsToQuery = allNodesUpAtBeginning.getAllBucketsToQuery();
+    Queue<PartitionedQueryScenario> scenarios = createScenariosQueue(allNodesUpAtBeginning, afterFailureScenario);
+    dataStore.setScenarios(scenarios);
+    
+    PartitionedRegionQueryEvaluator prqe = new ExtendedPartitionedRegionQueryEvaluator(system, pr, query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+    Collection results = prqe.queryBuckets(null).asList();
+  
+    List expectedResults = new LinkedList();
+    expectedResults.addAll(resultsForMember1);
+    expectedResults.addAll(resultsForMember2);
+    expectedResults.addAll(resultsForMember1ForRetry);
+    assertNotNull(results);
+    assertEquals(expectedResults.size(), results.size());
+    results.removeAll(expectedResults);
+    assertTrue(results.isEmpty());
+    
+  }
+  
+  private  Map<InternalDistributedMember, List<Integer>> createFakeBucketMap() {
+    Map<InternalDistributedMember, List<Integer>> bucketToNodeMap = new HashMap<>();
+    bucketToNodeMap.put(localNode, createBucketList(1, 2, 3));
+    bucketToNodeMap.put(remoteNodeA, createBucketList(4, 5, 6));
+    bucketToNodeMap.put(remoteNodeB, createBucketList(7, 8, 9));
+    return bucketToNodeMap;
+  }
+  
+  //fake bucket map to use after we fake a node failure
+  private  Map<InternalDistributedMember, List<Integer>> createFakeBucketMapFailedNodesToLocalMember() {
+    Map<InternalDistributedMember, List<Integer>> bucketToNodeMap = new HashMap<>();
+    bucketToNodeMap.put(localNode, createBucketList(1, 2, 3, 7, 8, 9));
+    bucketToNodeMap.put(remoteNodeA, createBucketList(4, 5, 6));
+    return bucketToNodeMap;
+  }
+
+  private List<Integer> createBucketList(int... buckets) {
+    List<Integer> bucketList = new ArrayList<>();
+    for (int i : buckets) {
+      bucketList.add(i);
+    }
+    return bucketList;
+  }
+  
+  private List createResultObjects(Object... resultObjects) {
+    List results = new LinkedList();
+    for (Object o: resultObjects) {
+      results.add(o);
+    }
+    return results;
+  }
+  
+  class ExtendedPartitionedRegionDataStore extends PartitionedRegionDataStore {
+    //Must be the same referenced queue as that used by the ExtendedPartitionedRegionQueryEvaluator
+    //That way they will be synched to the same scenario;
+    Queue<PartitionedQueryScenario> scenarios;
+
+    void setScenarios(Queue<PartitionedQueryScenario> scenarios) {
+      this.scenarios = scenarios;
+    }
+    
+    @Override
+    public boolean isManagingBucket(int bucketId) {
+      return scenarios.peek().isLocalManagingBucket(bucketId);
+    }
+  }
+
+  class ExtendedPartitionedRegionQueryEvaluator extends PartitionedRegionQueryEvaluator {
+    Queue<PartitionedQueryScenario> scenarios;
+    //pass through so we can fake out the executeQuery locally
+    PRQueryProcessor extendedPRQueryProcessor;
+    
+    public ExtendedPartitionedRegionQueryEvaluator(InternalDistributedSystem sys, PartitionedRegion pr, DefaultQuery query, Object[] parameters,
+        SelectResults cumulativeResults, Set<Integer> bucketsToQuery, Queue<PartitionedQueryScenario> scenarios) {
+      super(sys, pr, query, parameters, cumulativeResults, bucketsToQuery);
+      this.scenarios = scenarios;
+      extendedPRQueryProcessor = new ExtendedPRQueryProcessor(pr, query, null, new LinkedList(bucketsToQuery));
+    }
+    
+    private PartitionedQueryScenario currentScenario() {
+      return scenarios.peek();
+    }
+
+    // (package access for unit test purposes)
+    Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMap() throws QueryException
+    {
+      return currentScenario().bucketMap;
+    }
+
+    @Override
+    protected StreamingQueryPartitionResponse createStreamingQueryPartitionResponse(InternalDistributedSystem system, HashMap<InternalDistributedMember, List<Integer>> n2b) {
+      StreamingQueryPartitionResponse response = new FakeNumFailStreamingQueryPartitionResponse(system, n2b, this, scenarios);
+      return response;
+    }
+
+    @Override
+    protected Set sendMessage(DistributionMessage m) {
+      //Don't need to actually send the message...
+      return null;
+    }
+    
+    @Override
+    protected PRQueryProcessor createLocalPRQueryProcessor(List<Integer> bucketList) {
+      return extendedPRQueryProcessor;
+    }
+    
+    @Override
+    protected ArrayList getAllNodes() {
+      return currentScenario().allNodes;
+    }
+    
+    @Override 
+    protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
+      return currentScenario().getBucketOwners(bid);
+    }
+    
+    class ExtendedPRQueryProcessor extends PRQueryProcessor {
+
+      public ExtendedPRQueryProcessor(PartitionedRegion pr, DefaultQuery query, Object[] parameters, List buckets) {
+        super(pr, query, parameters, buckets);
+      }
+
+      public ExtendedPRQueryProcessor(PartitionedRegionDataStore prDS, DefaultQuery query, Object[] parameters, List<Integer> buckets) {
+        super(prDS, query, parameters, buckets);
+      }
+      
+      @Override
+      public boolean executeQuery(Collection resultsCollector) {
+        currentScenario().processDataFaker.executeQueryLocally(resultsCollector);
+        return true;
+      }
+    }
+
+    class FakeNumFailStreamingQueryPartitionResponse extends StreamingQueryPartitionResponse {
+      private PartitionedRegionQueryEvaluator processor;
+      Queue<PartitionedQueryScenario> scenarios;
+      
+      public FakeNumFailStreamingQueryPartitionResponse(InternalDistributedSystem system, HashMap<InternalDistributedMember, List<Integer>> n2b, PartitionedRegionQueryEvaluator processor, Queue<PartitionedQueryScenario> scenarios) {
+        super(system, n2b.keySet());
+        this.processor = processor;
+        this.scenarios = scenarios;
+      }
+
+      @Override
+      public Set<InternalDistributedMember> waitForCacheOrQueryException() throws CacheException, QueryException {
+        currentScenario().processDataFaker.processData(processor);
+        Set<InternalDistributedMember> returnValue = currentScenario().failingNodes;
+        advanceTheScenario();
+        return returnValue;
+      }
+      
+      private void advanceTheScenario() {
+        if (scenarios.isEmpty()) {
+          return;
+        }
+        scenarios.remove();
+      }
+    }
+  }
+  
+  interface ProcessDataFaker {
+    void processData(PartitionedRegionQueryEvaluator processor);
+    void executeQueryLocally(Collection resultsCollector);
+  }
+  
+  //holds information on how the PRQE is to behave and what responses are "returned"
+  private class PartitionedQueryScenario {
+    private InternalDistributedMember localNode;
+    private ArrayList allNodes;
+    private Set<InternalDistributedMember> failingNodes;
+    private ProcessDataFaker processDataFaker;
+    private Map<InternalDistributedMember, List<Integer>> bucketMap;
+    
+    public PartitionedQueryScenario(InternalDistributedMember localNode, ArrayList allNodes, Set<InternalDistributedMember> failingNodes,  Map<InternalDistributedMember, List<Integer>> bucketMap, ProcessDataFaker processDataFaker) {
+      this.localNode = localNode;
+      this.allNodes = allNodes;
+      this.failingNodes = failingNodes;
+      this.bucketMap = bucketMap;
+      this.processDataFaker = processDataFaker;
+    }
+    
+    public Set getAllBucketsToQuery() {
+      Set allBuckets = new HashSet();
+      bucketMap.values().stream().forEach(list -> allBuckets.addAll(list));
+      return allBuckets;
+    }
+    
+    public Set<InternalDistributedMember> getBucketOwners(Integer bucketId) {
+      Set<InternalDistributedMember> owners = new HashSet<>();
+      bucketMap.entrySet().stream().forEach(entrySet -> {
+          if (entrySet.getValue().contains(bucketId)) {
+            owners.add(entrySet.getKey());
+          }
+      });
+      return owners;
+    }
+    
+    public boolean isLocalManagingBucket(int bucketId) {
+      return getBucketOwners(bucketId).contains(localNode);
+    }
+  }
+  
+  private Queue<PartitionedQueryScenario> createScenariosQueue(PartitionedQueryScenario... scenarios) {
+    Queue<PartitionedQueryScenario> queue = new LinkedList<>();
+    for (PartitionedQueryScenario scenario: scenarios) {
+      queue.add(scenario);
+    }
+    return queue;
+    
+  }
+}



Mime
View raw message