geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject [geode] branch feature/GEODE-4083 updated: GEODE-4083: fix infinite loop caused by thread race changing version
Date Wed, 13 Dec 2017 18:29:27 GMT
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-4083
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-4083 by this push:
     new 32c11e0  GEODE-4083: fix infinite loop caused by thread race changing version
32c11e0 is described below

commit 32c11e097900ca7e23108deecfea626dfa187bbd
Author: eshu <eshu@pivotal.io>
AuthorDate: Wed Dec 13 10:28:41 2017 -0800

    GEODE-4083: fix infinite loop caused by thread race changing version
---
 .../cache/versions/RegionVersionVector.java        |  90 +++++++++-------
 ...JUnitTest.java => RegionVersionVectorTest.java} | 118 +++++++++++++++++++--
 2 files changed, 163 insertions(+), 45 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
index aaa40a9..dd2a106 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.annotations.TestingOnly;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -144,22 +145,56 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
   private final transient Object clearLockSync = new Object(); // sync for coordinating thread
                                                                // startup and lockOwner setting
 
-  /** create a live version vector for a region */
+  /**
+   * constructor used to create a cloned vector
+   */
+  protected RegionVersionVector(T ownerId, ConcurrentHashMap<T, RegionVersionHolder<T>>
vector,
+      long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean
singleMember,
+      RegionVersionHolder<T> localExceptions) {
+    this.myId = ownerId;
+    this.memberToVersion = vector;
+    this.memberToGCVersion = gcVersions;
+    this.localGCVersion.set(gcVersion);
+    this.localVersion.set(version);
+    this.singleMember = singleMember;
+    this.localExceptions = localExceptions;
+  }
+
+  /**
+   * deserialize a cloned vector
+   */
+  public RegionVersionVector() {
+    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
+        LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.memberToGCVersion =
+        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * create a live version vector for a region
+   */
   public RegionVersionVector(T ownerId) {
     this(ownerId, null);
   }
 
-  /** create a live version vector for a region */
+  /**
+   * create a live version vector for a region
+   */
   public RegionVersionVector(T ownerId, LocalRegion owner) {
+    this(ownerId, owner, 0);
+  }
+
+  @TestingOnly
+  RegionVersionVector(T ownerId, LocalRegion owner, long version) {
     this.myId = ownerId;
     this.isLiveVector = true;
     this.region = owner;
-
     this.localExceptions = new RegionVersionHolder<T>(0);
-    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
-        LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.memberToVersion =
+        new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
     this.memberToGCVersion =
-        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+        new ConcurrentHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+    this.localVersion.set(version);
   }
 
   /**
@@ -574,17 +609,20 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
     }
   }
 
-  private void updateLocalVersion(long version) {
-    boolean repeat = false;
+  void updateLocalVersion(long newVersion) {
+    boolean needToUpdate;
     do {
-      long myVersion = this.localVersion.get();
-      if (myVersion < version) {
-        repeat = !this.localVersion.compareAndSet(myVersion, version);
+      needToUpdate = false;
+      long currentVersion = this.localVersion.get();
+      if (currentVersion < newVersion) {
+        needToUpdate = !compareAndSetVersion(currentVersion, newVersion);
       }
-    } while (repeat);
+    } while (needToUpdate);
   }
 
-
+  boolean compareAndSetVersion(long currentVersion, long newVersion) {
+    return this.localVersion.compareAndSet(currentVersion, newVersion);
+  }
 
   /**
    * Records a received region-version. These are transmitted in VersionTags in messages
between
@@ -1093,32 +1131,6 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
   }
 
   /**
-   * constructor used to create a cloned vector
-   *
-   * @param localExceptions
-   */
-  protected RegionVersionVector(T ownerId, ConcurrentHashMap<T, RegionVersionHolder<T>>
vector,
-      long version, ConcurrentHashMap<T, Long> gcVersions, long gcVersion, boolean
singleMember,
-      RegionVersionHolder<T> localExceptions) {
-    this.myId = ownerId;
-    this.memberToVersion = vector;
-    this.memberToGCVersion = gcVersions;
-    this.localGCVersion.set(gcVersion);
-    this.localVersion.set(version);
-    this.singleMember = singleMember;
-    this.localExceptions = localExceptions;
-  }
-
-
-  /** deserialize a cloned vector */
-  public RegionVersionVector() {
-    this.memberToVersion = new ConcurrentHashMap<T, RegionVersionHolder<T>>(INITIAL_CAPACITY,
-        LOAD_FACTOR, CONCURRENCY_LEVEL);
-    this.memberToGCVersion =
-        new ConcurrentHashMap<T, Long>(INITIAL_CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
-  }
-
-  /**
    * after deserializing a version tag or RVV the IDs in it should be replaced with references
to
    * IDs returned by this method. This vastly reduces the memory footprint of tags/stamps/rvvs
    *
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
similarity index 86%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index b124fc7..d914410 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.cache.versions;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.*;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -28,6 +30,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -46,7 +52,9 @@ import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
-public class RegionVersionVectorJUnitTest {
+public class RegionVersionVectorTest {
+
+  private ExecutorService executor = Executors.newSingleThreadExecutor();
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -149,7 +157,6 @@ public class RegionVersionVectorJUnitTest {
     assertTrue(rv1.getExceptionCount(server2) == 0);
     assertTrue(rv1.contains(server2, 8));
 
-
     // Test RVV comparisons for GII Delta
     rv1 = new VMRegionVersionVector(server1);
     rv1.recordVersion(server2, 1);
@@ -255,8 +262,6 @@ public class RegionVersionVectorJUnitTest {
     }
     assertFalse(rv1.contains(server2, boundary + 1));
 
-    RegionVersionVector.DEBUG = true;
-
     rv1.recordVersion(server2, bitSetRollPoint);
     rv1.recordVersion(server2, bitSetRollPoint + 1); // bitSet should be rolled at this point
     RegionVersionHolder h = (RegionVersionHolder) rv1.getMemberToVersion().get(server2);
@@ -275,7 +280,7 @@ public class RegionVersionVectorJUnitTest {
     // now test the merge
     System.out.println("testing merge for " + rv1.fullToString());
     assertEquals(1, rv1.getExceptionCount(server2)); // one exception from boundary-1 to
-                                                     // bitSetRollPoint
+    // bitSetRollPoint
     assertFalse(rv1.contains(server2, bitSetRollPoint - 1));
     assertTrue(rv1.contains(server2, bitSetRollPoint));
     assertTrue(rv1.contains(server2, bitSetRollPoint + 1));
@@ -567,7 +572,56 @@ public class RegionVersionVectorJUnitTest {
     rvv.recordVersion(ownerId, tag);
   }
 
-  public RegionVersionVector createRegionVersionVector(InternalDistributedMember ownerId,
+  @Test
+  public void usesNewVersionIfGreaterThanOldVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 1;
+    long newVersion = 2;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(newVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+  }
+
+  @Test
+  public void usesOldVersionIfGreaterThanNewVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 2;
+    long newVersion = 1;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(newVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+  }
+
+  @Test
+  public void doesNothingIfVersionsAreSame() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 2;
+    long sameVersion = 2;
+
+    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
+    rvv.updateLocalVersion(sameVersion);
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
+  }
+
+  @Test
+  public void doesNotHangIfOtherThreadChangedVersion() throws Exception {
+    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
+    long oldVersion = 1;
+    long newVersion = 2;
+
+    RegionVersionVector rvv = new VersionRaceConditionRegionVersionVector(ownerId, oldVersion);
+    Future<Boolean> future = executor.submit(() -> {
+      rvv.updateLocalVersion(newVersion);
+      return true;
+    });
+
+    assertThat(future.get(2, SECONDS)).isTrue();
+    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
+  }
+
+  private RegionVersionVector createRegionVersionVector(InternalDistributedMember ownerId,
       LocalRegion owner) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     RegionVersionVector rvv = new RegionVersionVector(ownerId, owner) {
@@ -615,4 +669,56 @@ public class RegionVersionVectorJUnitTest {
     assertEquals(0, rvv.getExceptionCount(id));
   }
 
+  private class TestableRegionVersionVector
+      extends RegionVersionVector<VersionSource<InternalDistributedMember>> {
+
+    TestableRegionVersionVector(VersionSource<InternalDistributedMember> ownerId, long
version) {
+      super(ownerId, null, version);
+    }
+
+    @Override
+    protected RegionVersionVector createCopy(VersionSource ownerId, ConcurrentHashMap vector,
+        long version, ConcurrentHashMap gcVersions, long gcVersion, boolean singleMember,
+        RegionVersionHolder clonedLocalHolder) {
+      return null;
+    }
+
+    @Override
+    protected VersionSource<InternalDistributedMember> readMember(DataInput in)
+        throws IOException, ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    protected void writeMember(VersionSource member, DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public int getDSFID() {
+      return 0;
+    }
+  }
+
+  private class VersionRaceConditionRegionVersionVector extends TestableRegionVersionVector
{
+
+    private boolean firstTime = true;
+
+    VersionRaceConditionRegionVersionVector(VersionSource<InternalDistributedMember>
ownerId,
+        long version) {
+      super(ownerId, version);
+    }
+
+    @Override
+    boolean compareAndSetVersion(long currentVersion, long newVersion) {
+      if (firstTime) {
+        firstTime = false;
+        super.compareAndSetVersion(currentVersion, newVersion);
+        return false;
+      } else {
+        return super.compareAndSetVersion(currentVersion, newVersion);
+      }
+    }
+
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Mime
View raw message