From: klund@apache.org
To: commits@geode.incubator.apache.org
Date: Fri, 20 May 2016 16:30:13 -0000
Subject: [42/49] incubator-geode git commit: GEODE-1209: Added new attribute to forward eviction/expiration to AEQ. Following changes are made: 1. Added new attribute/flag to forward eviction and expiration destroy events. 2. Added new option in CreateAsyncEventQu

GEODE-1209: Added new attribute to forward eviction/expiration to AEQ.

Following changes are made:
1. Added new attribute/flag to forward eviction and expiration destroy events.
2. Added new option in CreateAsyncEventQueue Gfsh command.

Also cleaned up arguments for create AEQ function invoked by gfsh. Instead of object array the args are wrapped in args object.

Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/46056a66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/46056a66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/46056a66

Branch: refs/heads/feature/GEODE-835-test
Commit: 46056a6611970c06ab1414900e35e4135533af87
Parents: 8266f6b
Author: Anil
Authored: Tue May 3 13:52:18 2016 -0700
Committer: Anil
Committed: Tue May 17 15:59:39 2016 -0700

----------------------------------------------------------------------
 .../SharedConfigurationEndToEndDUnitTest.java | 1 +
 .../cache/asyncqueue/AsyncEventQueue.java | 9 +
 .../asyncqueue/AsyncEventQueueFactory.java | 13 +
 .../internal/AsyncEventQueueFactoryImpl.java | 8 +-
 .../internal/AsyncEventQueueImpl.java | 9 +-
 .../gemfire/cache/wan/GatewaySender.java | 2 +
 .../gemfire/internal/cache/LocalRegion.java | 52 ++-
 .../cache/wan/AbstractGatewaySender.java | 100 +++---
 .../cache/wan/GatewaySenderAttributes.java | 7 +
 .../cache/xmlcache/AsyncEventQueueCreation.java | 11 +
 .../internal/cache/xmlcache/CacheXml.java | 1 +
 .../cache/xmlcache/CacheXmlGenerator.java | 8 +
 .../internal/cache/xmlcache/CacheXmlParser.java | 7 +
 .../internal/cli/commands/QueueCommands.java | 18 +-
 .../functions/AsyncEventQueueFunctionArgs.java | 134 +++++++
 .../CreateAsyncEventQueueFunction.java | 54 ++-
 .../internal/cli/i18n/CliStrings.java | 2 +
 .../controllers/QueueCommandsController.java | 1 +
 .../geode.apache.org/schema/cache/cache-1.0.xsd | 1 +
 ...ventQueueEvictionAndExpirationJUnitTest.java | 346 +++++++++++++++++++
 .../cache30/CacheXmlGeode10DUnitTest.java | 78 +++++
 .../cli/commands/QueueCommandsDUnitTest.java | 2 +
 .../codeAnalysis/sanctionedSerializables.txt | 1 +
 23 files changed, 747 insertions(+), 118 deletions(-)
----------------------------------------------------------------------

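Reader's note: the behavioural core of this commit is the new checkForDistribution method in AbstractGatewaySender, which decides whether a sender queues an event. The following is a minimal standalone sketch of that decision, not the Geode code itself. The class name ForwardingSketch, the method shouldQueue, and its four boolean parameters are hypothetical stand-ins for the calls the patch makes on the event and the sender (data policy NORMAL, isLocal()/isExpiration(), isAsyncEventQueue(), isIgnoreEvictionAndExpiration()).

```java
// Illustrative model only: the names below are hypothetical and are not part of Geode.
public final class ForwardingSketch {

    private ForwardingSketch() {
    }

    /**
     * Mirrors the order of checks in the patch's checkForDistribution.
     *
     * @param normalDataPolicy            stand-in for "region data policy is NORMAL"
     * @param localOrExpirationOp         stand-in for "operation isLocal() or isExpiration()"
     * @param isAsyncEventQueue           stand-in for "this sender backs an AEQ"
     * @param ignoreEvictionAndExpiration the new attribute (default true in the patch)
     * @return true if the event would be queued by the sender
     */
    public static boolean shouldQueue(boolean normalDataPolicy,
                                      boolean localOrExpirationOp,
                                      boolean isAsyncEventQueue,
                                      boolean ignoreEvictionAndExpiration) {
        // Events from regions with NORMAL data policy are never queued.
        if (normalDataPolicy) {
            return false;
        }
        // Eviction/expiration events only pass for an AEQ that opted in to forwarding.
        if (localOrExpirationOp) {
            return isAsyncEventQueue && !ignoreEvictionAndExpiration;
        }
        // Every other event is queued as before.
        return true;
    }

    public static void main(String[] args) {
        // AEQ configured to forward eviction/expiration: prints "true"
        System.out.println(shouldQueue(false, true, true, false));
        // AEQ left at the default (ignore = true): prints "false"
        System.out.println(shouldQueue(false, true, true, true));
        // WAN gateway sender: the attribute has no effect, prints "false"
        System.out.println(shouldQueue(false, true, false, false));
    }
}
```

In the real API the opt-in is made through AsyncEventQueueFactory.setIgnoreEvictionAndExpiration(false), as the diff below shows. The boolean-only signature here is a simplification chosen so the sketch runs without a cache.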
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-assembly/src/test/java/com/gemstone/gemfire/management/internal/configuration/SharedConfigurationEndToEndDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/management/internal/configuration/SharedConfigurationEndToEndDUnitTest.java b/geode-assembly/src/test/java/com/gemstone/gemfire/management/internal/configuration/SharedConfigurationEndToEndDUnitTest.java index a6221e9..5f3bf1f 100644 --- a/geode-assembly/src/test/java/com/gemstone/gemfire/management/internal/configuration/SharedConfigurationEndToEndDUnitTest.java +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/management/internal/configuration/SharedConfigurationEndToEndDUnitTest.java @@ -226,6 +226,7 @@ public class SharedConfigurationEndToEndDUnitTest extends CliCommandTestBase { csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS, "4"); csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION, "true"); csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS, "true"); + csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION, "true"); csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY, "1000"); csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY, OrderPolicy.KEY.toString()); csb.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT, "true"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java index a2b8b0f..c2d04a1 100644 --- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueue.java @@ -147,4 +147,13 @@ public interface AsyncEventQueue { * AsyncEventQueue */ public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter(); + + /** + * Represents if eviction and expiration events/operations are ignored (not passed) + * with AsyncEventListener. + * + * @return boolen True if eviction and expiration operations are ignored. + */ + public boolean isIgnoreEvictionAndExpiration(); + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java index 3e30b38..c607142 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueFactory.java @@ -170,7 +170,20 @@ public interface AsyncEventQueueFactory { public AsyncEventQueueFactory setGatewayEventSubstitutionListener( GatewayEventSubstitutionFilter filter); + /** + * Ignores the eviction and expiration events. + * By default its set to ignore eviction and expiration events (true), by + * setting it to false, the AEQ will receive destroy events due to eviction + * and expiration action. + * Note, setting this to false doesn't propagate invalidate events due to + * expiration action. + * + * @param ignore + * boolean to indicate whether to ignore eviction and expiration events. + */ + public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore); + /** * Creates the AsyncEventQueue. 
It accepts Id of AsyncEventQueue * and instance of AsyncEventListener. Multiple queues can be created using http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java index 312e880..1ec3ba0 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java @@ -277,7 +277,7 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { this.attrs.eventFilters = asyncQueueCreation.getGatewayEventFilters(); this.attrs.eventSubstitutionFilter = asyncQueueCreation.getGatewayEventSubstitutionFilter(); this.attrs.isForInternalUse = true; - + this.attrs.ignoreEvictionAndExpiration = asyncQueueCreation.isIgnoreEvictionAndExpiration(); } public AsyncEventQueueFactory setParallel(boolean isParallel) { @@ -292,4 +292,10 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { this.attrs.isMetaQueue = isMetaQueue; return this; } + + @Override + public AsyncEventQueueFactory setIgnoreEvictionAndExpiration(boolean ignore) { + this.attrs.ignoreEvictionAndExpiration = ignore; + return this; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java index 6b3eb4a..5a0b370 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -36,7 +36,7 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { private GatewaySender sender = null; private AsyncEventListener asyncEventListener = null; - + public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_"; public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) { @@ -200,6 +200,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { public boolean isBucketSorted() { // TODO Auto-generated method stub return false; - } - + } + + public boolean isIgnoreEvictionAndExpiration() { + return ((AbstractGatewaySender)this.sender).isIgnoreEvictionAndExpiration(); + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java index c5b5d3a..b0ad410 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java @@ -96,6 +96,8 @@ public interface GatewaySender { public static final int DEFAULT_DISPATCHER_THREADS = 5; + public static final boolean DEFAULT_IGNORE_EVICTION_EXPIRATION = true; + public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY; /** * The default maximum amount of memory (MB) to allow in the queue before 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index e9f5819..b5ff7ee 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -1147,8 +1147,7 @@ public class LocalRegion extends AbstractRegion @Override public boolean generateEventID() { - return !(isUsedForPartitionedRegionAdmin() - || isUsedForPartitionedRegionBucket() ); + return !isUsedForPartitionedRegionAdmin(); } public final Object destroy(Object key, Object aCallbackArgument) @@ -6647,10 +6646,10 @@ public class LocalRegion extends AbstractRegion protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (event.isConcurrencyConflict()) { // usually concurrent cache modification problem + if (isPdxTypesRegion() || event.isConcurrencyConflict() /* usually concurrent cache modification problem */) { return; } - + // Return if the inhibit all notifications flag is set if (event.inhibitAllNotifications()){ if(logger.isDebugEnabled()) { @@ -6659,34 +6658,31 @@ public class LocalRegion extends AbstractRegion return; } - if (!event.getOperation().isLocal()) { - Set allGatewaySenderIds = null; - checkSameSenderIdsAvailableOnAllNodes(); - if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - allGatewaySenderIds = getGatewaySenderIds(); - } else { - allGatewaySenderIds = getAllGatewaySenderIds(); - } - - List allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds); + + Set allGatewaySenderIds = null; + checkSameSenderIdsAvailableOnAllNodes(); + if (event.getOperation() == Operation.UPDATE_VERSION_STAMP) { + 
allGatewaySenderIds = getGatewaySenderIds(); + } else { + allGatewaySenderIds = getAllGatewaySenderIds(); + } - if (allRemoteDSIds != null) { - for (GatewaySender sender : getCache().getAllGatewaySenders()) { - if (!isPdxTypesRegion()) { - if (allGatewaySenderIds.contains(sender.getId())) { - //TODO: This is a BUG. Why return and not continue? - if((!this.getDataPolicy().withStorage()) && sender.isParallel()){ - return; - } - if(logger.isDebugEnabled()) { - logger.debug("Notifying the GatewaySender : {}", sender.getId()); - } - ((AbstractGatewaySender)sender).distribute(operation, event, - allRemoteDSIds); - } + List allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds); + if (allRemoteDSIds != null) { + for (GatewaySender sender : getCache().getAllGatewaySenders()) { + if (allGatewaySenderIds.contains(sender.getId())) { + //TODO: This is a BUG. Why return and not continue? + if((!this.getDataPolicy().withStorage()) && sender.isParallel()){ + return; + } + if(logger.isDebugEnabled()) { + logger.debug("Notifying the GatewaySender : {}", sender.getId()); } + ((AbstractGatewaySender)sender).distribute(operation, event, + allRemoteDSIds); } } + // if (shouldNotifyGatewaySender()) { // // Get All WAN site DSID's to be sent to each WAN site so that they http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java index fe09d03..713023f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java @@ -34,6 +34,7 @@ import com.gemstone.gemfire.cache.AttributesFactory; 
import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionDestroyedException; @@ -137,6 +138,8 @@ public abstract class AbstractGatewaySender implements GatewaySender, protected List listeners; + protected boolean ignoreEvictionAndExpiration; + protected GatewayEventSubstitutionFilter substitutionFilter; protected LocatorDiscoveryCallback locatorDiscoveryCallback; @@ -269,55 +272,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, initializeEventIdIndex(); } this.isBucketSorted = attrs.isBucketSorted(); - } - - public void createSender(Cache cache, GatewaySenderAttributes attrs){ - this.cache = cache; - this.id = attrs.getId(); - this.socketBufferSize = attrs.getSocketBufferSize(); - this.socketReadTimeout = attrs.getSocketReadTimeout(); - this.queueMemory = attrs.getMaximumQueueMemory(); - this.batchSize = attrs.getBatchSize(); - this.batchTimeInterval = attrs.getBatchTimeInterval(); - this.isConflation = attrs.isBatchConflationEnabled(); - this.isPersistence = attrs.isPersistenceEnabled(); - this.alertThreshold = attrs.getAlertThreshold(); - this.manualStart = attrs.isManualStart(); - this.isParallel = attrs.isParallel(); - this.isForInternalUse = attrs.isForInternalUse(); - this.diskStoreName = attrs.getDiskStoreName(); - this.remoteDSId = attrs.getRemoteDSId(); - this.eventFilters = attrs.getGatewayEventFilters(); - this.transFilters = attrs.getGatewayTransportFilters(); - this.listeners = attrs.getAsyncEventListeners(); - this.substitutionFilter = attrs.getGatewayEventSubstitutionFilter(); - this.locatorDiscoveryCallback = attrs.getGatewayLocatoDiscoveryCallback(); - this.isDiskSynchronous = attrs.isDiskSynchronous(); - this.policy = attrs.getOrderPolicy(); - this.dispatcherThreads = 
attrs.getDispatcherThreads(); - this.parallelismForReplicatedRegion = attrs.getParallelismForReplicatedRegion(); - //divide the maximumQueueMemory of sender equally using number of dispatcher threads. - //if dispatcherThreads is 1 then maxMemoryPerDispatcherQueue will be same as maximumQueueMemory of sender - this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads; - this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId(); - this.serialNumber = DistributionAdvisor.createSerialNumber(); - if (!(this.cache instanceof CacheCreation)) { - this.stopper = new Stopper(cache.getCancelCriterion()); - this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this); - if (!this.isForInternalUse()) { - this.statistics = new AsyncEventQueueStats(cache.getDistributedSystem(), - id); - } - else {// this sender lies underneath the AsyncEventQueue. Need to have - // AsyncEventQueueStats - this.statistics = new AsyncEventQueueStats( - cache.getDistributedSystem(), AsyncEventQueueImpl - .getAsyncEventQueueIdFromSenderId(id)); - } - initializeEventIdIndex(); - } - this.isBucketSorted = attrs.isBucketSorted(); - + this.ignoreEvictionAndExpiration = attrs.isIgnoreEvictionAndExpiration(); } public GatewaySenderAdvisor getSenderAdvisor() { @@ -392,6 +347,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, return !this.listeners.isEmpty(); } + public boolean isIgnoreEvictionAndExpiration() { + return this.ignoreEvictionAndExpiration; + } + public boolean isManualStart() { return this.manualStart; } @@ -839,16 +798,49 @@ public abstract class AbstractGatewaySender implements GatewaySender, return this.eventProcessor; } + /** + * Check if this event can be distributed by senders. + * @param event + * @param stats + * @return boolean True if the event is allowed. 
+ */ + private boolean checkForDistribution(EntryEventImpl event, GatewaySenderStats stats) { + if (event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) + { + return false; + } + + // Eviction and expirations are not passed to WAN. + // Eviction and Expiration are passed to AEQ based on its configuration. + if (event.getOperation().isLocal() || event.getOperation().isExpiration()) { + // Check if its AEQ and AEQ is configured to forward eviction/expiration events. + if (this.isAsyncEventQueue() && !this.isIgnoreEvictionAndExpiration()) { + return true; + } + return false; + } + + return true; + } + + public void distribute(EnumListenerEvent operation, EntryEventImpl event, List allRemoteDSIds) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + // If this gateway is not running, return + if (!isRunning()) { + if (isDebugEnabled) { + logger.debug("Returning back without putting into the gateway sender queue"); + } + return; + } + final GatewaySenderStats stats = getStatistics(); stats.incEventsReceived(); - // If the event is local (see bug 35831) or an expiration ignore it. - //removed the check of isLocal as in notifyGAtewayHub this has been taken care - if (/*event.getOperation().isLocal() || */event.getOperation().isExpiration() - || event.getRegion().getDataPolicy().equals(DataPolicy.NORMAL)) { + + if (!checkForDistribution(event, stats)) { getStatistics().incEventsNotQueued(); return; } @@ -941,6 +933,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, } try { // If this gateway is not running, return + // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { logger.debug("Returning back without putting into the gateway sender queue"); @@ -988,6 +981,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, } } + /** * During sender is getting started, if there are any cache operation on queue then that event will be stored in temp queue. * Once sender is started, these event from tmp queue will be added to sender queue. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java index 1cef940..163943f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java @@ -83,6 +83,8 @@ public class GatewaySenderAttributes { public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE; + public boolean ignoreEvictionAndExpiration = GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION; + public int getSocketBufferSize() { return this.socketBufferSize; } @@ -192,4 +194,9 @@ public class GatewaySenderAttributes { public boolean isMetaQueue() { return this.isMetaQueue; } + + public boolean isIgnoreEvictionAndExpiration() { + return this.ignoreEvictionAndExpiration; + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java index 0015665..4c2943e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java @@ -43,6 +43,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { private boolean isBucketSorted = false; private int dispatcherThreads = 1; private OrderPolicy orderPolicy = OrderPolicy.KEY; + private boolean ignoreEvictionAndExpiration = true; public AsyncEventQueueCreation() { } @@ -62,6 +63,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { this.asyncEventListener = eventListener; this.isBucketSorted = senderAttrs.isBucketSorted; this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter; + this.ignoreEvictionAndExpiration = senderAttrs.ignoreEvictionAndExpiration; } @Override @@ -211,4 +213,13 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { public void setBucketSorted(boolean isBucketSorted) { this.isBucketSorted = isBucketSorted; } + + public void setIgnoreEvictionAndExpiration(boolean ignore) { + this.ignoreEvictionAndExpiration = ignore; + } + + @Override + public boolean isIgnoreEvictionAndExpiration() { + return this.ignoreEvictionAndExpiration; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java index aa7d49a..c3eccd2 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java +++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java @@ -762,6 +762,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { protected static final String ASYNC_EVENT_LISTENER = "async-event-listener"; public static final String ASYNC_EVENT_QUEUE = "async-event-queue"; protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids"; + protected static final String IGNORE_EVICTION_AND_EXPIRATION = "ignore-eviction-expiration"; /** The name of the compressor attribute */ protected static final String COMPRESSOR = "compressor"; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java index ea3c975..f0b1368 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java @@ -1521,10 +1521,18 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader { atts.addAttribute("", "", ORDER_POLICY, "", String.valueOf(asyncEventQueue .getOrderPolicy())); } + // eviction and expiration events + if (this.version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) { + if (generateDefaults() || asyncEventQueue.isIgnoreEvictionAndExpiration() != (GatewaySender.DEFAULT_IGNORE_EVICTION_EXPIRATION)) + atts.addAttribute("", "", IGNORE_EVICTION_AND_EXPIRATION, "", String.valueOf(asyncEventQueue + .isIgnoreEvictionAndExpiration())); + } // disk-synchronous if (generateDefaults() || asyncEventQueue.isDiskSynchronous() != GatewaySender.DEFAULT_DISK_SYNCHRONOUS) atts.addAttribute("", "", DISK_SYNCHRONOUS, "", 
String.valueOf(asyncEventQueue .isDiskSynchronous())); + + // AsyncEventQueue element start handler.startElement("", ASYNC_EVENT_QUEUE, ASYNC_EVENT_QUEUE, atts); List eventFilters = asyncEventQueue.getGatewayEventFilters(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java index f344938..aec2dc3 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java @@ -2313,6 +2313,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler { } } + // forward eviction and expiration events. 
+    String ignoreEvictionExpiration = atts.getValue(IGNORE_EVICTION_AND_EXPIRATION);
+    if (ignoreEvictionExpiration != null) {
+      asyncEventQueueCreation.setIgnoreEvictionAndExpiration(Boolean.parseBoolean(ignoreEvictionExpiration));
+    }
+
     stack.push(asyncEventQueueCreation);
   }
@@ -2346,6 +2352,7 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     factory.setMaximumQueueMemory(asyncEventChannelCreation.getMaximumQueueMemory());
     factory.setDispatcherThreads(asyncEventChannelCreation.getDispatcherThreads());
     factory.setOrderPolicy(asyncEventChannelCreation.getOrderPolicy());
+    factory.setIgnoreEvictionAndExpiration(asyncEventChannelCreation.isIgnoreEvictionAndExpiration());
     List gatewayEventFilters = asyncEventChannelCreation.getGatewayEventFilters();
     for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) {
       factory.addGatewayEventFilter(gatewayEventFilter);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommands.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommands.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommands.java
index 89534a6..b9c853d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommands.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommands.java
@@ -25,6 +25,7 @@ import com.gemstone.gemfire.management.cli.Result;
 import com.gemstone.gemfire.management.cli.Result.Status;
 import com.gemstone.gemfire.management.internal.cli.CliUtil;
 import com.gemstone.gemfire.management.internal.cli.domain.AsyncEventQueueDetails;
+import com.gemstone.gemfire.management.internal.cli.functions.AsyncEventQueueFunctionArgs;
 import com.gemstone.gemfire.management.internal.cli.functions.CliFunctionResult;
 import com.gemstone.gemfire.management.internal.cli.functions.CreateAsyncEventQueueFunction;
 import com.gemstone.gemfire.management.internal.cli.functions.ListAsyncEventQueuesFunction;
@@ -36,6 +37,7 @@ import com.gemstone.gemfire.management.internal.cli.shell.Gfsh;
 import com.gemstone.gemfire.management.internal.configuration.SharedConfigurationWriter;
 import com.gemstone.gemfire.management.internal.configuration.domain.XmlEntity;
 import com.gemstone.gemfire.management.internal.security.ResourceOperation;
+
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
 import org.springframework.shell.core.annotation.CliCommand;
@@ -105,6 +107,11 @@ public class QueueCommands implements CommandMarker {
         specifiedDefaultValue = "true",
         help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS__HELP)
       Boolean diskSynchronous,
+      @CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION,
+        unspecifiedDefaultValue = "true",
+        specifiedDefaultValue = "true",
+        help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION__HELP)
+      Boolean ignoreEvictionAndExpiration,
       @CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY,
         unspecifiedDefaultValue = "100",
         help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP)
@@ -159,8 +166,15 @@ public class QueueCommands implements CommandMarker {
         return crex.getResult();
       }
-      ResultCollector rc = CliUtil.executeFunction(new CreateAsyncEventQueueFunction(), new Object[] { id, parallel, enableBatchConflation, batchSize,batchTimeInterval,
-        persistent, diskStore, diskSynchronous, maxQueueMemory, dispatcherThreads, orderPolicy, gatewayEventFilters, gatewaySubstitutionListener, listener, listenerProperties }, targetMembers);
+      AsyncEventQueueFunctionArgs aeqArgs = new AsyncEventQueueFunctionArgs(id, parallel,
+        enableBatchConflation, batchSize,batchTimeInterval,
+        persistent, diskStore, diskSynchronous, maxQueueMemory, dispatcherThreads, orderPolicy,
+        gatewayEventFilters, gatewaySubstitutionListener, listener, listenerProperties,
+        ignoreEvictionAndExpiration);
+
+      ResultCollector rc = CliUtil.executeFunction(new CreateAsyncEventQueueFunction(),
+        aeqArgs, targetMembers);
+
       List results = CliFunctionResult.cleanResults((List) rc.getResult());
       XmlEntity xmlEntity = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/AsyncEventQueueFunctionArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/AsyncEventQueueFunctionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/AsyncEventQueueFunctionArgs.java
new file mode 100644
index 0000000..2066628
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/AsyncEventQueueFunctionArgs.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.management.internal.cli.functions;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * This class stores the arguments provided for create async event queue command.
+ */
+public class AsyncEventQueueFunctionArgs implements Serializable {
+
+  private static final long serialVersionUID = -6524494645663740872L;
+
+  private String asyncEventQueueId;
+  private boolean isParallel;
+  private boolean enableBatchConflation;
+  private int batchSize;
+  private int batchTimeInterval;
+  private boolean persistent;
+  private String diskStoreName;
+  private boolean diskSynchronous;
+  private int maxQueueMemory;
+  private int dispatcherThreads;
+  private String orderPolicy;
+  private String[] gatewayEventFilters;
+  private String gatewaySubstitutionFilter;
+  private String listenerClassName;
+  private Properties listenerProperties;
+  private boolean ignoreEvictionAndExpiration;
+
+  public AsyncEventQueueFunctionArgs(String asyncEventQueueId,
+      boolean isParallel, boolean enableBatchConflation, int batchSize,
+      int batchTimeInterval, boolean persistent, String diskStoreName,
+      boolean diskSynchronous, int maxQueueMemory, int dispatcherThreads,
+      String orderPolicy, String[] gatewayEventFilters,
+      String gatewaySubstitutionFilter, String listenerClassName,
+      Properties listenerProperties, boolean ignoreEvictionAndExpiration) {
+    this.asyncEventQueueId = asyncEventQueueId;
+    this.isParallel = isParallel;
+    this.enableBatchConflation = enableBatchConflation;
+    this.batchSize = batchSize;
+    this.batchTimeInterval = batchTimeInterval;
+    this.persistent = persistent;
+    this.diskStoreName = diskStoreName;
+    this.diskSynchronous = diskSynchronous;
+    this.maxQueueMemory = maxQueueMemory;
+    this.dispatcherThreads = dispatcherThreads;
+    this.orderPolicy = orderPolicy;
+    this.gatewayEventFilters = gatewayEventFilters;
+    this.gatewaySubstitutionFilter = gatewaySubstitutionFilter;
+    this.listenerClassName = listenerClassName;
+    this.listenerProperties = listenerProperties;
+    this.ignoreEvictionAndExpiration = ignoreEvictionAndExpiration;
+  }
+
+  public String getAsyncEventQueueId() {
+    return asyncEventQueueId;
+  }
+
+  public boolean isParallel() {
+    return isParallel;
+  }
+
+  public boolean isEnableBatchConflation() {
+    return enableBatchConflation;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public int getBatchTimeInterval() {
+    return batchTimeInterval;
+  }
+
+  public boolean isPersistent() {
+    return persistent;
+  }
+
+  public String getDiskStoreName() {
+    return diskStoreName;
+  }
+
+  public boolean isDiskSynchronous() {
+    return diskSynchronous;
+  }
+
+  public int getMaxQueueMemory() {
+    return maxQueueMemory;
+  }
+
+  public int getDispatcherThreads() {
+    return dispatcherThreads;
+  }
+
+  public String getOrderPolicy() {
+    return orderPolicy;
+  }
+
+  public String[] getGatewayEventFilters() {
+    return gatewayEventFilters;
+  }
+
+  public String getGatewaySubstitutionFilter() {
+    return gatewaySubstitutionFilter;
+  }
+
+  public String getListenerClassName() {
+    return listenerClassName;
+  }
+
+  public Properties getListenerProperties() {
+    return listenerProperties;
+  }
+
+  public boolean isIgnoreEvictionAndExpiration() {
+    return ignoreEvictionAndExpiration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateAsyncEventQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
index 7481c0e..695fbfb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
@@ -62,23 +62,8 @@ public class CreateAsyncEventQueueFunction extends FunctionAdapter implements In
     String memberId = "";
     try {
-      final Object[] args = (Object[]) context.getArguments();
-      final String asyncEventQueueId = (String) args[0];
-      final boolean isParallel = (Boolean) args[1];
-      final boolean enableBatchConflation = (Boolean) args[2];
-      final int batchSize = (Integer) args[3];
-      final int batchTimeInterval =(Integer) args[4];
-      final boolean persistent = (Boolean) args[5];
-      final String diskStoreName = (String) args[6];
-      final boolean diskSynchronous =(Boolean) args[7];
-      final int maxQueueMemory = (Integer) args[8];
-      final int dispatcherThreads =(Integer) args[9];
-      final String orderPolicy= (String) args[10];
-      final String[] gatewayEventFilters =(String[]) args[11];
-      final String gatewaySubstitutionFilter = (String) args[12];
-      final String listenerClassName = (String) args[13];
-      final Properties listenerProperties = (Properties) args[14];
-
+      AsyncEventQueueFunctionArgs aeqArgs = (AsyncEventQueueFunctionArgs)context.getArguments();
+
       GemFireCacheImpl cache = (GemFireCacheImpl) CacheFactory.getAnyInstance();
       DistributedMember member = cache.getDistributedSystem().getDistributedMember();
@@ -89,32 +74,39 @@ public class CreateAsyncEventQueueFunction extends FunctionAdapter implements In
         memberId = member.getName();
       }
-      AsyncEventQueueFactory asyncEventQueueFactory = cache.createAsyncEventQueueFactory();
-      asyncEventQueueFactory.setParallel(isParallel);
-      asyncEventQueueFactory.setBatchConflationEnabled(enableBatchConflation);
-      asyncEventQueueFactory.setBatchSize(batchSize);
-      asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval);
-      asyncEventQueueFactory.setPersistent(persistent);
-      asyncEventQueueFactory.setDiskStoreName(diskStoreName);
-      asyncEventQueueFactory.setDiskSynchronous(diskSynchronous);
-      asyncEventQueueFactory.setMaximumQueueMemory(maxQueueMemory);
-      asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
-      asyncEventQueueFactory.setOrderPolicy(OrderPolicy.valueOf(orderPolicy));
+      AsyncEventQueueFactory asyncEventQueueFactory = cache.createAsyncEventQueueFactory()
+          .setParallel(aeqArgs.isParallel())
+          .setBatchConflationEnabled(aeqArgs.isEnableBatchConflation())
+          .setBatchSize(aeqArgs.getBatchSize())
+          .setBatchTimeInterval(aeqArgs.getBatchTimeInterval())
+          .setPersistent(aeqArgs.isPersistent())
+          .setDiskStoreName(aeqArgs.getDiskStoreName())
+          .setDiskSynchronous(aeqArgs.isDiskSynchronous())
+          .setIgnoreEvictionAndExpiration(aeqArgs.isIgnoreEvictionAndExpiration())
+          .setMaximumQueueMemory(aeqArgs.getMaxQueueMemory())
+          .setDispatcherThreads(aeqArgs.getDispatcherThreads())
+          .setOrderPolicy(OrderPolicy.valueOf(aeqArgs.getOrderPolicy()));
+
+      String[] gatewayEventFilters = aeqArgs.getGatewayEventFilters();
       if (gatewayEventFilters != null) {
         for (String gatewayEventFilter : gatewayEventFilters) {
           Class gatewayEventFilterKlass = forName(gatewayEventFilter, CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER);
           asyncEventQueueFactory.addGatewayEventFilter((GatewayEventFilter) newInstance(gatewayEventFilterKlass, CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER));
         }
       }
+
+      String gatewaySubstitutionFilter = aeqArgs.getGatewaySubstitutionFilter();
       if (gatewaySubstitutionFilter != null) {
         Class gatewayEventSubstitutionFilterKlass = forName(gatewaySubstitutionFilter, CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER);
         asyncEventQueueFactory.setGatewayEventSubstitutionListener((GatewayEventSubstitutionFilter) newInstance(gatewayEventSubstitutionFilterKlass, CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER));
       }
-
+
+      String listenerClassName = aeqArgs.getListenerClassName();
       Object listenerInstance;
       Class listenerClass = InternalDataSerializer.getCachedClass(listenerClassName);
       listenerInstance = listenerClass.newInstance();
+      Properties listenerProperties = aeqArgs.getListenerProperties();
       if (listenerProperties != null && !listenerProperties.isEmpty()) {
         if (!(listenerInstance instanceof Declarable)) {
           throw new IllegalArgumentException("Listener properties were provided, but the listener specified does not implement Declarable.");
@@ -127,9 +119,9 @@ public class CreateAsyncEventQueueFunction extends FunctionAdapter implements In
         cache.addDeclarableProperties(declarablesMap);
       }
-      asyncEventQueueFactory.create(asyncEventQueueId, (AsyncEventListener) listenerInstance);
+      asyncEventQueueFactory.create(aeqArgs.getAsyncEventQueueId(), (AsyncEventListener) listenerInstance);
-      XmlEntity xmlEntity = new XmlEntity(CacheXml.ASYNC_EVENT_QUEUE, "id", asyncEventQueueId);
+      XmlEntity xmlEntity = new XmlEntity(CacheXml.ASYNC_EVENT_QUEUE, "id", aeqArgs.getAsyncEventQueueId());
       context.getResultSender().lastResult(new CliFunctionResult(memberId, xmlEntity, "Success"));
     } catch (CacheClosedException cce) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java
index 28ba856..241c9e2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/i18n/CliStrings.java
@@ -423,6 +423,8 @@ public class CliStrings {
   public static final String CREATE_ASYNC_EVENT_QUEUE__DISK_STORE__HELP = "Disk store to be used by this queue.";
   public static final String CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS = "disk-synchronous";
   public static final String CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS__HELP = "Whether disk writes are synchronous.";
+  public static final String CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION = "ignore-eviction-expiration";
+  public static final String CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION__HELP = "Whether to ignore eviction and expiration events.";
   public static final String CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY = "max-queue-memory";
   public static final String CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP = "Maximum amount of memory, in megabytes, that the queue can consume before overflowing to disk.";
   public static final String CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER = "gateway-event-filter";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/java/com/gemstone/gemfire/management/internal/web/controllers/QueueCommandsController.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/web/controllers/QueueCommandsController.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/web/controllers/QueueCommandsController.java
index b8353fa..83d7bc1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/web/controllers/QueueCommandsController.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/web/controllers/QueueCommandsController.java
@@ -58,6 +58,7 @@ public class QueueCommandsController extends AbstractCommandsController {
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT, defaultValue = "false") final Boolean persistent,
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE, required = false) final String diskStore,
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS, defaultValue = "true") final Boolean diskSynchronous,
+      @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION, defaultValue = "true") final Boolean isIgnoreEvictionAndExpiration,
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY, defaultValue = "100") final Integer maxQueueMemory,
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS, defaultValue = "1") final Integer dispatcherThreads,
       @RequestParam(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY, defaultValue = "KEY") final String orderPolicy,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
----------------------------------------------------------------------
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index cc6d189..688ff1f 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -254,6 +254,7 @@ declarative caching XML file elements unless indicated otherwise.
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
new file mode 100644
index 0000000..5efac05
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/asyncqueue/AsyncEventQueueEvictionAndExpirationJUnitTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.asyncqueue;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.jayway.awaitility.Awaitility;
+
+import static org.mockito.Mockito.*;
+
+@Category(IntegrationTest.class)
+public class AsyncEventQueueEvictionAndExpirationJUnitTest {
+
+  private AsyncEventQueue aeq;
+  private Cache cache;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void getCache() {
+    try {
+      cache = CacheFactory.getAnyInstance();
+    } catch (Exception e) {
+      //ignore
+    }
+    if (null == cache) {
+      cache = (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").create();
+    }
+  }
+
+  @After
+  public void destroyCache() {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache = null;
+    }
+  }
+
+
+  @Test
+  public void isIgnoreEvictionAndExpirationAttributeTrueByDefault() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().create("aeq", al);
+    // Test for default value of isIgnoreEvictionAndExpiration setting.
+    assertTrue(aeq.isIgnoreEvictionAndExpiration());
+  }
+
+  @Test
+  public void canSetFalseForIgnoreEvictionAndExpiration() {
+    AsyncEventListener al = mock(AsyncEventListener.class);
+    aeq = cache.createAsyncEventQueueFactory().setIgnoreEvictionAndExpiration(false).create("aeq", al);
+    // Test that the isIgnoreEvictionAndExpiration setting can be set to false.
+    assertFalse(aeq.isIgnoreEvictionAndExpiration());
+  }
+
+
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with eviction-destroy op.
+    // Number of expected events 2. Two for create and none for eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with eviction-destroy op.
+    // Number of expected events 2. Two for create and none for eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with expiration-destroy op.
+    // Number of expected events 2. Two for create and none for expiration destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      true /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with expiration-destroy op.
+    // Number of expected events 2. Two for create and none for expiration destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      true /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefault() {
+    // For Replicated Region with expiration-invalidate op.
+    // Number of expected events 2. Two for create and none for expiration invalidate.
+    createPopulateAndVerifyEvents(false /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, true /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidOpEventsNotPropogatedByDefaultForPR() {
+    // For PR with expiration-invalidate op.
+    // Number of expected events 2. Two for create and none for expiration invalidate.
+    createPopulateAndVerifyEvents(true /*isPR */, true /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, true /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with eviction-destroy op.
+    // Number of expected events 3. Two for create and one for eviction destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+      3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void evictionPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with eviction-destroy op.
+    // Number of expected events 3. Two for create and one for eviction destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+      3 /* expectedEvents */ , true /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with eviction-overflow op.
+    // Number of expected events 2. Two for create and none for eviction overflow.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void overflowNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with eviction-overflow op.
+    // Number of expected events 2. Two for create and none for eviction overflow.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, true /* evictionOverflow */,
+      false /* expirationDestroy */, false /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with expiration-destroy op.
+    // Number of expected events 4. Two for create and two for expiration destroy.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+      4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      true /* expirationDestroy */, false /* expirationInvalidate */,
+      true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationDestroyPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with expiration-destroy op.
+    // Number of expected events 4. Two for create and two for expiration destroy.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+      4 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      true /* expirationDestroy */, false /* expirationInvalidate */,
+      true /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttribute() {
+    // For Replicated Region with expiration-invalidate op.
+    // Currently, invalidate event callbacks are not made to the gateway sender.
+    // Invalidates are not sent to AEQ.
+    // Number of expected events 2. None for expiration invalidate.
+    createPopulateAndVerifyEvents(false /*isPR */, false /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, true /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+  @Test
+  public void expirationInvalidateNotPropogatedUsingIgnoreEvictionAndExpirationAttributeForPR() {
+    // For PR with expiration-invalidate op.
+    // Currently, invalidate event callbacks are not made to the gateway sender.
+    // Invalidates are not sent to AEQ.
+    // Number of expected events 2. None for expiration invalidate.
+    createPopulateAndVerifyEvents(true /*isPR */, false /* ignoreEvictionExpiration */,
+      2 /* expectedEvents */ , false /* eviction */, false /* evictionOverflow */,
+      false /* expirationDestroy */, true /* expirationInvalidate */,
+      false /* checkForDestroyOp */, false /* checkForInvalidateOp */);
+  }
+
+
+
+  private void createPopulateAndVerifyEvents(boolean isPR, boolean ignoreEvictionExpiration, int expectedEvents, boolean eviction, boolean evictionOverflow,
+    boolean expirationDestroy, boolean expirationInvalidate, boolean checkForDestroyOp, boolean checkForInvalidateOp) {
+
+    // String aeqId = "AEQTest";
+    String aeqId = name.getMethodName();
+
+    // To store AEQ events for validation.
+    List<AsyncEvent> events = new ArrayList<AsyncEvent>();
+
+    // Create AEQ
+    createAsyncEventQueue(aeqId, ignoreEvictionExpiration, events);
+
+    // Create region with eviction/expiration
+    Region r = createRegion("ReplicatedRegionForAEQ", isPR, aeqId, eviction, evictionOverflow, expirationDestroy, expirationInvalidate);
+
+    // Populate region with two entries.
+    r.put("Key-1", "Value-1");
+    r.put("Key-2", "Value-2");
+
+    // The AEQ listener should receive the expected number of events
+    // (creates, plus any forwarded eviction/expiration destroys).
+    Awaitility.await().atMost(100, TimeUnit.SECONDS).until(() -> {return (events.size() == expectedEvents);});
+
+    // Check for the expected operation.
+    if (checkForDestroyOp) {
+      assertTrue("Expiration event not arrived", checkForOperation(events, false, true));
+    }
+
+    if (checkForInvalidateOp) {
+      assertTrue("Invalidate event not arrived", checkForOperation(events, true, false));
+    }
+
+    // Test complete. Destroy region.
+    r.destroyRegion();
+  }
+
+  private boolean checkForOperation(List<AsyncEvent> events, boolean invalidate, boolean destroy) {
+    boolean found = false;
+    for (AsyncEvent e : events) {
+      if (invalidate && e.getOperation().isInvalidate()) {
+        found = true;
+        break;
+      }
+      if (destroy && e.getOperation().isDestroy()) {
+        found = true;
+        break;
+      }
+    }
+    return found;
+  }
+
+  private void createAsyncEventQueue(String id, boolean ignoreEvictionExpiration, List<AsyncEvent> storeEvents) {
+    AsyncEventListener al = this.createAsyncListener(storeEvents);
+    aeq = cache.createAsyncEventQueueFactory().setParallel(false)
+        .setIgnoreEvictionAndExpiration(ignoreEvictionExpiration)
+        .setBatchSize(1).setBatchTimeInterval(1).create(id, al);
+  }
+
+  private Region createRegion(String name, boolean isPR, String aeqId, boolean evictionDestroy,
+      boolean evictionOverflow, boolean expirationDestroy, boolean expirationInvalidate) {
+    RegionFactory rf;
+    if (isPR) {
+      rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+    } else {
+      rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    }
+    // Set AsyncQueue.
+    rf.addAsyncEventQueueId(aeqId);
+    if (evictionDestroy) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.LOCAL_DESTROY));
+    }
+    if (evictionOverflow) {
+      rf.setEvictionAttributes(EvictionAttributes.createLIFOEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK));
+    }
+    if (expirationDestroy) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY));
+    }
+    if (expirationInvalidate) {
+      rf.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.INVALIDATE));
+    }
+
+    return rf.create(name);
+  }
+
+  private AsyncEventListener createAsyncListener(List list) {
+    AsyncEventListener listener = new AsyncEventListener() {
+      private List aeList = list;
+
+      @Override
+      public void close() {
+        // TODO Auto-generated method stub
+      }
+
+      @Override
+      public boolean processEvents(List arg0) {
+        System.out.println("AEQ Listener.process()");
+        new Exception("Stack trace for AsyncEventQueue").printStackTrace();
+        // TODO Auto-generated method stub
+        aeList.addAll(arg0);
+        System.out.println("AEQ Event :" + arg0);
+        return true;
+      }
+    };
+    return listener;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
index 57e3a13..afcb9b0 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheXmlGeode10DUnitTest.java
@@ -20,8 +20,16 @@
  */
 package com.gemstone.gemfire.cache30;
+import java.util.List;
+import java.util.Properties;
+
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Declarable;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
@@ -231,4 +239,74 @@ public class CacheXmlGeode10DUnitTest extends CacheXml81DUnitTest {
       System.clearProperty("gemfire."+DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
     }
   }
+
+  @SuppressWarnings("rawtypes")
+  public void testAsyncEventQueueIsEnableEvictionAndExpirationAttribute() {
+
+    final String regionName = "testAsyncEventQueueIsEnableEvictionAndExpirationAttribute";
+
+    // Create AsyncEventQueue with Listener
+    final CacheCreation cache = new CacheCreation();
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+
+
+    AsyncEventListener listener = new MyAsyncEventListenerGeode10();
+
+    // Test for default ignoreEvictionAndExpiration attribute value (which is true)
+    String aeqId1 = "aeqWithDefaultIgnoreEE";
+    factory.create(aeqId1,listener);
+    AsyncEventQueue aeq1 = cache.getAsyncEventQueue(aeqId1);
+    assertTrue(aeq1.isIgnoreEvictionAndExpiration());
+
+    // Test by setting ignoreEvictionAndExpiration attribute value.
+    String aeqId2 = "aeqWithIgnoreEEsetToFalse";
+    factory.setIgnoreEvictionAndExpiration(false);
+    factory.create(aeqId2,listener);
+
+    AsyncEventQueue aeq2 = cache.getAsyncEventQueue(aeqId2);
+    assertFalse(aeq2.isIgnoreEvictionAndExpiration());
+
+    // Create region and set the AsyncEventQueue
+    final RegionAttributesCreation attrs = new RegionAttributesCreation(cache);
+    attrs.addAsyncEventQueueId(aeqId2);
+
+    final Region regionBefore = cache.createRegion(regionName, attrs);
+    assertNotNull(regionBefore);
+    assertTrue(regionBefore.getAttributes().getAsyncEventQueueIds().size() == 1);
+
+
+    testXml(cache);
+
+    final Cache c = getCache();
+    assertNotNull(c);
+
+    aeq1 = c.getAsyncEventQueue(aeqId1);
+    assertTrue(aeq1.isIgnoreEvictionAndExpiration());
+
+    aeq2 = c.getAsyncEventQueue(aeqId2);
+    assertFalse(aeq2.isIgnoreEvictionAndExpiration());
+
+    final Region regionAfter = c.getRegion(regionName);
+    assertNotNull(regionAfter);
+    assertTrue(regionAfter.getAttributes().getAsyncEventQueueIds().size() == 1);
+
+    regionAfter.localDestroyRegion();
+
+    // Clear AsyncEventQueues.
+    c.close();
+  }
+
+  public static class MyAsyncEventListenerGeode10 implements AsyncEventListener, Declarable {
+
+    public boolean processEvents(List events) {
+      return true;
+    }
+
+    public void close() {
+    }
+
+    public void init(Properties properties) {
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommandsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommandsDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommandsDUnitTest.java
index db14fdc..4472193 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommandsDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/QueueCommandsDUnitTest.java
@@ -165,6 +165,7 @@ public class QueueCommandsDUnitTest extends CliCommandTestBase {
     commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER,
         "com.qcdunit.QueueCommandsDUnitTestHelper");
     commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS, "false");
+    commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__IGNORE_EVICTION_EXPIRATION, "false");
     commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER,
         "com.qcdunit.QueueCommandsDUnitTestHelper");
     commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER_PARAM_AND_VALUE, "param1");
@@ -206,6 +207,7 @@ public class QueueCommandsDUnitTest extends CliCommandTestBase {
         assertEquals(queue.getGatewayEventSubstitutionFilter().getClass().getName(),
             "com.qcdunit.QueueCommandsDUnitTestHelper");
         assertEquals(queue.isDiskSynchronous(), false);
+        assertEquals(queue.isIgnoreEvictionAndExpiration(), false);
         assertEquals(queue.getAsyncEventListener().getClass().getName(), "com.qcdunit.QueueCommandsDUnitTestHelper");
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46056a66/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt b/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
index 1ee16d5..b8ef985 100755
--- a/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
+++ b/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
@@ -678,6 +678,7 @@ com/gemstone/gemfire/management/internal/cli/exceptions/CliException,false
 com/gemstone/gemfire/management/internal/cli/exceptions/CreateSubregionException,true,4387344870743824916
 com/gemstone/gemfire/management/internal/cli/exceptions/IndexNotFoundException,true,1,indexName:java/lang/String,message:java/lang/String
 com/gemstone/gemfire/management/internal/cli/functions/AlterRuntimeConfigFunction,true,1
+com/gemstone/gemfire/management/internal/cli/functions/AsyncEventQueueFunctionArgs,true,-6524494645663740872,asyncEventQueueId:java/lang/String,batchSize:int,batchTimeInterval:int,diskStoreName:java/lang/String,diskSynchronous:boolean,dispatcherThreads:int,enableBatchConflation:boolean,gatewayEventFilters:java/lang/String[],gatewaySubstitutionFilter:java/lang/String,ignoreEvictionAndExpiration:boolean,isParallel:boolean,listenerClassName:java/lang/String,listenerProperties:java/util/Properties,maxQueueMemory:int,orderPolicy:java/lang/String,persistent:boolean
 com/gemstone/gemfire/management/internal/cli/functions/ChangeLogLevelFunction,true,1
 com/gemstone/gemfire/management/internal/cli/functions/CloseDurableClientFunction,true,1
 com/gemstone/gemfire/management/internal/cli/functions/CloseDurableCqFunction,true,1