geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aging...@apache.org
Subject geode git commit: GEODE-2776: Setting version tag on the client event from the current region entry after load. And refactoring the findObjectInSystem().
Date Mon, 08 May 2017 17:22:54 GMT
Repository: geode
Updated Branches:
  refs/heads/develop 288676dfe -> 72d0d4baa


GEODE-2776: Setting version tag on the client event from the current region entry after load.
And refactoring the findObjectInSystem().


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/72d0d4ba
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/72d0d4ba
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/72d0d4ba

Branch: refs/heads/develop
Commit: 72d0d4baaccfb90e011286cb57d97174065256ae
Parents: 288676d
Author: Anil <agingade@pivotal.io>
Authored: Wed Apr 19 17:35:11 2017 -0700
Committer: Anil <agingade@pivotal.io>
Committed: Mon May 8 10:22:24 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/DistributedRegion.java | 246 +++++++++++--------
 .../geode/internal/cache/LocalRegion.java       |   3 +
 .../DistributedRegionSearchLoadJUnitTest.java   | 187 ++++++++++++++
 3 files changed, 335 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 0c967c9..c3a4961 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2265,123 +2265,167 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
       boolean returnTombstones) throws CacheLoaderException, TimeoutException {
+    @Released
+    EntryEventImpl event = null;
     checkForLimitedOrNoAccess();
+    final Operation op = isCreate ? Operation.CREATE : Operation.UPDATE;
+    long lastModified = 0L;
 
+    try {
+      event = findOnServer(keyInfo, op, generateCallbacks, clientEvent);
+      if (event == null) {
+        event = createEventForLoad(keyInfo, generateCallbacks, requestingClient, op);
+        lastModified = findUsingSearchLoad(txState, localValue, clientEvent, keyInfo, event);
+      }
+      // Update region with new value.
+      if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
+        putNewValueInRegion(isCreate, clientEvent, lastModified, event);
+      } else if (isCreate) {
+        recordMiss(null, event.getKey());
+      }
+      return determineResult(preferCD, event);
+    } finally {
+      if (event != null) {
+        event.release();
+      }
+    }
+  }
+
+  private EntryEventImpl createEventForLoad(KeyInfo keyInfo, boolean generateCallbacks,
+      ClientProxyMembershipID requestingClient, Operation op) {
+    // Do not generate Event ID
+    EntryEventImpl event = EntryEventImpl.create(this, op, keyInfo.getKey(), null /* newValue */,
+        keyInfo.getCallbackArg(), false, getMyId(), generateCallbacks);
+    if (requestingClient != null) {
+      event.setContext(requestingClient);
+    }
+    return event;
+  }
+
+  private Object determineResult(boolean preferCD, EntryEventImpl event) {
+    if (preferCD) {
+      return event.getRawNewValueAsHeapObject();
+    }
+    return event.getNewValue();
+  }
+
+  private void putNewValueInRegion(boolean isCreate, EntryEventImpl clientEvent, long lastModified,
+      EntryEventImpl event) {
     RegionEntry re = null;
-    final Object key = keyInfo.getKey();
-    final Object aCallbackArgument = keyInfo.getCallbackArg();
-    Operation op;
+    // Set eventId. Required for interested clients.
+    event.setNewEventId(cache.getDistributedSystem());
+
+    long startPut = CachePerfStats.getStatTime();
+    validateKey(event.getKey());
+    // this next step also distributes the object to other processes, if necessary
+    try {
+      // set the tail key so that the event is passed to GatewaySender queues.
+      // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
+      if (this instanceof BucketRegion) {
+        if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled())
+          ((BucketRegion) this).handleWANEvent(event);
+      }
+      re = basicPutEntry(event, lastModified);
+
+      // Update client event with latest version tag from re.
+      if (re != null && clientEvent != null) {
+        clientEvent.setVersionTag(event.getVersionTag());
+      }
+      if (!isTX()) {
+        getCachePerfStats().endPut(startPut, event.isOriginRemote());
+      }
+    } catch (ConcurrentCacheModificationException e) {
+      // the cache was modified while we were searching for this entry and
+      // the netsearch result was elided. Return the current value from the cache
+      updateEventWithCurrentRegionEntry(event, clientEvent);
+    } catch (CacheWriterException cwe) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
+      }
+    }
     if (isCreate) {
-      op = Operation.CREATE;
-    } else {
-      op = Operation.UPDATE;
+      recordMiss(re, event.getKey());
     }
