geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [15/50] incubator-geode git commit: GEODE-74: Making the satisfy redundancy phase of rebalance parallel
Date Fri, 24 Jul 2015 23:15:38 GMT
GEODE-74: Making the satisfy redundancy phase of rebalance parallel

Tasks are submitted to background threads to trigger redundancy
satisfaction. After the satisfy-redundancy phase is done, we wait for
the tasks to finish.

The number of buckets that can be recovered in parallel is controlled
by the system property gemfire.MAX_PARALLEL_BUCKET_RECOVERIES, which
defaults to 8.

If a redundancy recovery/rebalance is restarted due to a membership
change, wait for any in-progress operations to complete before fetching
new information from all of the members.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9fa9ced0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9fa9ced0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9fa9ced0

Branch: refs/heads/feature/GEODE-86
Commit: 9fa9ced08f1851f97d2d2407f1519dcc3cac06e0
Parents: 8c411a8
Author: Dan Smith <dsmith@pivotal.io>
Authored: Fri Jun 12 17:35:24 2015 -0700
Committer: Dan Smith <dsmith@pivotal.io>
Committed: Tue Jul 14 12:25:42 2015 -0700

----------------------------------------------------------------------
 .../control/PartitionRebalanceDetailsImpl.java  |   8 +-
 .../PartitionedRegionRebalanceOp.java           | 100 +++++++++----
 .../partitioned/rebalance/BucketOperator.java   |  46 +++++-
 .../rebalance/ParallelBucketOperator.java       | 145 +++++++++++++++++++
 .../rebalance/PartitionedRegionLoadModel.java   |  45 +++++-
 .../rebalance/SatisfyRedundancy.java            |   7 +-
 .../rebalance/SatisfyRedundancyFPR.java         |   1 +
 .../rebalance/SimulatedBucketOperator.java      |  11 +-
 .../control/RebalanceOperationDUnitTest.java    |   8 +-
 .../PartitionedRegionLoadModelJUnitTest.java    |  97 ++++++++++++-
 10 files changed, 405 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/PartitionRebalanceDetailsImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/PartitionRebalanceDetailsImpl.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/PartitionRebalanceDetailsImpl.java
index da67a83..b74bf69 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/PartitionRebalanceDetailsImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/PartitionRebalanceDetailsImpl.java
@@ -48,26 +48,26 @@ Serializable, Comparable<PartitionRebalanceDetailsImpl> {
     this.region = region;
   }
 
