geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject [1/2] geode git commit: GEODE-2787: StateFlushOperation will now wait for in flight operations from the recipient
Date Mon, 15 May 2017 19:03:40 GMT
Repository: geode
Updated Branches:
  refs/heads/develop 9a1aeddca -> 3a1062e24


GEODE-2787: StateFlushOperation will now wait for in flight operations from the recipient


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3a1062e2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3a1062e2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3a1062e2

Branch: refs/heads/develop
Commit: 3a1062e245b3ded52ea3f6b6de0aff94ce846fa3
Parents: 161a9a0
Author: Jason Huynh <huynhja@gmail.com>
Authored: Mon May 15 09:05:14 2017 -0700
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Mon May 15 12:03:24 2017 -0700

----------------------------------------------------------------------
 .../internal/cache/StateFlushOperation.java     | 59 +++++++++++++-------
 .../geode/internal/cache/GIIDeltaDUnitTest.java |  2 +-
 2 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/3a1062e2/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index 8ffdf06..b7726b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -187,7 +187,7 @@ public class StateFlushOperation {
    * @return true if the state was flushed, false if not
    */
   public boolean flush(Set recipients, DistributedMember target, int processorType,
-                       boolean flushNewOps) throws InterruptedException {
+      boolean flushNewOps) throws InterruptedException {
 
     Set recips = recipients; // do not use recipients parameter past this point
     if (Thread.interrupted()) {
@@ -355,6 +355,17 @@ public class StateFlushOperation {
     protected void process(DistributionManager dm) {
       logger.trace(LogMarker.STATE_FLUSH_OP, "Processing {}", this);
       if (dm.getDistributionManagerId().equals(relayRecipient)) {
+        // wait for inflight operations to the aeqs even if the recipient is the primary
+        Set<DistributedRegion> regions = getRegions(dm);
+        for (DistributedRegion r : regions) {
+          if (r != null) {
+            if (this.allRegions && r.doesNotDistribute()) {
+              // no need to flush a region that does no distribution
+              continue;
+            }
+            waitForCurrentOperations(r, r.isInitialized());
+          }
+        }
         // no need to send a relay request to this process - just send the
         // ack back to the sender
         StateStabilizedMessage ga = new StateStabilizedMessage();
@@ -374,12 +385,7 @@ public class StateFlushOperation {
         gr.requestingMember = this.getSender();
         gr.processorId = processorId;
         try {
-          Set<DistributedRegion> regions;
-          if (this.allRegions) {
-            regions = getAllRegions(dm);
-          } else {
-            regions = Collections.singleton(this.getRegion(dm));
-          }
+          Set<DistributedRegion> regions = getRegions(dm);
           for (DistributedRegion r : regions) {
             if (r == null) {
               if (logger.isTraceEnabled(LogMarker.DM)) {
@@ -392,18 +398,7 @@ public class StateFlushOperation {
                 continue;
               }
               boolean initialized = r.isInitialized();
-              if (initialized) {
-                if (this.flushNewOps) {
-                  r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so
-                  // we can track current
-                  // ops
-                }
-                try {
-                  r.getDistributionAdvisor().waitForCurrentOperations();
-                } catch (RegionDestroyedException e) {
-                  // continue with the next region
-                }
-              }
+              waitForCurrentOperations(r, initialized);
               boolean useMulticast =
                   r.getMulticastEnabled() && r.getSystem().getConfig().getMcastPort() != 0;
               if (initialized) {
@@ -455,6 +450,31 @@ public class StateFlushOperation {
       }
     }
 
+    private void waitForCurrentOperations(final DistributedRegion r, final boolean initialized) {
+      if (initialized) {
+        if (this.flushNewOps) {
+          r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so
+          // we can track current
+          // ops
+        }
+        try {
+          r.getDistributionAdvisor().waitForCurrentOperations();
+        } catch (RegionDestroyedException e) {
+          // continue with the next region
+        }
+      }
+    }
+
+    private Set<DistributedRegion> getRegions(final DistributionManager dm) {
+      Set<DistributedRegion> regions;
+      if (this.allRegions) {
+        regions = getAllRegions(dm);
+      } else {
+        regions = Collections.singleton(this.getRegion(dm));
+      }
+      return regions;
+    }
+
     @Override
     public void toData(DataOutput dout) throws IOException {
       super.toData(dout);
@@ -780,3 +800,4 @@ public class StateFlushOperation {
     }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/geode/blob/3a1062e2/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
index e80b82d..f06cfdd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GIIDeltaDUnitTest.java
@@ -772,7 +772,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
     waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
     waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
     P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
-    
+
     // restart and gii, R's rvv should be the same as P's
     checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
     createDistributedRegion(R);


Mime
View raw message