-    long lastModified = 0L;
-    boolean fromServer = false;
-    @Released
-    EntryEventImpl event = null;
-    @Retained
-    Object result = null;
+  }
+
+  private void updateEventWithCurrentRegionEntry(EntryEventImpl event, EntryEventImpl clientEvent) {
+    // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
+    final boolean disabled = this.entries.disableLruUpdateCallback();
     try {
-      {
-        if (this.srp != null) {
-          VersionTagHolder holder = new VersionTagHolder();
-          Object value = this.srp.get(key, aCallbackArgument, holder);
-          fromServer = value != null;
-          if (fromServer) {
-            event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(),
-                generateCallbacks);
-            event.setVersionTag(holder.getVersionTag());
-            event.setFromServer(fromServer); // fix for bug 39358
-            if (clientEvent != null && clientEvent.getVersionTag() == null) {
-              clientEvent.setVersionTag(holder.getVersionTag());
-            }
+      RegionEntry re = getRegionEntry(event.getKey());
+      if (re != null) {
+        synchronized (re) { // bug #51059 value & version must be obtained atomically
+          // Update client event with latest version tag from re
+          if (clientEvent != null) {
+            clientEvent.setVersionTag(re.getVersionStamp().asVersionTag());
           }
+          // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc
+          event.setNewValue(re.getValue(this));
         }
       }
-
-      if (!fromServer) {
-        // Do not generate Event ID
-        event = EntryEventImpl.create(this, op, key, null /* newValue */, aCallbackArgument, false,
-            getMyId(), generateCallbacks);
-        if (requestingClient != null) {
-          event.setContext(requestingClient);
-        }
-        // If this event is because of a register interest call, don't invoke the CacheLoader
-        boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
-            && clientEvent.getOperation().isGetForRegisterInterest();
-        if (!getForRegisterInterest) {
-          SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
-          try {
-            processor.initialize(this, key, aCallbackArgument);
-            // processor fills in event
-            processor.doSearchAndLoad(event, txState, localValue);
-            if (clientEvent != null && clientEvent.getVersionTag() == null) {
-              clientEvent.setVersionTag(event.getVersionTag());
-            }
-            lastModified = processor.getLastModified();
-          } finally {
-            processor.release();
-          }
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
-                + getFullPath() + "; key=" + key);
-          }
-        }
+    } finally {
+      if (disabled) {
+        this.entries.enableLruUpdateCallback();
       }
-      if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
-        try {
-          // Set eventId. Required for interested clients.
-          event.setNewEventId(cache.getDistributedSystem());
-
-          long startPut = CachePerfStats.getStatTime();
-          validateKey(key);
-          // if (event.getOperation().isLoad()) {
-          // this.performedLoad(event, lastModified, txState);
-          // }
-          // this next step also distributes the object to other processes, if necessary
-          try {
-            // set the tail key so that the event is passed to GatewaySender queues.
-            // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
-            if (this instanceof BucketRegion) {
-              if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled())
-                ((BucketRegion) this).handleWANEvent(event);
-            }
-            re = basicPutEntry(event, lastModified);
-          } catch (ConcurrentCacheModificationException e) {
-            // the cache was modified while we were searching for this entry and
-            // the netsearch result was elided. Return the current value from the cache
-            re = getRegionEntry(key);
-            if (re != null) {
-              event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to
-                                                    // setNewValue, decrc
-            }
-          }
-          if (!isTX()) {
-            getCachePerfStats().endPut(startPut, event.isOriginRemote());
-          }
-        } catch (CacheWriterException cwe) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
-          }
-        }
+      try {
+        this.entries.lruUpdateCallback();
+      } catch (DiskAccessException dae) {
+        this.handleDiskAccessException(dae);
+        throw dae;
       }
