geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject incubator-geode git commit: GEODE-1691: protect EntryEvent off-heap readers
Date Wed, 17 Aug 2016 17:07:19 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop e08c1f544 -> b2b5fca70


GEODE-1691: protect EntryEvent off-heap readers

The methods on EntryEvent that can read off-heap
data from the event are now protected from a concurrent
release of the off-heap data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b2b5fca7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b2b5fca7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b2b5fca7

Branch: refs/heads/develop
Commit: b2b5fca70f47f47e58c844214a01b6665919e265
Parents: e08c1f5
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Fri Aug 12 17:29:49 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Wed Aug 17 10:02:15 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/EntryEventImpl.java  | 170 +++++++----
 .../internal/cache/EntryEventImplTest.java      | 301 ++++++++++++++++++-
 2 files changed, 409 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b2b5fca7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index f559e2a..bd7596f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -55,6 +55,7 @@ import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
 import org.apache.logging.log4j.Logger;
 
 import java.io.*;
+import java.util.function.Function;
 
 import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
 import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
@@ -740,12 +741,13 @@ public class EntryEventImpl
       boolean doCopyOnRead = getRegion().isCopyOnRead();
       if (ov != null) {
         if (ov instanceof CachedDeserializable) {
-          CachedDeserializable cd = (CachedDeserializable)ov;
-          if (doCopyOnRead) {
-            return cd.getDeserializedWritableCopy(this.region, this.re);
-          } else {
-            return cd.getDeserializedValue(this.region, this.re);
-          }
+          return callWithOffHeapLock((CachedDeserializable)ov, oldValueCD -> {
+            if (doCopyOnRead) {
+              return oldValueCD.getDeserializedWritableCopy(this.region, this.re);
+            } else {
+              return oldValueCD.getDeserializedValue(this.region, this.re);
+            }
+          });
         }
         else {
           if (doCopyOnRead) {
@@ -955,15 +957,16 @@ public class EntryEventImpl
         return AbstractRegion.handleNotAvailable(nv);
       }
       if (nv instanceof CachedDeserializable) {
-        CachedDeserializable cd = (CachedDeserializable)nv;
-        Object v = null;
-        if (doCopyOnRead) {
-          v = cd.getDeserializedWritableCopy(this.region, this.re);
-        } else {
-          v = cd.getDeserializedValue(this.region, this.re);
-        }
-        assert !(v instanceof CachedDeserializable) : "for key "+this.getKey()+" found nested
CachedDeserializable";
-        return v;
+        return callWithOffHeapLock((CachedDeserializable)nv, newValueCD -> {
+          Object v = null;
+          if (doCopyOnRead) {
+            v = newValueCD.getDeserializedWritableCopy(this.region, this.re);
+          } else {
+            v = newValueCD.getDeserializedValue(this.region, this.re);
+          }
+          assert !(v instanceof CachedDeserializable) : "for key "+this.getKey()+" found
nested CachedDeserializable";
+          return v;
+        });
       }
       else {
         if (doCopyOnRead) {
@@ -975,6 +978,25 @@ public class EntryEventImpl
     }
     return null;
   }
+  
+  /**
+   * Invoke the given function with a lock if the given value is offheap.
+   * @return the value returned from invoking the function
+   */
+  private <T,R> R callWithOffHeapLock(T value, Function<T, R> function) {
+    if (isOffHeapReference(value)) {
+      synchronized (this.offHeapLock) {
+        if (!this.offHeapOk) {
+          throw new IllegalStateException("Attempt to access off heap value after the EntryEvent
was released.");
+        }
+        return function.apply(value);
+      }
+    } else {
+      return function.apply(value);
+    }
+  }
+  
+  private final Object offHeapLock = new Object();
 
   public final String getNewValueStringForm() {
     return StringUtils.forceToString(basicGetNewValue());
@@ -2001,13 +2023,17 @@ public class EntryEventImpl
     buf.append(this.getKey());
     buf.append(";oldValue=");
     try {
-      ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
+      synchronized (this.offHeapLock) {
+        ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
+      }
     } catch (IllegalStateException ex) {
       buf.append("OFFHEAP_VALUE_FREED");
     }
     buf.append(";newValue=");
     try {
-      ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
+      synchronized (this.offHeapLock) {
+        ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
+      }
     } catch (IllegalStateException ex) {
       buf.append("OFFHEAP_VALUE_FREED");
     }
@@ -2542,11 +2568,14 @@ public class EntryEventImpl
       this.serializedValue = serializedBytes;
     }
 
+    @Override
     public byte[] getSerializedValue() {
       if(this.serializedValue != null){
         return this.serializedValue;
       }
-      return getCd().getSerializedValue();
+      return callWithOffHeapLock(cd -> {
+        return cd.getSerializedValue();
+      });
     }
     
     private CachedDeserializable getCd() {
@@ -2555,19 +2584,37 @@ public class EntryEventImpl
       }
       return this.cd;
     }
+    /**
+     * The only methods that need to use this method are those on the external SerializedCacheValue
interface
+     * and any other method that a customer could call that may access the off-heap values.
+     * For example if toString was implemented on this class to access the value then it
would
+     * need to use this method.
+     */
+    private <R> R callWithOffHeapLock(Function<CachedDeserializable, R> function)
{
+      if (this.event != null) {
+        // this call does not use getCd() to access this.cd
+        // because the check for offHeapOk is done by event.callWithOffHeapLock
+        return this.event.callWithOffHeapLock(this.cd, function);
+      } else {
+        return function.apply(getCd());
+      }
+    }
     
+    @Override
     public Object getDeserializedValue() {
       return getDeserializedValue(this.r, this.re);
     }
     public Object getDeserializedForReading() {
-      return OffHeapHelper.getHeapForm(getCd().getDeserializedForReading());
+      return getCd().getDeserializedForReading();
     }
     public Object getDeserializedWritableCopy(Region rgn, RegionEntry entry) {
-      return OffHeapHelper.getHeapForm(getCd().getDeserializedWritableCopy(rgn, entry));
+      return getCd().getDeserializedWritableCopy(rgn, entry);
     }
 
     public Object getDeserializedValue(Region rgn, RegionEntry reentry) {
-      return OffHeapHelper.getHeapForm(getCd().getDeserializedValue(rgn, reentry));
+      return callWithOffHeapLock(cd -> {
+        return cd.getDeserializedValue(rgn, reentry);
+      });
     }
     public Object getValue() {
       if(this.serializedValue != null){
@@ -2694,31 +2741,38 @@ public class EntryEventImpl
    * True if it is ok to use old/new values that are stored off heap.
    * False if an exception should be thrown if an attempt is made to access old/new offheap
values.
    */
-  private transient boolean offHeapOk = true;
+  transient boolean offHeapOk = true;
  
   @Override
   @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
   public void release() {
     // noop if already freed or values can not be off-heap
     if (!this.offHeapOk) return;
-    // Note that this method does not set the old/new values to null but
-    // leaves them set to the off-heap value so that future calls to getOld/NewValue
-    // will fail with an exception.
-    Object ov = basicGetOldValue();
-    Object nv = basicGetNewValue();
-    this.offHeapOk = false;
-    
-    if (ov instanceof StoredObject) {
-      //this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + System.identityHashCode(ov));
-      if (ReferenceCountHelper.trackReferenceCounts()) {
-        ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
-        ((StoredObject) ov).release();
-        ReferenceCountHelper.setReferenceCountOwner(null);
-      } else {
-        ((StoredObject) ov).release();
+    synchronized (this.offHeapLock) {
+      // Note that this method does not set the old/new values to null but
+      // leaves them set to the off-heap value so that future calls to getOld/NewValue
+      // will fail with an exception.
+      testHookReleaseInProgress();
+      Object ov = basicGetOldValue();
+      Object nv = basicGetNewValue();
+      this.offHeapOk = false;
+
+      if (ov instanceof StoredObject) {
+        //this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " +
System.identityHashCode(ov));
+        if (ReferenceCountHelper.trackReferenceCounts()) {
+          ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
+          ((StoredObject) ov).release();
+          ReferenceCountHelper.setReferenceCountOwner(null);
+        } else {
+          ((StoredObject) ov).release();
+        }
       }
+      OffHeapHelper.releaseAndTrackOwner(nv, this);
     }
-    OffHeapHelper.releaseAndTrackOwner(nv, this);
+  }
+  
+  void testHookReleaseInProgress() {
+    // unit test can mock or override this method
   }
 
   /**
@@ -2729,7 +2783,9 @@ public class EntryEventImpl
     if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
       throw new IllegalStateException("This event does not support off-heap values");
     }
-    this.offHeapOk = false;
+    synchronized (this.offHeapLock) {
+      this.offHeapOk = false;
+    }
   }
   
   /**
@@ -2738,26 +2794,28 @@ public class EntryEventImpl
    */
   @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
   public void copyOffHeapToHeap() {
-    Object ov = basicGetOldValue();
-    if (isOffHeapReference(ov)) {
-      if (ReferenceCountHelper.trackReferenceCounts()) {
-        ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
-        this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
+    synchronized (this.offHeapLock) {
+      Object ov = basicGetOldValue();
+      if (isOffHeapReference(ov)) {
+        if (ReferenceCountHelper.trackReferenceCounts()) {
+          ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
+          this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
+          ReferenceCountHelper.setReferenceCountOwner(null);
+        } else {
+          this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
+        }
+      }
+      Object nv = basicGetNewValue();
+      if (isOffHeapReference(nv)) {
+        ReferenceCountHelper.setReferenceCountOwner(this);
+        this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
         ReferenceCountHelper.setReferenceCountOwner(null);
-      } else {
-        this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
       }
+      if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
+        throw new IllegalStateException("event's old/new value still off-heap after calling
copyOffHeapToHeap");
+      }
+      this.offHeapOk = false;
     }
-    Object nv = basicGetNewValue();
-    if (isOffHeapReference(nv)) {
-      ReferenceCountHelper.setReferenceCountOwner(this);
-      this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
-      ReferenceCountHelper.setReferenceCountOwner(null);
-    }
-    if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
-      throw new IllegalStateException("event's old/new value still off-heap after calling
copyOffHeapToHeap");
-    }
-    this.offHeapOk = false;
   }
 
   public boolean isOldValueOffHeap() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b2b5fca7/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
index 265795f..bc8a64e 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/EntryEventImplTest.java
@@ -19,11 +19,16 @@ package com.gemstone.gemfire.internal.cache;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.jayway.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.SerializedCacheValue;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl.NewValueImporter;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.offheap.StoredObject;
@@ -351,7 +356,7 @@ public class EntryEventImplTest {
   }
 
   @Test
-  public void verifyExportOewValueWithSerializedStoredObjectAndUnretainedOldReferenceOk()
{
+  public void verifyExportOldValueWithSerializedStoredObjectAndUnretainedOldReferenceOk()
{
     LocalRegion region = mock(LocalRegion.class);
     StoredObject oldValue = mock(StoredObject.class);
     when(oldValue.isSerialized()).thenReturn(true);
@@ -367,16 +372,300 @@ public class EntryEventImplTest {
     verify(ovImporter).importOldObject(oldValue, true);
   }
 
-  private EntryEventImpl createEntryEvent(LocalRegion l, Object newValue) {
-    // create a dummy event id
-    byte[] memId = { 1,2,3 };
-    EventID eventId = new EventID(memId, 11, 12, 13);
+  @Test
+  public void verifyExternalReadMethodsBlockedByRelease() throws InterruptedException {
+    LocalRegion region = mock(LocalRegion.class);
+    StoredObject newValue = mock(StoredObject.class);
+    when(newValue.hasRefCount()).thenReturn(true);
+    when(newValue.isSerialized()).thenReturn(true);
+    when(newValue.retain()).thenReturn(true);
+    when(newValue.getDeserializedValue(any(), any())).thenReturn("newValue");
+    final byte[] serializedNewValue = new byte[]{(byte)'n', (byte)'e', (byte)'w'};
+    when(newValue.getSerializedValue()).thenReturn(serializedNewValue);
+    StoredObject oldValue = mock(StoredObject.class);
+    when(oldValue.hasRefCount()).thenReturn(true);
+    when(oldValue.isSerialized()).thenReturn(true);
+    when(oldValue.retain()).thenReturn(true);
+    when(oldValue.getDeserializedValue(any(), any())).thenReturn("oldValue");
+    final byte[] serializedOldValue = new byte[]{(byte)'o', (byte)'l', (byte)'d'};
+    when(oldValue.getSerializedValue()).thenReturn(serializedOldValue);
+    final CountDownLatch releaseCountDown = new CountDownLatch(1);
+    final TestableEntryEventImpl e = new TestableEntryEventImpl(region, key, newValue, releaseCountDown);
+    e.setOldValue(oldValue);
+    assertEquals("newValue", e.getNewValue());
+    assertEquals("oldValue", e.getOldValue());
+    final SerializedCacheValue<?> serializableNewValue = e.getSerializedNewValue();
+    assertEquals(serializedNewValue, serializableNewValue.getSerializedValue());
+    assertEquals("newValue", serializableNewValue.getDeserializedValue());
+    final SerializedCacheValue<?> serializableOldValue = e.getSerializedOldValue();
+    assertEquals(serializedOldValue, serializableOldValue.getSerializedValue());
+    assertEquals("oldValue", serializableOldValue.getDeserializedValue());
+    Thread doRelease = new Thread(() -> {e.release();});
+    doRelease.start(); // release thread will be stuck until releaseCountDown changes
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.isWaitingOnRelease()));
+    assertEquals(true, e.offHeapOk);
+    assertEquals(true, doRelease.isAlive());
+    
+    // Now start a getNewValue. It should block on the release.
+    Thread doGetNewValue = new Thread(() -> {e.getAndCacheNewValue();});
+    doGetNewValue.start();
+    // Now start a getOldValue. It should block on the release.
+    Thread doGetOldValue = new Thread(() -> {e.getAndCacheOldValue();});
+    doGetOldValue.start();
+    // Now start a getSerializedValue on serializableNewValue. It should block on the release.
+    Thread doSNVgetSerializedValue = new Thread(() -> {e.getAndCacheSerializedNew(serializableNewValue);});
+    doSNVgetSerializedValue.start();
+    // Now start a getDeserializedValue on serializableNewValue. It should block on the release.
+    Thread doSNVgetDeserializedValue = new Thread(() -> {e.getAndCachDeserializedNew(serializableNewValue);});
+    doSNVgetDeserializedValue.start();
+    // Now start a getSerializedValue on serializableOldValue. It should block on the release.
+    Thread doSOVgetSerializedValue = new Thread(() -> {e.getAndCacheSerializedOld(serializableOldValue);});
+    doSOVgetSerializedValue.start();
+    // Now start a getDeserializedValue on serializableOldValue. It should block on the release.
+    Thread doSOVgetDeserializedValue = new Thread(() -> {e.getAndCachDeserializedOld(serializableOldValue);});
+    doSOVgetDeserializedValue.start();
+    
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.isAboutToCallGetNewValue() && e.isAboutToCallGetOldValue()
+        && e.isAboutToCallSerializedNew() && e.isAboutToCallDeserializedNew()
+        && e.isAboutToCallSerializedOld() && e.isAboutToCallDeserializedOld()
+        ));
+    // all the threads should now be hung waiting on release; so just wait for a little bit
for it to improperly finish
+    doGetNewValue.join(50);
+    if (e.hasFinishedCallOfGetNewValue()) {
+      fail("expected doGetNewValue thread to be hung. It completed with " + e.getCachedNewValue());
+    }
+    if (e.hasFinishedCallOfGetOldValue()) {
+      fail("expected doGetOldValue thread to be hung. It completed with " + e.getCachedOldValue());
+    }
+    if (e.hasFinishedCallOfSerializedNew()) {
+      fail("expected doSNVgetSerializedValue thread to be hung. It completed with " + e.getTestCachedSerializedNew());
+    }
+    if (e.hasFinishedCallOfDeserializedNew()) {
+      fail("expected doSNVgetDeserializedValue thread to be hung. It completed with " + e.getCachedDeserializedNew());
+    }
+    if (e.hasFinishedCallOfSerializedOld()) {
+      fail("expected doSOVgetSerializedValue thread to be hung. It completed with " + e.getCachedSerializedOld());
+    }
+    if (e.hasFinishedCallOfDeserializedOld()) {
+      fail("expected doSOVgetDeserializedValue thread to be hung. It completed with " + e.getCachedDeserializedOld());
+    }
+   // now signal the release to go ahead
+    releaseCountDown.countDown();
+    doRelease.join();
+    assertEquals(false, e.offHeapOk);
+    // which should allow getNewValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfGetNewValue()));
+    doGetNewValue.join();
+    if (!(e.getCachedNewValue() instanceof IllegalStateException)) {
+      // since the release happened before getNewValue we expect it to get an exception
+      fail("unexpected success of getNewValue. It returned " + e.getCachedNewValue());
+    }
+    // which should allow getOldValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfGetOldValue()));
+    doGetOldValue.join();
+    if (!(e.getCachedOldValue() instanceof IllegalStateException)) {
+      fail("unexpected success of getOldValue. It returned " + e.getCachedOldValue());
+    }
+    // which should allow doSNVgetSerializedValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfSerializedNew()));
+    doSNVgetSerializedValue.join();
+    if (!(e.getTestCachedSerializedNew() instanceof IllegalStateException)) {
+      fail("unexpected success of new getSerializedValue. It returned " + e.getTestCachedSerializedNew());
+    }
+    // which should allow doSNVgetDeserializedValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfDeserializedNew()));
+    doSNVgetDeserializedValue.join();
+    if (!(e.getCachedDeserializedNew() instanceof IllegalStateException)) {
+      fail("unexpected success of new getDeserializedValue. It returned " + e.getCachedDeserializedNew());
+    }
+    // which should allow doSOVgetSerializedValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfSerializedOld()));
+    doSOVgetSerializedValue.join();
+    if (!(e.getCachedSerializedOld() instanceof IllegalStateException)) {
+      fail("unexpected success of old getSerializedValue. It returned " + e.getCachedSerializedOld());
+    }
+    // which should allow doSOVgetDeserializedValue to complete
+    Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).pollDelay(1, TimeUnit.MILLISECONDS).timeout(15,
TimeUnit.SECONDS)
+    .until(() -> assertEquals(true, e.hasFinishedCallOfDeserializedOld()));
+    doSOVgetDeserializedValue.join();
+    if (!(e.getCachedDeserializedOld() instanceof IllegalStateException)) {
+      fail("unexpected success of old getDeserializedValue. It returned " + e.getCachedDeserializedOld());
+    }
+  }
 
+  private static class TestableEntryEventImpl extends EntryEventImpl {
+    private final CountDownLatch releaseCountDown;
+    private volatile boolean waitingOnRelease = false;
+    private volatile boolean aboutToCallGetNewValue = false;
+    private volatile boolean finishedCallOfGetNewValue = false;
+    private volatile Object cachedNewValue = null;
+    private volatile boolean aboutToCallGetOldValue = false;
+    private volatile boolean finishedCallOfGetOldValue = false;
+    private volatile Object cachedOldValue = null;
+    private volatile boolean aboutToCallSerializedNew = false;
+    private volatile Object testCachedSerializedNew = null;
+    private volatile boolean finishedCallOfSerializedNew = false;
+    private volatile boolean aboutToCallDeserializedNew = false;
+    private volatile Object cachedDeserializedNew = null;
+    private volatile boolean finishedCallOfDeserializedNew = false;
+    private volatile boolean aboutToCallSerializedOld = false;
+    private volatile Object cachedSerializedOld = null;
+    private volatile boolean finishedCallOfSerializedOld = false;
+    private volatile boolean aboutToCallDeserializedOld = false;
+    private volatile Object cachedDeserializedOld = null;
+    private volatile boolean finishedCallOfDeserializedOld = false;
+    
+    public TestableEntryEventImpl(LocalRegion region, Object key,
+        Object newValue, CountDownLatch releaseCountDown) {
+      super(region, Operation.CREATE, key, newValue, null, false, null, false, createEventID());
+      callbacksInvoked(true);
+      this.releaseCountDown = releaseCountDown;
+    }
+    public Object getCachedDeserializedOld() {
+      return this.cachedDeserializedOld;
+    }
+    public boolean hasFinishedCallOfDeserializedOld() {
+      return this.finishedCallOfDeserializedOld;
+    }
+    public Object getCachedSerializedOld() {
+      return this.cachedSerializedOld;
+    }
+    public boolean hasFinishedCallOfSerializedOld() {
+      return this.finishedCallOfSerializedOld;
+    }
+    public Object getCachedDeserializedNew() {
+      return this.cachedDeserializedNew;
+    }
+    public Object getTestCachedSerializedNew() {
+      return this.testCachedSerializedNew;
+    }
+    public boolean hasFinishedCallOfDeserializedNew() {
+      return this.finishedCallOfDeserializedNew;
+    }
+    public boolean hasFinishedCallOfSerializedNew() {
+      return this.finishedCallOfSerializedNew;
+    }
+    public boolean isAboutToCallDeserializedOld() {
+      return this.aboutToCallDeserializedOld;
+    }
+    public boolean isAboutToCallSerializedOld() {
+      return this.aboutToCallSerializedOld;
+    }
+    public boolean isAboutToCallDeserializedNew() {
+      return this.aboutToCallDeserializedNew;
+    }
+    public boolean isAboutToCallSerializedNew() {
+      return this.aboutToCallSerializedNew;
+    }
+    public void getAndCachDeserializedOld(SerializedCacheValue<?> serializableOldValue)
{
+      try {
+        this.aboutToCallDeserializedOld = true;
+        this.cachedDeserializedOld = serializableOldValue.getDeserializedValue();
+      } catch (IllegalStateException ex) {
+        this.cachedDeserializedOld = ex;
+      } finally {
+        this.finishedCallOfDeserializedOld = true;
+      }
+    }
+    public void getAndCacheSerializedOld(SerializedCacheValue<?> serializableOldValue)
{
+      try {
+        this.aboutToCallSerializedOld = true;
+        this.cachedSerializedOld = serializableOldValue.getSerializedValue();
+      } catch (IllegalStateException ex) {
+        this.cachedSerializedOld = ex;
+      } finally {
+        this.finishedCallOfSerializedOld = true;
+      }
+    }
+    public void getAndCachDeserializedNew(SerializedCacheValue<?> serializableNewValue)
{
+      try {
+        this.aboutToCallDeserializedNew = true;
+        this.cachedDeserializedNew = serializableNewValue.getDeserializedValue();
+      } catch (IllegalStateException ex) {
+        this.cachedDeserializedNew = ex;
+      } finally {
+        this.finishedCallOfDeserializedNew = true;
+      }
+    }
+    public void getAndCacheSerializedNew(SerializedCacheValue<?> serializableNewValue)
{
+      try {
+        this.aboutToCallSerializedNew = true;
+        this.testCachedSerializedNew = serializableNewValue.getSerializedValue();
+      } catch (IllegalStateException ex) {
+        this.testCachedSerializedNew = ex;
+      } finally {
+        this.finishedCallOfSerializedNew = true;
+      }
+    }
+    public Object getCachedNewValue() {
+      return this.cachedNewValue;
+    }
+    public void getAndCacheNewValue() {
+      try {
+        this.aboutToCallGetNewValue = true;
+        this.cachedNewValue = getNewValue();
+      } catch (IllegalStateException ex) {
+        this.cachedNewValue = ex;
+      } finally {
+        this.finishedCallOfGetNewValue = true;
+      }
+    }
+    public Object getCachedOldValue() {
+      return this.cachedOldValue;
+    }
+    public void getAndCacheOldValue() {
+      try {
+        this.aboutToCallGetOldValue = true;
+        this.cachedOldValue = getOldValue();
+      } catch (IllegalStateException ex) {
+        this.cachedOldValue = ex;
+      } finally {
+        this.finishedCallOfGetOldValue = true;
+      }
+    }
+    public boolean isWaitingOnRelease() {
+      return this.waitingOnRelease;
+    }
+    public boolean isAboutToCallGetNewValue() {
+      return this.aboutToCallGetNewValue;
+    }
+    public boolean hasFinishedCallOfGetNewValue() {
+      return this.finishedCallOfGetNewValue;
+    }
+    public boolean isAboutToCallGetOldValue() {
+      return this.aboutToCallGetOldValue;
+    }
+    public boolean hasFinishedCallOfGetOldValue() {
+      return this.finishedCallOfGetOldValue;
+    }
+    @Override
+    void testHookReleaseInProgress() {
+      try {
+        this.waitingOnRelease = true;
+        this.releaseCountDown.await();
+      } catch (InterruptedException e) {
+        // quit waiting
+      }
+    }
+  }
+  private static EventID createEventID() {
+    byte[] memId = { 1,2,3 };
+    return new EventID(memId, 11, 12, 13);
+  }
+  
+  private EntryEventImpl createEntryEvent(LocalRegion l, Object newValue) {
     // create an event
     EntryEventImpl event = EntryEventImpl.create(l, Operation.CREATE, key,
         newValue, null,  false /* origin remote */, null,
         false /* generateCallbacks */,
-        eventId);
+        createEventID());
     // avoid calling invokeCallbacks
     event.callbacksInvoked(true);
 


Mime
View raw message