Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61E5918D01 for ; Fri, 4 Mar 2016 17:30:19 +0000 (UTC) Received: (qmail 31605 invoked by uid 500); 4 Mar 2016 17:30:19 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 31561 invoked by uid 500); 4 Mar 2016 17:30:19 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 31547 invoked by uid 99); 4 Mar 2016 17:30:19 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 17:30:19 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D5B3E1806E2 for ; Fri, 4 Mar 2016 17:30:18 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 4G2gvoqiDmDT for ; Fri, 4 Mar 2016 17:30:16 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id CC8A65FBE8 for ; Fri, 4 Mar 2016 17:30:14 +0000 (UTC) Received: (qmail 30561 invoked by uid 99); 4 Mar 2016 17:30:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 17:30:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E1A9AE69FD; Fri, 4 Mar 2016 17:30:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Fri, 04 Mar 2016 17:30:14 -0000 Message-Id: In-Reply-To: <96b6b252494842f082a5f2c89924a503@git.apache.org> References: <96b6b252494842f082a5f2c89924a503@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/14] incubator-geode git commit: GEODE-967: Changed to serialize substitute value only if necessary GEODE-967: Changed to serialize substitute value only if necessary Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d067f41d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d067f41d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d067f41d Branch: refs/heads/feature/GEODE-949-2 Commit: d067f41d5411e54f1fa83caca6c3494182b6fe78 Parents: aa92530 Author: Barry Oglesby Authored: Thu Feb 25 16:56:12 2016 -0800 Committer: Barry Oglesby Committed: Tue Mar 1 10:13:05 2016 -0800 ---------------------------------------------------------------------- .../cache/wan/GatewaySenderEventImpl.java | 38 +++++++-- .../cache/wan/AsyncEventQueueTestBase.java | 86 +++++++++++++++++++- .../asyncqueue/AsyncEventListenerDUnitTest.java | 21 ++++- 3 files changed, 132 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java index e19d7bf..0e506f7 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java @@ -518,6 +518,9 @@ public class GatewaySenderEventImpl implements } Object rawValue = this.value; if (rawValue == null) { + rawValue = this.substituteValue; + } + if (rawValue == null) { @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) Object vo = this.valueObj; if (vo instanceof StoredObject) { @@ -548,6 +551,8 @@ public class GatewaySenderEventImpl implements @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) Object result = this.value; if (result == null) { + result = this.substituteValue; + if (result == null) { result = this.valueObj; if (result instanceof ObjectChunk) { if (this.valueObjReleased) { @@ -562,6 +567,7 @@ public class GatewaySenderEventImpl implements } } } + } } return result; } @@ -582,7 +588,6 @@ public class GatewaySenderEventImpl implements * @return this event's deserialized value */ public Object getDeserializedValue() { -// TODO OFFHEAP MERGE: handle substituteValue here? if (this.valueIsObject == 0x00) { Object result = this.value; if (result == null) { @@ -616,6 +621,9 @@ public class GatewaySenderEventImpl implements Object result = EntryEventImpl.deserialize(this.value); this.valueObj = result; return result; + } else if (this.substituteValue != null) { + // If the substitute value is set, return it. + return this.substituteValue; } else { if (this.valueObjReleased) { throw new IllegalStateException("Value is no longer available. getDeserializedValue must be called before processEvents returns."); @@ -633,8 +641,10 @@ public class GatewaySenderEventImpl implements * the value. This is a debugging exception. */ public String getValueAsString(boolean deserialize) { -// TODO OFFHEAP MERGE: handle substituteValue here? Object v = this.value; + if (v == null) { + v = this.substituteValue; + } if (deserialize) { try { v = getDeserializedValue(); @@ -672,6 +682,13 @@ public class GatewaySenderEventImpl implements public byte[] getSerializedValue() { byte[] result = this.value; if (result == null) { + if (this.substituteValue != null) { + // The substitute value is set. Serialize it + isSerializingValue.set(Boolean.TRUE); + result = EntryEventImpl.serialize(this.substituteValue); + isSerializingValue.set(Boolean.FALSE); + return result; + } @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) Object vo = this.valueObj; if (vo instanceof StoredObject) { @@ -687,7 +704,9 @@ public class GatewaySenderEventImpl implements synchronized (this) { result = this.value; if (result == null && vo != null && !(vo instanceof Token)) { + isSerializingValue.set(Boolean.TRUE); result = EntryEventImpl.serialize(vo); + isSerializingValue.set(Boolean.FALSE); this.value = result; } else if (result == null) { if (this.valueObjReleased) { @@ -982,15 +1001,12 @@ public class GatewaySenderEventImpl implements this.value = (byte[]) this.substituteValue; this.valueIsObject = 0x00; } else if (this.substituteValue == TOKEN_NULL) { - // The substituteValue represents null. Set the value to null. + // The substituteValue represents null. Set the value and substituteValue to null. this.value = null; + this.substituteValue = null; this.valueIsObject = 0x01; } else { - // The substituteValue is an object. Serialize it. - isSerializingValue.set(Boolean.TRUE); - this.value = CacheServerHelper.serialize(this.substituteValue); - isSerializingValue.set(Boolean.FALSE); - event.setCachedSerializedNewValue(this.value); + // The substituteValue is an object. Leave it as is. this.valueIsObject = 0x01; } } @@ -1219,7 +1235,11 @@ public class GatewaySenderEventImpl implements if (vo instanceof StoredObject) { return ((StoredObject) vo).getSizeInBytes(); } else { + if (this.substituteValue != null) { + return sizeOf(this.substituteValue); + } else { return CachedDeserializableFactory.calcMemSize(getSerializedValue()); + } } } @@ -1247,7 +1267,7 @@ public class GatewaySenderEventImpl implements * If it was stored off-heap and is no longer available (because it was released) then return null. */ public GatewaySenderEventImpl makeHeapCopyIfOffHeap() { - if (this.value != null) { + if (this.value != null || this.substituteValue != null) { // we have the value stored on the heap so return this return this; } else { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java index 93e91c9..d834017 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java @@ -16,6 +16,8 @@ */ package com.gemstone.gemfire.internal.cache.wan; +import java.io.DataInput; +import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -34,6 +36,8 @@ import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import com.gemstone.gemfire.DataSerializable; +import com.gemstone.gemfire.DataSerializer; import com.gemstone.gemfire.cache.AttributesFactory; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheClosedException; @@ -72,6 +76,7 @@ import com.gemstone.gemfire.internal.cache.ForceReattemptException; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.lru.Sizeable; import com.gemstone.gemfire.test.dunit.Assert; import com.gemstone.gemfire.test.dunit.DistributedTestCase; import com.gemstone.gemfire.test.dunit.IgnoredException; @@ -1375,20 +1380,20 @@ public class AsyncEventQueueTestBase extends DistributedTestCase { } } - public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int numInvocations) { + public static void verifySubstitutionFilterInvocations(String asyncEventQueueId, int expectedNumInvocations) { AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId); assertNotNull(queue); // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times MyGatewayEventSubstitutionFilter filter = (MyGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter(); assertNotNull(filter); - assertEquals(numInvocations, filter.getNumInvocations()); + assertEquals(expectedNumInvocations, filter.getNumInvocations()); // Verify the AsyncEventListener has received the substituted values MyAsyncEventListener listener = (MyAsyncEventListener) queue.getAsyncEventListener(); final Map eventsMap = listener.getEventsMap(); assertNotNull(eventsMap); - assertEquals(numInvocations, eventsMap.size()); + assertEquals(expectedNumInvocations, eventsMap.size()); for (Iterator i = eventsMap.entrySet().iterator(); i.hasNext();) { Map.Entry entry = (Map.Entry) i.next(); @@ -1396,6 +1401,17 @@ public class AsyncEventQueueTestBase extends DistributedTestCase { } } + + public static void verifySubstitutionFilterToDataInvocations(String asyncEventQueueId, int expectedToDataInvoations) { + AsyncEventQueue queue = cache.getAsyncEventQueue(asyncEventQueueId); + assertNotNull(queue); + + // Verify the GatewayEventSubstitutionFilter has been invoked the appropriate number of times + SizeableGatewayEventSubstitutionFilter filter = (SizeableGatewayEventSubstitutionFilter) queue.getGatewayEventSubstitutionFilter(); + assertNotNull(filter); + assertEquals(expectedToDataInvoations, filter.getNumToDataInvocations()); + } + public static int getAsyncEventListenerMapSize(String asyncEventQueueId) { AsyncEventListener theListener = null; @@ -1649,6 +1665,70 @@ class MyCacheLoader implements CacheLoader, Declarable { } +class SizeableGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable { + + private AtomicInteger numToDataInvocations = new AtomicInteger(); + + protected static final String SUBSTITUTION_PREFIX = "substituted_"; + + public Object getSubstituteValue(EntryEvent event) { + return new GatewayEventSubstituteObject(this, SUBSTITUTION_PREFIX + event.getKey()); + } + + public void close() { + } + + public void init(Properties properties) { + } + + protected void incNumToDataInvocations() { + this.numToDataInvocations.incrementAndGet(); + } + + protected int getNumToDataInvocations() { + return this.numToDataInvocations.get(); + } +} + +class GatewayEventSubstituteObject implements DataSerializable, Sizeable { + + private String id; + + private SizeableGatewayEventSubstitutionFilter filter; + + public GatewayEventSubstituteObject(SizeableGatewayEventSubstitutionFilter filter, String id) { + this.filter = filter; + this.id = id; + } + + public String getId() { + return this.id; + } + + public void toData(DataOutput out) throws IOException { + this.filter.incNumToDataInvocations(); + DataSerializer.writeString(this.id, out); + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.id = DataSerializer.readString(in); + } + + public int getSizeInBytes() { + return 0; + } + + public String toString() { + return new StringBuilder() + .append(getClass().getSimpleName()) + .append("[") + .append("id=") + .append(this.id) + .append("]") + .toString(); + } +} + class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable { private AtomicInteger numInvocations = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d067f41d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index 59b2caa..779fc75 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -943,7 +943,26 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" , numPuts )); + vm4.invoke(() -> verifySubstitutionFilterInvocations( "ln" ,numPuts )); + } + + public void testParallelAsyncEventQueueWithSubstitutionFilterNoSubstituteValueToDataInvocations() { + Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); + + vm4.invoke(createCacheRunnable(lnPort)); + + vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", + true, 100, 100, false, false, null, false, "MyAsyncEventListener", "SizeableGatewayEventSubstitutionFilter" )); + + String regionName = getTestMethodName() + "_PR"; + vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( regionName, "ln", isOffHeap() )); + + int numPuts = 10; + vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( regionName, numPuts )); + + vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + + vm4.invoke(() -> verifySubstitutionFilterToDataInvocations( "ln" ,0 )); } /**