-      if (isCreate) {
-        recordMiss(re, key);
+    }
+  }
+
+  /**
+   * If its client, get the value from server.
+   */
+  private EntryEventImpl findOnServer(KeyInfo keyInfo, Operation op, boolean generateCallbacks,
+      EntryEventImpl clientEvent) {
+    if (this.srp == null) {
+      return null;
+    }
+    EntryEventImpl event = null;
+    VersionTagHolder holder = new VersionTagHolder();
+    Object aCallbackArgument = keyInfo.getCallbackArg();
+    Object value = this.srp.get(keyInfo.getKey(), aCallbackArgument, holder);
+    if (value != null) {
+      event = EntryEventImpl.create(this, op, keyInfo.getKey(), value, aCallbackArgument, false,
+          getMyId(), generateCallbacks);
+      event.setVersionTag(holder.getVersionTag());
+      event.setFromServer(true); // fix for bug 39358
+      if (clientEvent != null && clientEvent.getVersionTag() == null) {
+        clientEvent.setVersionTag(holder.getVersionTag());
       }
+    }
+    return event;
+  }
 
-      if (preferCD) {
-        result = event.getRawNewValueAsHeapObject();
-      } else {
-        result = event.getNewValue();
+  private long findUsingSearchLoad(TXStateInterface txState, Object localValue,
+      EntryEventImpl clientEvent, final KeyInfo keyInfo, EntryEventImpl event) {
+    long lastModified = 0L;
+    // If this event is because of a register interest call, don't invoke the CacheLoader
+    boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
+        && clientEvent.getOperation().isGetForRegisterInterest();
+    if (!getForRegisterInterest) {
+      SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+      try {
+        processor.initialize(this, keyInfo.getKey(), keyInfo.getCallbackArg());
+        // processor fills in event
+        processor.doSearchAndLoad(event, txState, localValue);
+        if (clientEvent != null && clientEvent.getVersionTag() == null) {
+          clientEvent.setVersionTag(event.getVersionTag());
+        }
+        lastModified = processor.getLastModified();
+      } finally {
+        processor.release();
       }
-      return result;
-    } finally {
-      if (event != null) {
-        event.release();
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+            + getFullPath() + "; key=" + keyInfo.getKey());
       }
     }
+    return lastModified;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 2dec53b..cdba7e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1393,6 +1393,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * @param key the key used to fetch the region entry
    */
   final public void recordMiss(final RegionEntry re, Object key) {
+    if (!this.statisticsEnabled) {
+      return;
+    }
     final RegionEntry e;
     if (re == null && !isTX()) {
       e = basicGetEntry(key);

http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
new file mode 100755
index 0000000..30fb728
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({SearchLoadAndWriteProcessor.class})
+public class DistributedRegionSearchLoadJUnitTest {
+
+  protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
+      RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache) {
+    DistributedRegion region = new DistributedRegion("testRegion", ra, null, cache, ira);
+    if (isConcurrencyChecksEnabled) {
+      region.enableConcurrencyChecks();
+    }
+
+    // since it is a real region object, we need to tell mockito to monitor it
+    region = spy(region);
+
+    doNothing().when(region).distributeUpdate(any(), anyLong(), anyBoolean(), anyBoolean(), any(),
+        anyBoolean());
+    doNothing().when(region).distributeDestroy(any(), any());
+    doNothing().when(region).distributeInvalidate(any());
+    doNothing().when(region).distributeUpdateEntryVersion(any());
+
+    return region;
+  }
+
+  private RegionAttributes createRegionAttributes(boolean isConcurrencyChecksEnabled) {
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    factory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); //
+    RegionAttributes ra = factory.create();
+    return ra;
+  }
+
+  private EventID createDummyEventID() {
+    byte[] memId = {1, 2, 3};
+    EventID eventId = new EventID(memId, 11, 12, 13);
+    return eventId;
+  }
+
+  protected EntryEventImpl createDummyEvent(DistributedRegion region) {
+    // create a dummy event id
+    EventID eventId = createDummyEventID();
+    String key = "key1";
+    String value = "Value1";
+
+    // create an event
+    EntryEventImpl event = EntryEventImpl.create(region, Operation.CREATE, key, value, null,
+        false /* origin remote */, null, false /* generateCallbacks */, eventId);
+    // avoid calling invokeCallbacks
+    event.callbacksInvoked(true);
+
+    return event;
+  }
+
+  protected VersionTag createVersionTag(boolean validVersionTag) {
+    InternalDistributedMember remotemember = mock(InternalDistributedMember.class);
+    VersionTag tag = VersionTag.create(remotemember);
+    if (validVersionTag) {
+      tag.setRegionVersion(1);
+      tag.setEntryVersion(1);
+    }
+    return tag;
+  }
+
+  protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) {
+    GemFireCacheImpl cache = Fakes.cache();
+
+    // create region attributes and internal region arguments
+    RegionAttributes ra = createRegionAttributes(isConcurrencyChecksEnabled);
+    InternalRegionArguments ira = new InternalRegionArguments();
+
+    // create a region object
+    DistributedRegion region = createAndDefineRegion(isConcurrencyChecksEnabled, ra, ira, cache);
+    if (isConcurrencyChecksEnabled) {
+      region.enableConcurrencyChecks();
+    }
+
+    doNothing().when(region).notifyGatewaySender(any(), any());
+    doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+    return region;
+  }
+
+  private void createSearchLoad() {
+    SearchLoadAndWriteProcessor proc = mock(SearchLoadAndWriteProcessor.class);
+    PowerMockito.mockStatic(SearchLoadAndWriteProcessor.class);
+    PowerMockito.when(SearchLoadAndWriteProcessor.getProcessor()).thenReturn(proc);
+
+    VersionTag tag = createVersionTag(true);
+
+    doAnswer(new Answer<EntryEventImpl>() {
+      public EntryEventImpl answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args[0] instanceof EntryEventImpl) {
+          EntryEventImpl event = (EntryEventImpl) args[0];
+          event.setNewValue("NewLoadedValue");
+          event.setOperation(Operation.LOCAL_LOAD_CREATE);
+        }
+        return null;
+      }
+    }).when(proc).doSearchAndLoad(any(EntryEventImpl.class), anyObject(), anyObject());
+  }
+
+  @Test
+  public void testClientEventIsUpdatedWithCurrentEntryVersionTagAfterLoad() {
+    DistributedRegion region = prepare(true);
+    EntryEventImpl event = createDummyEvent(region);
+    region.basicInvalidate(event);
+
+    createSearchLoad();
+
+    KeyInfo ki = new KeyInfo(event.getKey(), null, null);
+    region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false);
+    assertNotNull("ClientEvent version tag is not set with region version tag.",
+        event.getVersionTag());
+  }
+
+  @Test
+  public void testClientEventIsUpdatedWithCurrentEntryVersionTagAfterSearchConcurrencyException() {
+    DistributedRegion region = prepare(true);
+
+    EntryEventImpl event = createDummyEvent(region);
+    region.basicInvalidate(event);
+
+    VersionTag tag = createVersionTag(true);
+    RegionEntry re = mock(RegionEntry.class);
+    VersionStamp stamp = mock(VersionStamp.class);
+
+    doReturn(re).when(region).getRegionEntry(any());
+    when(re.getVersionStamp()).thenReturn(stamp);
+    when(stamp.asVersionTag()).thenReturn(tag);
+
+    createSearchLoad();
+    doThrow(new ConcurrentCacheModificationException()).when(region)
+        .basicPutEntry(any(EntryEventImpl.class), anyLong());
+
+    KeyInfo ki = new KeyInfo(event.getKey(), null, null);
+    region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false);
+    assertNotNull("ClientEvent version tag is not set with region version tag.",
+        event.getVersionTag());
+  }
+
+}
+


Mime
View raw message