-  public void incCreates(long bytes, long time) {
+  public synchronized void incCreates(long bytes, long time) {
     bucketCreateBytes+= bytes;
     bucketCreateTime += time;
     bucketCreatesCompleted++;
   }
   
-  public void incRemoves(long bytes, long time) {
+  public synchronized void incRemoves(long bytes, long time) {
     bucketRemoveBytes+= bytes;
     bucketRemoveTime += time;
     bucketRemovesCompleted++; 
     
   }
   
-  public void incTransfers(long bytes, long time) {
+  public synchronized void incTransfers(long bytes, long time) {
     bucketTransferBytes+= bytes;
     bucketTransferTime += time;
     bucketTransfersCompleted++;
   }
   
-  public void incPrimaryTransfers(long time) {
+  public synchronized void incPrimaryTransfers(long time) {
     primaryTransfersCompleted++;
     primaryTransferTime += time;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 7b231ff..39f4e97 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
 import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.RebalanceDirector;
@@ -75,6 +76,8 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 public class PartitionedRegionRebalanceOp {
   private static final Logger logger = LogService.getLogger();
   
+  private static final int MAX_PARALLEL_OPERATIONS = Integer.getInteger("gemfire.MAX_PARALLEL_BUCKET_RECOVERIES",
8);
+  
   private final boolean simulate;
   private final boolean replaceOfflineData;
   private final PartitionedRegion leaderRegion;
@@ -177,9 +180,10 @@ public class PartitionedRegionRebalanceOp {
 
       GemFireCacheImpl cache = (GemFireCacheImpl) leaderRegion.getCache();
       Map<PartitionedRegion, InternalPRInfo> detailsMap = fetchDetails(cache);
-      BucketOperatorWrapper operator = getBucketOperator(detailsMap);
-      model = buildModel(operator, detailsMap, resourceManager);
-      for(PartitionRebalanceDetailsImpl details : operator.getDetailSet()) {
+      BucketOperatorWrapper serialOperator = getBucketOperator(detailsMap);
+      ParallelBucketOperator parallelOperator = new ParallelBucketOperator(MAX_PARALLEL_OPERATIONS,
cache.getDistributionManager().getWaitingThreadPool(), serialOperator);
+      model = buildModel(parallelOperator, detailsMap, resourceManager);
+      for(PartitionRebalanceDetailsImpl details : serialOperator.getDetailSet()) {
         details.setPartitionMemberDetailsBefore(model.getPartitionedMemberDetails(details.getRegionPath()));
       }
 
@@ -199,8 +203,9 @@ public class PartitionedRegionRebalanceOp {
           if(this.stats != null) {
             this.stats.incRebalanceMembershipChanges(1);
           }
+          model.waitForOperations();
           detailsMap = fetchDetails(cache);
-          model = buildModel(operator, detailsMap, resourceManager);
+          model = buildModel(parallelOperator, detailsMap, resourceManager);
           director.membershipChanged(model);
         }
 
@@ -222,14 +227,14 @@ public class PartitionedRegionRebalanceOp {
       }
       long end = System.nanoTime();
       
-      for(PartitionRebalanceDetailsImpl details : operator.getDetailSet()) {
+      for(PartitionRebalanceDetailsImpl details : serialOperator.getDetailSet()) {
         if(!simulate) {
           details.setTime(end - start);
         }
         details.setPartitionMemberDetailsAfter(model.getPartitionedMemberDetails(details.getRegionPath()));
       }
 
-      return Collections.<PartitionRebalanceInfo>unmodifiableSet(operator.getDetailSet());
+      return Collections.<PartitionRebalanceInfo>unmodifiableSet(serialOperator.getDetailSet());
     } finally {
       if(lock != null) {
         try {
@@ -620,6 +625,7 @@ public class PartitionedRegionRebalanceOp {
   
   private class BucketOperatorImpl implements BucketOperator {
 
+    @Override
     public boolean moveBucket(InternalDistributedMember source,
         InternalDistributedMember target, int bucketId,
         Map<String, Long> colocatedRegionBytes) {
@@ -629,6 +635,7 @@ public class PartitionedRegionRebalanceOp {
       return moveBucketForRegion(source, target, bucketId, leaderRegion);
     }
 
+    @Override
     public boolean movePrimary(InternalDistributedMember source,
         InternalDistributedMember target, int bucketId) {
 
@@ -637,13 +644,29 @@ public class PartitionedRegionRebalanceOp {
       return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance); 
     }
 
-    public boolean createRedundantBucket(
+    @Override
+    public void createRedundantBucket(
         InternalDistributedMember targetMember, int bucketId,
-        Map<String, Long> colocatedRegionBytes) {
-      return createRedundantBucketForRegion(targetMember, bucketId,
+        Map<String, Long> colocatedRegionBytes, Completion completion) {
+      boolean result = false;
+      try {
+        result = createRedundantBucketForRegion(targetMember, bucketId,
           leaderRegion, isRebalance,replaceOfflineData);
+      } finally {
+        if(result) {
+          completion.onSuccess();
+        } else {
+          completion.onFailure();
+        }
+      }
+    }
+    
+    @Override
+    public void waitForOperations() {
+      //do nothing, all operations are synchronous
     }
 
+    @Override
     public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
         Map<String, Long> colocatedRegionBytes) {
       return removeRedundantBucketForRegion(targetMember, bucketId,
@@ -670,7 +693,7 @@ public class PartitionedRegionRebalanceOp {
       this.detailSet = rebalanceDetails;
       this.regionCount = detailSet.size();
     }
-    
+    @Override
     public boolean moveBucket(InternalDistributedMember sourceMember,
         InternalDistributedMember targetMember, int id,
         Map<String, Long> colocatedRegionBytes) {
@@ -715,23 +738,23 @@ public class PartitionedRegionRebalanceOp {
       return result;
     }
 
-    public boolean createRedundantBucket(
-        InternalDistributedMember targetMember, int i, 
-        Map<String, Long> colocatedRegionBytes) {
-      boolean result = false;
-      long elapsed = 0;
-      long totalBytes = 0;
-      
+    @Override
+    public void createRedundantBucket(
+        final InternalDistributedMember targetMember, final int i, 
+        final Map<String, Long> colocatedRegionBytes, final Completion completion)
{
       
       if(stats != null) {
         stats.startBucketCreate(regionCount);
       }
-      try {
-        long start = System.nanoTime();
-        result = delegate.createRedundantBucket(targetMember, i,  
-            colocatedRegionBytes);
-        elapsed= System.nanoTime() - start;
-        if (result) {
+      
+      final long start = System.nanoTime();
+      delegate.createRedundantBucket(targetMember, i,  
+          colocatedRegionBytes, new Completion() {
+
+        @Override
+        public void onSuccess() {
+          long totalBytes = 0;
+          long elapsed= System.nanoTime() - start;
           if(logger.isDebugEnabled()) {
             logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion,
i, targetMember);
           }
@@ -746,20 +769,29 @@ public class PartitionedRegionRebalanceOp {
               totalBytes += regionBytes;
             }
           }
-        } else {
+
+          if(stats != null) {
+            stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
+          }
+
+        }
+
+        @Override
+        public void onFailure() {
+          long elapsed= System.nanoTime() - start;
+
           if (logger.isDebugEnabled()) {
             logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion,
i, targetMember);
           }
+
+          if(stats != null) {
+            stats.endBucketCreate(regionCount, false, 0, elapsed);
+          }
         }
-      } finally {
-        if(stats != null) {
-          stats.endBucketCreate(regionCount, result, totalBytes, elapsed);
-        }
-      }
-      
-      return result;
+      });
     }
     
+    @Override
     public boolean removeBucket(
         InternalDistributedMember targetMember, int i, 
         Map<String, Long> colocatedRegionBytes) {
@@ -805,6 +837,7 @@ public class PartitionedRegionRebalanceOp {
       return result;
     }
   
+    @Override
     public boolean movePrimary(InternalDistributedMember source,
         InternalDistributedMember target, int bucketId) {
       boolean result = false;
@@ -838,6 +871,11 @@ public class PartitionedRegionRebalanceOp {
       
       return result;
     }
+    
+    @Override
+    public void waitForOperations() {
+      delegate.waitForOperations();
+    }
 
     public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
       return this.detailSet;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperator.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperator.java
index 3c62c7b..0bc51d3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperator.java
@@ -20,14 +20,25 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 public interface BucketOperator {
 
   /**
-   * Create a redundancy copy of a bucket on a given node
-   * @param targetMember the node to create the bucket on
-   * @param bucketId the id of the bucket to create
-   * @param colocatedRegionBytes the size of the bucket in bytes
-   * @return true if a redundant copy of the bucket was created.
+   * Create a redundancy copy of a bucket on a given node. This call may be
+   * asynchronous; it will notify the completion when the operation is done.
+   * 
+   * Note that the completion is not required to be threadsafe, so implementors
+   * should ensure the completion is invoked by the calling thread of
+   * createRedundantBucket, usually by invoking the completions in waitForOperations.
+   * 
+   * @param targetMember
+   *          the node to create the bucket on
+   * @param bucketId
+   *          the id of the bucket to create
+   * @param colocatedRegionBytes
+   *          the size of the bucket in bytes
+   * @param completion
+   *          a callback which will receive a notification on the success or
+   *          failure of the operation.
    */
-  boolean createRedundantBucket(InternalDistributedMember targetMember,
-      int bucketId, Map<String, Long> colocatedRegionBytes);
+  void createRedundantBucket(InternalDistributedMember targetMember,
+      int bucketId, Map<String, Long> colocatedRegionBytes, Completion completion);
 
   /**
    * Remove a bucket from the target member.
@@ -57,4 +68,25 @@ public interface BucketOperator {
    */
   boolean movePrimary(InternalDistributedMember source,
       InternalDistributedMember target, int bucketId);
+  
+  /**
+   * Wait for any pending asynchronous operations that this thread submitted
+   * earlier to complete. Currently only createRedundantBucket may be
+   * asynchronous.
+   */
+  public void waitForOperations();
+  
+  /**
+   * Callbacks for asynchronous operations. These methods will be invoked when an
+   * asynchronous operation finishes.
+   * 
+   * The completions are NOT THREADSAFE.
+   * 
+   * They will be completed when createRedundantBucket or waitForOperations is
+   * called.
+   */
+  public interface Completion {
+    public void onSuccess();
+    public void onFailure();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/ParallelBucketOperator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/ParallelBucketOperator.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/ParallelBucketOperator.java
new file mode 100644
index 0000000..acea21a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/ParallelBucketOperator.java
@@ -0,0 +1,145 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * A bucket operator that will perform operations on a bucket asynchronously.
+ * 
+ * This class wraps a delegate bucket operator that is synchronous. That is, the
+ * delegate bucket operator is expected to move the bucket and notify the
+ * Completion within the scope of the call to create bucket.
+ * 
+ * What this class does is make that call asynchronous. A task to create the
+ * bucket is handed to the thread pool and executed there. After it is done, the
+ * completion is notified.
+ * 
+ * Calling waitForOperations waits for all previously submitted operations and 
+ * ensures the completions are notified.
+ * 
+ * Note that only createRedundantBucket is asynchronous, the rest of the 
+ * operations are synchronous.
+ * 
+ */
+public class ParallelBucketOperator implements BucketOperator {
+
+  private final BucketOperator delegate;
+  private final ExecutorService executor;
+  private final Semaphore operationSemaphore;
+  private final int maxParallelOperations;
+  private final ConcurrentLinkedQueue<Completion> pendingSuccess = new ConcurrentLinkedQueue<BucketOperator.Completion>();
+  private final ConcurrentLinkedQueue<Completion> pendingFailure = new ConcurrentLinkedQueue<BucketOperator.Completion>();
+  
+
+  /**
+   * Create a parallel bucket operator
+   * @param maxParallelOperations The number of operations that can execute concurrently.
Further calls to createRedundantBucket will block.
+   * @param executor the executor to submit tasks to. This executor should be able to create
at least maxParallelOperations threads.
+   * @param operator A bucket operator that is synchronous that will do the actual work of
creating a bucket.
+   */
+  public ParallelBucketOperator(int maxParallelOperations, ExecutorService executor, BucketOperator
operator) {
+    this.maxParallelOperations = maxParallelOperations;
+    this.operationSemaphore = new Semaphore(maxParallelOperations);
+    this.delegate = operator;
+    this.executor = executor;
+  }
+
+  /**
+   * Create a redundant bucket asynchronously. If maxParallelOperations is not reached, this
call will submit 
+   * a task and return immediately. Otherwise, it will block until an executor thread is
available to 
+   * take a task.
+   * 
+   * The completion will not be notified until the caller makes another call to createRedundant
bucket or
+   * waitForOperations.
+   */
+  @Override
+  public void createRedundantBucket(final InternalDistributedMember targetMember,
+      final int bucketId, final Map<String, Long> colocatedRegionBytes,
+      final Completion completion) {
+    drainCompletions();
+    operationSemaphore.acquireUninterruptibly();
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, new
Completion() {
+            @Override
+            public void onSuccess() {
+              pendingSuccess.add(completion);
+            }
+            
+            @Override
+            public void onFailure() {
+              pendingFailure.add(completion);
+            }
+          });
+        } finally {
+          operationSemaphore.release();
+        }
+      }
+    });
+    
+  }
+
+  @Override
+  public boolean removeBucket(InternalDistributedMember memberId, int id,
+      Map<String, Long> colocatedRegionSizes) {
+    return delegate.removeBucket(memberId, id, colocatedRegionSizes);
+  }
+
+  @Override
+  public boolean moveBucket(InternalDistributedMember sourceMember,
+      InternalDistributedMember targetMember, int bucketId,
+      Map<String, Long> colocatedRegionBytes) {
+      return delegate.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+  }
+
+  @Override
+  public boolean movePrimary(InternalDistributedMember source,
+      InternalDistributedMember target, int bucketId) {
+    return delegate.movePrimary(source, target, bucketId);
+  }
+  
+  public void drainCompletions() {
+    Completion next = null;
+    while((next = pendingSuccess.poll()) != null) {
+      next.onSuccess();
+    }
+    
+    while((next = pendingFailure.poll()) != null) {
+      next.onFailure();
+    }
+    
+  }
+
+  /**
+   * Wait for any pending operations, and notify the completions
+   * that the operations are done.
+   */
+  public void waitForOperations() {
+    boolean interrupted = false;
+    while(!executor.isShutdown()) {
+      try {
+        if(operationSemaphore.tryAcquire(maxParallelOperations, 1, TimeUnit.SECONDS)) {
+          operationSemaphore.release(maxParallelOperations);
+          
+          drainCompletions();
+          
+          if(interrupted) {
+            Thread.currentThread().interrupt();
+          }
+          
+          return; 
+        }
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
index 45b7a1f..26109bd 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java
@@ -363,14 +363,20 @@ public class PartitionedRegionLoadModel {
     return colocatedRegionSizes;
   }
 
-  public void createRedundantBucket(BucketRollup bucket,
-      Member targetMember) {
+  /**
+   * Trigger the creation of a redundant bucket, potentially asynchronously.
+   * 
+   * This method will find the best node to create a redundant bucket and 
+   * invoke the bucket operator to create a bucket on that node. Because the bucket
+   * operator is asynchronous, the bucket may not be created immediately, but
+   * the model will be updated regardless. Invoke {@link #waitForOperations()}
+   * to wait for those operations to actually complete
+   */
+  public void createRedundantBucket(final BucketRollup bucket,
+      final Member targetMember) {
     Map<String, Long> colocatedRegionSizes = getColocatedRegionSizes(bucket);
-    Move move = new Move(null, targetMember, bucket);
+    final Move move = new Move(null, targetMember, bucket);
     
-    if(!this.operator.createRedundantBucket(targetMember.getMemberId(), bucket.getId(), colocatedRegionSizes))
{
-      this.attemptedBucketCreations.add(move);
-    } else {
       this.lowRedundancyBuckets.remove(bucket);
       bucket.addMember(targetMember);
       //put the bucket back into the list if we still need to satisfy redundancy for
@@ -379,7 +385,24 @@ public class PartitionedRegionLoadModel {
         this.lowRedundancyBuckets.add(bucket);
       }
       resetAverages();
-    }
+    
+    this.operator.createRedundantBucket(targetMember.getMemberId(), bucket.getId(), colocatedRegionSizes,
new BucketOperator.Completion() {
+      @Override
+      public void onSuccess() {
+    }
+
+      @Override
+      public void onFailure() {
+        //If the bucket creation failed, we need to undo the changes
+        //we made to the model
+        attemptedBucketCreations.add(move);
+        bucket.removeMember(targetMember);
+        if(bucket.getRedundancy() < requiredRedundancy) {
+          lowRedundancyBuckets.add(bucket);
+        }
+        resetAverages();
+      }
+    });
   }
   
   
@@ -842,6 +865,14 @@ public class PartitionedRegionLoadModel {
     return variance;
   }
   
+  /**
+   * Wait for the bucket operator to complete
+   * any pending asynchronous operations.
+   */
+  public void waitForOperations() {
+    operator.waitForOperations();
+  }
+  
   @Override
   public String toString() {
     StringBuilder result = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
index 0f41b2e..3291056 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancy.java
@@ -39,7 +39,12 @@ public class SatisfyRedundancy extends RebalanceDirectorAdapter {
 
   @Override
   public boolean nextStep() {
-    return satisfyRedundancy();
+    if(satisfyRedundancy()) {
+      return true;
+    }  else {
+      model.waitForOperations();
+      return satisfyRedundancy();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
index 36989f6..9b6ec3b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SatisfyRedundancyFPR.java
@@ -76,5 +76,6 @@ public class SatisfyRedundancyFPR extends RebalanceDirectorAdapter {
       
       model.createRedundantBucket(bucket, targetMember);
     }
+    model.waitForOperations();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SimulatedBucketOperator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SimulatedBucketOperator.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SimulatedBucketOperator.java
index fb8d331..76ce4c9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SimulatedBucketOperator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/SimulatedBucketOperator.java
@@ -18,9 +18,10 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
  */
 public class SimulatedBucketOperator implements BucketOperator {
 
-  public boolean createRedundantBucket(
-      InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes)
{
-    return true;
+  public void createRedundantBucket(
+      InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes,

+      BucketOperator.Completion completion) {
+    completion.onSuccess();
   }
   
   public boolean moveBucket(InternalDistributedMember source,
@@ -38,4 +39,8 @@ public class SimulatedBucketOperator implements BucketOperator {
       Map<String, Long> colocatedRegionSizes) {
     return true;
   }
+
+  @Override
+  public void waitForOperations() {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 041a217..d1888e7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -2753,17 +2753,19 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
     assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
     assertEquals(0, stats.getRebalanceBucketCreatesFailed());
-//    assertEquals(results.getTotalBucketCreateTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketCreateTime()));
+    //The time stats may not be exactly the same, because they are not
+    //incremented at exactly the same time.
+    assertEquals(results.getTotalBucketCreateTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketCreateTime()),
2000);
     assertEquals(results.getTotalBucketCreateBytes(), stats.getRebalanceBucketCreateBytes());
     assertEquals(0, stats.getRebalanceBucketTransfersInProgress());
     assertEquals(results.getTotalBucketTransfersCompleted(), stats.getRebalanceBucketTransfersCompleted());
     assertEquals(0, stats.getRebalanceBucketTransfersFailed());
-    //assertEquals(results.getTotalBucketTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketTransfersTime()));
+    assertEquals(results.getTotalBucketTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalanceBucketTransfersTime()),
2000);
     assertEquals(results.getTotalBucketTransferBytes(), stats.getRebalanceBucketTransfersBytes());
     assertEquals(0, stats.getRebalancePrimaryTransfersInProgress());
     assertEquals(results.getTotalPrimaryTransfersCompleted(), stats.getRebalancePrimaryTransfersCompleted());
     assertEquals(0, stats.getRebalancePrimaryTransfersFailed());
-//    assertEquals(results.getTotalPrimaryTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalancePrimaryTransferTime()));
+    assertEquals(results.getTotalPrimaryTransferTime(), TimeUnit.NANOSECONDS.toMillis(stats.getRebalancePrimaryTransferTime()),
2000);
   }
   
   private Set<Integer> getBucketList(final String regionName, VM vm0) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9fa9ced0/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
index d1ea492..9195231 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionLoadModelJUnitTest.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.partition.PartitionMemberInfo;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.CompositeDirector;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
@@ -477,13 +478,14 @@ public class PartitionedRegionLoadModelJUnitTest {
     InternalDistributedMember member3 = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"),
3);
     MyBucketOperator op = new MyBucketOperator() {
       @Override
-      public boolean createRedundantBucket(
+      public void createRedundantBucket(
           InternalDistributedMember targetMember, int i,
-          Map<String, Long> colocatedRegionBytes) {
+          Map<String, Long> colocatedRegionBytes, Completion completion) {
         if(targetMember.equals(member2)) {
-          return false;
+          completion.onFailure(); // simulate a failed create on member2, reported synchronously
+        } else {
+          super.createRedundantBucket(targetMember, i, colocatedRegionBytes, completion);
         }
-        return super.createRedundantBucket(targetMember, i, colocatedRegionBytes);
       }
       
     };
@@ -510,6 +512,57 @@ public class PartitionedRegionLoadModelJUnitTest {
   }
   
   /**
+   * Test that redundancy satisfaction can handle asynchronous failures
+   * and complete the job correctly.
+   * @throws Exception
+   */
+  @Test
+  public void testRedundancySatisfactionWithAsyncFailures() throws Exception {
+    InternalDistributedMember member1 = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+    InternalDistributedMember member2 = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 2);
+    InternalDistributedMember member3 = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 3);
+    
+    BucketOperatorWithFailures operator = new BucketOperatorWithFailures();
+    operator.addBadMember(member2);
+    bucketOperator = operator;
+    PartitionedRegionLoadModel model = new PartitionedRegionLoadModel(
+        bucketOperator, 1, 6, getAddressComparor(false),
+        Collections.<InternalDistributedMember>emptySet(), null);
+    PartitionMemberInfoImpl details1 = buildDetails(member1, 500, 500, new long[] {1,1,1,1,1,1}, new long[] {1,1,1,1,1,1});
+    PartitionMemberInfoImpl details2 = buildDetails(member2, 500, 500, new long[] {0,0,0,0,0,0}, new long[] {0,0,0,0,0,0});
+    PartitionMemberInfoImpl details3 = buildDetails(member3, 500, 500, new long[] {0,0,0,0,0,0}, new long[] {0,0,0,0,0,0});
+    model.addRegion("a", Arrays.asList(details1, details2, details3), new FakeOfflineDetails(), true);
+    
+    Set<PartitionMemberInfo> details = model.getPartitionedMemberDetails("a");
+    assertEquals(3, details.size());
+    
+    //TODO - make some assertions about what's in the details
+
+    //we expect 6 moves (the 3 that target member2 will fail)
+    assertEquals(6, doMoves(new CompositeDirector(true, true, false, false), model));
+    
+    assertEquals(3, bucketOperator.creates.size());
+    for(Completion completion: operator.pendingSuccesses) {
+      completion.onSuccess();
+    }
+    for(Completion completion: operator.pendingFailures) {
+      completion.onFailure();
+    }
+    
+    //Now the three failed moves will get reattempted to a new location (because their first location failed)
+    assertEquals(3, doMoves(new CompositeDirector(true, true, false, false), model));
+    
+    List<Create> expectedCreates = new ArrayList<Create>();
+    expectedCreates.add(new Create(member3, 1));
+    expectedCreates.add(new Create(member3, 3));
+    expectedCreates.add(new Create(member3, 5));
+    expectedCreates.add(new Create(member3, 0));
+    expectedCreates.add(new Create(member3, 2));
+    expectedCreates.add(new Create(member3, 4));
+    assertEquals(expectedCreates, bucketOperator.creates);
+  }
+  
+  /**
    * Very basic test of moving primaries. Creates two nodes and four buckets, with a copy
    * of each bucket on both nodes. All of the primaries are on one node. It expects half
the
    * primaries to move to the other node.
@@ -1394,14 +1447,15 @@ public class PartitionedRegionLoadModelJUnitTest {
     private MoveType lastMove = null;
     
 
-    public boolean createRedundantBucket(
-        InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes)
{
+    @Override
+    public void createRedundantBucket(
+        InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes,
Completion completion) {
       creates.add(new Create(targetMember, i));
       if(DEBUG) {
         System.out.println("Created bucket " + i + " on " + targetMember);
       }
       lastMove = MoveType.CREATE;
-      return true;
+      completion.onSuccess(); // this fake always reports success, synchronously, before returning
     }
 
     @Override
@@ -1441,6 +1495,35 @@ public class PartitionedRegionLoadModelJUnitTest {
     
   }
   
+  public static class BucketOperatorWithFailures extends MyBucketOperator {
+    final List<Completion> pendingSuccesses = new ArrayList<Completion>(); // creates on good members; completion not yet signalled
+    final List<Completion> pendingFailures = new ArrayList<Completion>(); // creates on bad members; completion not yet signalled
+    final Set<InternalDistributedMember> badMembers = new HashSet<InternalDistributedMember>(); // creates targeting these members are treated as failures
+
+    public void addBadMember(InternalDistributedMember member) {
+      this.badMembers.add(member);
+    }
+    @Override
+    public void createRedundantBucket(InternalDistributedMember targetMember,
+        int i, Map<String, Long> colocatedRegionBytes, Completion completion) {
+      if(badMembers.contains(targetMember)) {
+        pendingFailures.add(completion); // deferred; super is not called, so no create is recorded
+      } else {
+        super.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {
+          @Override
+          public void onSuccess() {
+          }
+
+          @Override
+          public void onFailure() {
+          }
+        });
+        // Defer the caller's completion: it is held in pendingSuccesses instead of being passed to super.
+        pendingSuccesses.add(completion);
+      }
+    }
+  }
+  
   private static enum MoveType {
     CREATE,
     MOVE_PRIMARY,



Mime
View raw message