Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BEDAD200C6F for ; Tue, 9 May 2017 20:17:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BD63A160BB6; Tue, 9 May 2017 18:17:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1006B160BD3 for ; Tue, 9 May 2017 20:17:35 +0200 (CEST) Received: (qmail 74879 invoked by uid 500); 9 May 2017 18:17:35 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 73526 invoked by uid 99); 9 May 2017 18:17:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 18:17:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DC828F17B3; Tue, 9 May 2017 18:17:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Tue, 09 May 2017 18:17:49 -0000 Message-Id: <68fc3c4d12144b13a7092df9d885e2bc@git.apache.org> In-Reply-To: <521e1277ab004287b4e04ef0470e25c5@git.apache.org> References: <521e1277ab004287b4e04ef0470e25c5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/49] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache archived-at: Tue, 09 May 2017 18:17:45 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java index 74034e4..ab6794b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java @@ -36,11 +36,10 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.Assert; import org.apache.geode.internal.SetUtils; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.control.MemoryThresholds; import org.apache.geode.internal.i18n.LocalizedStrings; -/** - */ public class MemberFunctionExecutor extends AbstractExecution { protected InternalDistributedSystem ds; @@ -131,7 +130,7 @@ public class MemberFunctionExecutor extends AbstractExecution { final FunctionContext context = new FunctionContextImpl(function.getId(), getArgumentsForMember(localVM.getId()), resultSender); boolean isTx = false; - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { isTx = cache.getTxManager().getTXState() == null ? false : true; } @@ -156,13 +155,9 @@ public class MemberFunctionExecutor extends AbstractExecution { return localRC; } - /** - * @param function - * @param dest - */ @Override public void validateExecution(final Function function, final Set dest) { - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + final InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null && cache.getTxManager().getTXState() != null) { if (dest.size() > 1) { throw new TransactionException( http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java index a3ae2c0..27542f5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.LowMemoryException; import org.apache.geode.cache.Region; @@ -38,15 +37,12 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.SetUtils; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.control.MemoryThresholds; import org.apache.geode.internal.i18n.LocalizedStrings; -/** - * - * - */ public class MultiRegionFunctionExecutor extends AbstractExecution { private final Set regions; @@ -210,7 +206,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { LocalizedStrings.MemberFunctionExecutor_NO_MEMBER_FOUND_FOR_EXECUTING_FUNCTION_0 .toLocalizedString(function.getId())); } - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + final InternalCache cache = GemFireCacheImpl.getInstance(); if (function.optimizeForWrite() && cache != null && cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest) && !MemoryThresholds.isLowMemoryExceptionDisabled()) { @@ -218,7 +214,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { Set sm = SetUtils.intersection(hcm, dest); throw new LowMemoryException( LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1 - .toLocalizedString(new Object[] {function.getId(), sm}), + .toLocalizedString(function.getId(), sm), sm); } setExecutionNodes(dest); @@ -243,7 +239,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { Set regionPathSet = memberToRegionMap.get(localVM); Set regions = new HashSet(); if (regionPathSet != null) { - Cache cache1 = GemFireCacheImpl.getInstance(); + InternalCache cache1 = GemFireCacheImpl.getInstance(); for (String regionPath : regionPathSet) { regions.add(cache1.getRegion(regionPath)); } @@ -263,8 +259,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { MultiRegionFunctionResultWaiter waiter = new MultiRegionFunctionResultWaiter(ds, localResultCollector, function, dest, memberArgs, resultSender, memberToRegionMap); - ResultCollector reply = waiter.getFunctionResultFrom(dest, function, this); - return reply; + return waiter.getFunctionResultFrom(dest, function, this); } return localResultCollector; } @@ -280,7 +275,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { PartitionedRegion pr = (PartitionedRegion) region; Set prMembers = pr.getRegionAdvisor().advisePrimaryOwners(); if (pr.isDataStore()) { - GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache(); + InternalCache cache = (InternalCache) region.getCache(); // Add local node InternalDistributedMember localVm = cache.getMyId(); Set regions = memberToRegions.get(localVm); @@ -334,7 +329,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { memberToRegions.put(member, regions); } } else if (dp.withReplication()) { - GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache(); + InternalCache cache = (InternalCache) region.getCache(); // Add local node InternalDistributedMember local = cache.getMyId(); Set regions = memberToRegions.get(local); @@ -345,7 +340,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { memberToRegions.put(local, regions); } } else if (region instanceof LocalRegion) { - GemFireCacheImpl cache = (GemFireCacheImpl) region.getCache(); + InternalCache cache = (InternalCache) region.getCache(); // Add local node InternalDistributedMember local = cache.getMyId(); Set regions = memberToRegions.get(local); @@ -366,9 +361,9 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { @Override public void validateExecution(Function function, Set targetMembers) { - GemFireCacheImpl cache = null; + InternalCache cache = null; for (Region r : regions) { - cache = (GemFireCacheImpl) r.getCache(); + cache = (InternalCache) r.getCache(); break; } if (cache != null && cache.getTxManager().getTXState() != null) { @@ -385,7 +380,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { } else if (!target.equals(funcTarget)) { throw new TransactionDataNotColocatedException( LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1 - .toLocalizedString(new Object[] {target, funcTarget})); + .toLocalizedString(target, funcTarget)); } } } @@ -396,7 +391,7 @@ public class MultiRegionFunctionExecutor extends AbstractExecution { Set sm = SetUtils.intersection(hcm, targetMembers); throw new LowMemoryException( LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1 - .toLocalizedString(new Object[] {function.getId(), sm}), + .toLocalizedString(function.getId(), sm), sm); } } http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java index c7a7d36..6e13ebc 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java @@ -12,11 +12,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.execute; import java.util.Iterator; -import java.util.NoSuchElementException; import java.util.Set; import org.apache.geode.cache.LowMemoryException; @@ -30,16 +28,12 @@ import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SetUtils; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.control.MemoryThresholds; import org.apache.geode.internal.i18n.LocalizedStrings; -/** - * - * - */ public class PartitionedRegionFunctionExecutor extends AbstractExecution { private final PartitionedRegion pr; @@ -332,16 +326,9 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution { return buf.toString(); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode. - * cache.execute.Function, java.util.Set) - */ @Override public void validateExecution(Function function, Set targetMembers) { - GemFireCacheImpl cache = pr.getGemFireCache(); + InternalCache cache = pr.getGemFireCache(); if (cache != null && cache.getTxManager().getTXState() != null) { if (targetMembers.size() > 1) { throw new TransactionException( @@ -356,7 +343,7 @@ public class PartitionedRegionFunctionExecutor extends AbstractExecution { } else if (!target.equals(funcTarget)) { throw new TransactionDataRebalancedException( LocalizedStrings.PartitionedRegion_TX_FUNCTION_EXECUTION_NOT_COLOCATED_0_1 - .toLocalizedString(new Object[] {target, funcTarget})); + .toLocalizedString(target, funcTarget)); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java index 3a20dc3..18ba32b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java @@ -12,9 +12,12 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.execute; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.internal.ProxyCache; @@ -26,28 +29,23 @@ import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.TXStateProxyImpl; import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -import java.util.Set; - -import org.apache.logging.log4j.Logger; - /** - * * Executes Function with FunctionService#onRegion(Region region) in client server mode. * * @see FunctionService#onRegion(Region) * * @since GemFire 5.8 LA - * */ public class ServerRegionFunctionExecutor extends AbstractExecution { private static final Logger logger = LogService.getLogger(); - final private LocalRegion region; + private final LocalRegion region; private boolean executeOnBucketSet = false; public ServerRegionFunctionExecutor(Region r, ProxyCache proxyCache) { @@ -288,11 +286,12 @@ public class ServerRegionFunctionExecutor extends AbstractExecution { } return srp; } else { - StringBuffer message = new StringBuffer(); + StringBuilder message = new StringBuilder(); message.append(srp).append(": "); - message.append( - "No available connection was found. Server Region Proxy is not available for this region " - + region.getName()); + message + .append( + "No available connection was found. Server Region Proxy is not available for this region ") + .append(region.getName()); throw new FunctionException(message.toString()); } } @@ -340,16 +339,9 @@ public class ServerRegionFunctionExecutor extends AbstractExecution { return new ServerRegionFunctionExecutor(this, argument); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode. - * cache.execute.Function, java.util.Set) - */ @Override public void validateExecution(Function function, Set targetMembers) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null && cache.getTxManager().getTXState() != null) { TXStateProxyImpl tx = (TXStateProxyImpl) cache.getTxManager().getTXState(); tx.getRealDeal(null, region); @@ -357,7 +349,6 @@ public class ServerRegionFunctionExecutor extends AbstractExecution { } } - @Override public ResultCollector execute(final String functionName) { if (functionName == null) { @@ -472,6 +463,4 @@ public class ServerRegionFunctionExecutor extends AbstractExecution { public boolean getExecuteOnBucketSetFlag() { return this.executeOnBucketSet; } - - } http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java index 13d8e18..f78de18 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/util/FindRestEnabledServersFunction.java @@ -21,18 +21,17 @@ import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.InternalEntity; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.management.internal.RestAgent; /** * The FindRestEnabledServersFunction class is a gemfire function that gives details about REST * enabled gemfire servers. - *

* * @since GemFire 8.1 */ - public class FindRestEnabledServersFunction extends FunctionAdapter implements InternalEntity { + private static final long serialVersionUID = 7851518767859544678L; /** * This property defines internal function that will get executed on each node to fetch active @@ -40,20 +39,17 @@ public class FindRestEnabledServersFunction extends FunctionAdapter implements I */ public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID = FindRestEnabledServersFunction.class.getName(); - private static final long serialVersionUID = 7851518767859544678L; - public void execute(FunctionContext context) { - try { - GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getAnyInstance(); + InternalCache cache = (InternalCache) CacheFactory.getAnyInstance(); DistributionConfig config = InternalDistributedSystem.getAnyInstance().getConfig(); String bindAddress = RestAgent.getBindAddressForHttpService(config); final String protocolType = config.getHttpServiceSSLEnabled() ? "https" : "http"; - if (c.isRESTServiceRunning()) { + if (cache.isRESTServiceRunning()) { context.getResultSender() .lastResult(protocolType + "://" + bindAddress + ":" + config.getHttpServicePort()); @@ -62,7 +58,6 @@ public class FindRestEnabledServersFunction extends FunctionAdapter implements I } } catch (CacheClosedException ex) { context.getResultSender().lastResult(""); - } } http://git-wip-us.apache.org/repos/asf/geode/blob/80bc00f0/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java index 7ee5a8d..9c2ea23 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java @@ -14,8 +14,52 @@ */ package org.apache.geode.internal.cache.ha; -import org.apache.geode.*; -import org.apache.geode.cache.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.CancelException; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.InternalGemFireException; +import org.apache.geode.StatisticsFactory; +import org.apache.geode.SystemFailure; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.CacheListener; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.CustomExpiry; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.ExpirationAction; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.MirrorType; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.TimeoutException; import org.apache.geode.cache.query.internal.CqQueryVsdStats; import org.apache.geode.cache.query.internal.cq.CqService; @@ -30,9 +74,20 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.Assert; import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.*; -import org.apache.geode.internal.cache.tier.sockets.*; +import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.internal.cache.Conflatable; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.HARegion; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; +import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; +import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage; +import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl; import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp; +import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; +import org.apache.geode.internal.cache.tier.sockets.HandShake; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; @@ -41,17 +96,6 @@ import org.apache.geode.internal.util.concurrent.StoppableCondition; import org.apache.geode.internal.util.concurrent.StoppableReentrantLock; import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.*; /** * An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be @@ -72,26 +116,23 @@ import java.util.concurrent.locks.*; * * 30 May 2008: 5.7 onwards the underlying GemFire Region will continue to have key as counter(long) * but the value will be a wrapper object(HAEventWrapper) which will be a key in a separate data - * strucure called haContainer (an implementation of Map). The value against this wrapper will be + * structure called haContainer (an implementation of Map). The value against this wrapper will be * the offered object in the queue. The purpose of this modification is to allow multiple * ha-region-queues share their offered values without storing separate copies in memory, upon GII. * * (See BlockingHARegionQueue) * - * * @since GemFire 4.3 - * */ public class HARegionQueue implements RegionQueue { private static final Logger logger = LogService.getLogger(); - /** The Region backing this queue */ + /** The {@code Region} backing this queue */ protected HARegion region; /** - * The key into the Region used when putting entries onto the queue. The counter uses + * The key into the {@code Region} used when putting entries onto the queue. The counter uses * incrementAndGet so counter will always be started from 1 - * */ protected final AtomicLong tailKey = new AtomicLong(0); @@ -100,26 +141,21 @@ public class HARegionQueue implements RegionQueue { * object. Every add operation will be identified by the ThreadIdentifier object & the position * recorded in the LastDispatchedAndCurrentEvents object. */ - protected final ConcurrentMap eventsMap = new ConcurrentHashMap(); /** - * The Map mapping the regionName->key to the queue key. This index allows fast - * updating of entries in the queue for conflation. + * The {@code Map} mapping the regionName->key to the queue key. This index allows fast updating + * of entries in the queue for conflation. */ protected volatile Map indexes = Collections.unmodifiableMap(new HashMap()); - // TODO:Asif: Should we worry about whether to some how make it writer - // preference? - /** Lock object for updating the queue size by different operations */ - // private final Object SIZE_LOCK = new Object(); private final StoppableReentrantReadWriteLock rwLock; private final StoppableReentrantReadWriteLock.StoppableReadLock readLock; private final StoppableWriteLock writeLock; - /** The name of the Region backing this queue */ + /** The name of the {@code Region} backing this queue */ private final String regionName; /** The ClientProxyMembershipID associated with the ha queue */ @@ -151,10 +187,7 @@ public class HARegionQueue implements RegionQueue { * A sequence violation can occur , if an HARegionQueue receives events thru GII & the same event * also arrives via Gemfire Put in that local VM. If the HARegionQueue does not receive any data * via GII , then there should not be any violation. If there is data arriving thru GII, such - * voiolations can be expected , but should be analyzed thoroughly. - * - *

- * author Asif + * violations can be expected , but should be analyzed thoroughly. */ protected boolean puttingGIIDataInQueue; @@ -166,14 +199,12 @@ public class HARegionQueue implements RegionQueue { /** * a thread local to store the counters corresponding to the events peeked by a particular thread. - * When remove() will be called, these events stored in thread-local will be - * destroyed. + * When {@code remove()} will be called, these events stored in thread-local will be destroyed. */ protected static final ThreadLocal peekedEventsContext = new ThreadLocal(); /** - * Thread which creates the QueueRemovalMessage and sends it to other nodes in the - * system + * Thread which creates the {@code QueueRemovalMessage} and sends it to other nodes in the system */ private static QueueRemovalThread qrmThread; @@ -284,30 +315,18 @@ public class HARegionQueue implements RegionQueue { protected long maxQueueSizeHitCount = 0; /** - * * Processes the given string and returns a string which is allowed for region names * - * @param regionName * @return legal region name */ public static String createRegionName(String regionName) { - String result = regionName.replace('/', '#'); // [yogi]: region name cannot - // contain the separator '/' - return result; + return regionName.replace('/', '#'); } /** - * - * @param regionName - * @param cache * @param isPrimary whether this is the primary queue for a client - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException */ - - protected HARegionQueue(String regionName, GemFireCacheImpl cache, + protected HARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes haAttributes, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { @@ -332,13 +351,13 @@ public class HARegionQueue implements RegionQueue { this.readLock = this.rwLock.readLock(); this.writeLock = this.rwLock.writeLock(); - this.putGIIDataInRegion(); + putGIIDataInRegion(); if (this.getClass() == HARegionQueue.class) { initialized.set(true); } } - private void createHARegion(String processedRegionName, GemFireCacheImpl cache) + private void createHARegion(String processedRegionName, InternalCache cache) throws IOException, ClassNotFoundException { AttributesFactory af = new AttributesFactory(); af.setMirrorType(MirrorType.KEYS_VALUES); @@ -358,7 +377,7 @@ public class HARegionQueue implements RegionQueue { * reinitialize the queue, presumably pulling current information from seconaries */ public void reinitializeRegion() { - GemFireCacheImpl cache = this.region.getCache(); + InternalCache cache = this.region.getCache(); String regionName = this.region.getName(); this.region.destroyRegion(); Exception problem = null; @@ -412,7 +431,7 @@ public class HARegionQueue implements RegionQueue { // use putIfAbsent to avoid overwriting newer dispatch information Object o = this.eventsMap.putIfAbsent(entry.getKey(), giiDace); if (o != null && isDebugEnabled_BS) { - sb.append(" -- could not store. found " + o); + sb.append(" -- could not store. found ").append(o); } } } @@ -425,11 +444,8 @@ public class HARegionQueue implements RegionQueue { * Repopulates the HARegion after the GII is over so as to reset the counters and populate the * DACE objects for the thread identifiers . This method should be invoked as the last method in * the constructor . Thus while creating BlockingQueue this method should be invoked lastly in the - * derived class contructor , after the HARegionQueue contructor is complete. Otherwise, the + * derived class constructor , after the HARegionQueue contructor is complete. Otherwise, the * ReentrantLock will be null. - * - * @throws CacheException - * @throws InterruptedException */ void putGIIDataInRegion() throws CacheException, InterruptedException { Set entrySet = this.region.entries(false); @@ -498,8 +514,6 @@ public class HARegionQueue implements RegionQueue { * Puts the GII'd entry into the ha region, if it was GII'd along with its ClientUpdateMessageImpl * instance. * - * @param val - * @throws InterruptedException * @since GemFire 5.7 */ protected void putInQueue(Object val) throws InterruptedException { @@ -507,7 +521,7 @@ public class HARegionQueue implements RegionQueue { if (logger.isDebugEnabled()) { logger.debug( "HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.", - ((HAEventWrapper) val).getKeyToConflate()); + ((Conflatable) val).getKeyToConflate()); } } else { this.put(val); @@ -567,9 +581,6 @@ public class HARegionQueue implements RegionQueue { * object & SIZE Lock * * @param object object to put onto the queue - * @throws InterruptedException - * @throws CacheException - * @return boolean */ public boolean put(Object object) throws CacheException, InterruptedException { this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event @@ -663,11 +674,9 @@ public class HARegionQueue implements RegionQueue { dace = oldDace; } else { // Add the recently added ThreadIdentifier to the RegionQueue for expiry - this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId)); + this.region.put(ti, dace.lastDispatchedSequenceId); // update the stats - // if (logger.isDebugEnabled()) { this.stats.incThreadIdentifiers(); - // } } if (!dace.putObject(event, sequenceID)) { this.put(object); @@ -677,11 +686,6 @@ public class HARegionQueue implements RegionQueue { } } } - // update the stats - // if (logger.isDebugEnabled()) { - // this.stats.incEventsEnqued(); - // } - } /** @@ -691,7 +695,7 @@ public class HARegionQueue implements RegionQueue { */ public void startGiiQueueing() { this.giiLock.writeLock().lock(); - this.giiCount++; + this.giiCount++; // TODO: non-atomic operation on volatile! if (logger.isDebugEnabled()) { logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount); } @@ -710,7 +714,7 @@ public class HARegionQueue implements RegionQueue { this.giiLock.writeLock().lock(); final boolean isDebugEnabled = logger.isDebugEnabled(); try { - this.giiCount--; + this.giiCount--; // TODO: non-atomic operation on volatile! if (isDebugEnabled) { logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount); } @@ -731,15 +735,7 @@ public class HARegionQueue implements RegionQueue { Object value; try { value = this.giiQueue.remove(); - } catch (NoSuchElementException e) { - // if (actualCount != expectedCount) { - // logger.severe(LocalizedStrings.DEBUG, "expected to drain " - // + expectedCount + " messages but drained " + actualCount - // + " queue.size() is now " + giiQueue.size() + ", in queue" + this, e); - // } else { - // logger.severe(LocalizedStrings.DEBUG, "drained " + actualCount + " messages. Queue - // size is " + giiQueue.size() + " in " + this); - // } + } catch (NoSuchElementException ignore) { break; } actualCount++; @@ -765,13 +761,11 @@ public class HARegionQueue implements RegionQueue { if (value instanceof HAEventWrapper) { decAndRemoveFromHAContainer((HAEventWrapper) value); } - } catch (NoSuchElementException e) { + } catch (NoSuchElementException ignore) { break; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { // complete draining while holding the write-lock so nothing else // can get into the queue - // logger.severe(LocalizedStrings.DEBUG, "endGiiQueueing interrupted - ignoring until - // draining completes"); interrupted = true; } } @@ -804,19 +798,19 @@ public class HARegionQueue implements RegionQueue { */ public Map getEventMapForGII() { // fix for bug #41621 - concurrent modification exception while serializing event map - Map events = this.eventsMap; final boolean isDebugEnabled = logger.isDebugEnabled(); do { HashMap result = new HashMap(); try { - for (Map.Entry entry : events.entrySet()) { + for (Map.Entry entry : ((Map) this.eventsMap) + .entrySet()) { if (entry.getValue().isCountersEmpty()) { result.put(entry.getKey(), entry.getValue()); } } return result; - } catch (ConcurrentModificationException e) { // TODO:WTF: bad practice but eventsMap is - // ConcurrentHashMap + } catch (ConcurrentModificationException ignore) { + // TODO:WTF: bad practice but eventsMap is ConcurrentHashMap if (isDebugEnabled) { logger.debug( "HARegion encountered concurrent modification exception while analysing event state - will try again"); @@ -826,9 +820,7 @@ public class HARegionQueue implements RegionQueue { } /** - * Implementation in BlokcingHARegionQueue class - * - * @throws InterruptedException + * Implementation in BlockingHARegionQueue class */ void checkQueueSizeConstraint() throws InterruptedException { if (Thread.interrupted()) @@ -846,13 +838,11 @@ public class HARegionQueue implements RegionQueue { /** * Creates the static dispatchedMessagesMap (if not present) and starts the QueuRemovalThread if * not running - * */ - public static synchronized void startHAServices(GemFireCacheImpl c) { - + static synchronized void startHAServices(InternalCache cache) { if (qrmThread == null) { dispatchedMessagesMap = new ConcurrentHashMap(); - qrmThread = new QueueRemovalThread(c); + qrmThread = new QueueRemovalThread(cache); qrmThread.setName("Queue Removal Thread"); qrmThread.start(); } @@ -913,20 +903,16 @@ public class HARegionQueue implements RegionQueue { } } Object key = event.getKeyToConflate(); - Long previousPosition = (Long) latestIndexesForRegion.put(key, newPosition); - return previousPosition; - + return (Long) latestIndexesForRegion.put(key, newPosition); } /** - * * Creates and returns a ConcurrentMap. This method is over-ridden in test classes to test some * functionality * * @return new ConcurrentMap */ ConcurrentMap createConcurrentMap() { - return new ConcurrentHashMap(); } @@ -948,7 +934,7 @@ public class HARegionQueue implements RegionQueue { // if (!HARegionQueue.this.isPrimary()) { HARegionQueue.this.expireTheEventOrThreadIdentifier(event); // } - } catch (CancelException e) { + } catch (CancelException ignore) { // ignore, we're done } catch (CacheException ce) { if (!destroyInProgress) { @@ -967,11 +953,8 @@ public class HARegionQueue implements RegionQueue { * overridden function createCacheListenerForHARegion of the HARegionQueueJUnitTest class for the * test testConcurrentEventExpiryAndTake. This function provides the meaningful functionality for * expiry of the Event object as well as ThreadIdentifier - *

- * author Asif - * + * * @param event event object representing the data being expired - * @throws CacheException */ void expireTheEventOrThreadIdentifier(EntryEvent event) throws CacheException { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -980,11 +963,6 @@ public class HARegionQueue implements RegionQueue { "HARegionQueue::afterInvalidate. Entry Event being invalidated:{}, isPrimaryQueue:{}", event, HARegionQueue.this.isPrimary()); } - // if (HARegionQueue.this.isPrimary()) { - // logger.info(LocalizedStrings.DEBUG, - // "HARegionQueue: Entry Event being invalidated =" - // + event+", after current queue became primary."); - // } Object key = event.getKey(); if (key instanceof ThreadIdentifier) { // Check if the sequenceID present as value against this key is same @@ -998,7 +976,7 @@ public class HARegionQueue implements RegionQueue { (DispatchedAndCurrentEvents) HARegionQueue.this.eventsMap.get(key); Assert.assertTrue(dace != null); Long expirySequenceID = (Long) event.getOldValue(); - boolean expired = dace.expireOrUpdate(expirySequenceID.longValue(), (ThreadIdentifier) key); + boolean expired = dace.expireOrUpdate(expirySequenceID, (ThreadIdentifier) key); if (isDebugEnabled) { logger.debug( "HARegionQueue::afterInvalidate:Size of the region after expiring or updating the ThreadIdentifier={}", @@ -1028,21 +1006,18 @@ public class HARegionQueue implements RegionQueue { /** * This method adds the position of newly added object to the List of available IDs so that it is - * avaialble for peek or take. This method is called from DispatchedAndCurrentEvents object. This + * available for peek or take. This method is called from DispatchedAndCurrentEvents object. This * method is invoked in a write lock for non blocking queue & in a reentrant lock in a blocking - * queue. In case of blokcing queue , this method also signals the waiting take & peek threads to + * queue. In case of blocking queue , this method also signals the waiting take & peek threads to * awake. - *

- * author Asif - * + * * @param position The Long position of the object which has been added - * @throws InterruptedException */ void publish(Long position) throws InterruptedException { acquireWriteLock(); try { this.idsAvailable.add(position); - // Asif:Notify the wiating peek threads or take threads of blocking queue + // Notify the waiting peek threads or take threads of blocking queue // A void operation for the non blocking queue operations notifyPeekAndTakeThreads(); } finally { @@ -1055,12 +1030,8 @@ public class HARegionQueue implements RegionQueue { } /** - * - *

- * author Asif - * * @param position Long value present in the Available IDs map against which Event object is - * present in HARegion. This function is directly ivnoked from the basicInvalidate function + * present in HARegion. This function is directly invoked from the basicInvalidate function * where expiry is aborted if this function returns false * @return boolean true if the position could be removed from the Set * @throws InterruptedException * @@ -1087,12 +1058,10 @@ public class HARegionQueue implements RegionQueue { * in the AvailableID Set. If the position existed in the Set, then only it is removed from the * Set & the underlying Region * - * @param position Long poistion counter for entry in the Region + * @param position Long position counter for entry in the Region * @return true if the entry with
* position
* specified was removed from the Set - * @throws InterruptedException - * */ protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException { boolean removedOK = this.destroyFromAvailableIDs(position); @@ -1100,9 +1069,7 @@ public class HARegionQueue implements RegionQueue { if (removedOK) { try { this.destroyFromQueue(position); - } catch (EntryNotFoundException enfe) { - // if (!this.region.isDestroyed()) { - // if (!HARegionQueue.this.destroyInProgress || !this.region.isDestroyed()) { + } catch (EntryNotFoundException ignore) { if (!HARegionQueue.this.destroyInProgress) { if (!this.region.isDestroyed()) { Assert.assertTrue(false, "HARegionQueue::remove: The position " + position @@ -1128,7 +1095,7 @@ public class HARegionQueue implements RegionQueue { maintainCqStats(event, -1); } - /** Returns the toString for this RegionQueue object */ + /** Returns the {@code toString} for this RegionQueue object */ @Override public String toString() { return "RegionQueue on " + this.regionName + "(" + (this.isPrimary ? "primary" : "backup") @@ -1144,12 +1111,9 @@ public class HARegionQueue implements RegionQueue { /** - * This method is inoked by the take function . For non blocking queue it returns null or a valid + * This method is invoked by the take function . For non blocking queue it returns null or a valid * long position while for blocking queue it waits for data in the queue or throws Exception if * the thread encounters exception while waiting. - * - * @throws CacheException - * @throws InterruptedException */ protected Long getAndRemoveNextAvailableID() throws InterruptedException { Long next = null; @@ -1177,14 +1141,9 @@ public class HARegionQueue implements RegionQueue { /** * Returns the next position counter present in idsAvailable set. This method is invoked by the * peek function. In case of BlockingQueue, this method waits till a valid ID is available. - * - *

- * author Asif - * + * * @return valid Long poistion or null depending upon the nature of the queue - * @throws InterruptedException * @throws TimeoutException if operation is interrupted (unfortunately) - * */ private Long getNextAvailableID() throws InterruptedException { Long next = null; @@ -1208,7 +1167,6 @@ public class HARegionQueue implements RegionQueue { * For non blocking queue , this method either returns null or an Object. For blocking queue it * will always return with an Object or wait for queue to be populated. * - * @throws InterruptedException * @throws CacheException The exception can be thrown by BlockingQueue if it encounters * InterruptedException while waiting for data */ @@ -1281,8 +1239,6 @@ public class HARegionQueue implements RegionQueue { /** * Removes the events that were peeked by this thread. The events are destroyed from the queue and * conflation map and DispatchedAndCurrentEvents are updated accordingly. - * - * @throws InterruptedException */ public void remove() throws InterruptedException { List peekedIds = (List) HARegionQueue.peekedEventsContext.get(); @@ -1324,10 +1280,10 @@ public class HARegionQueue implements RegionQueue { List countersList; if ((countersList = (List) groupedThreadIDs.get(threadid)) != null) { countersList.add(info); - countersList.set(0, Long.valueOf(sequenceId)); + countersList.set(0, sequenceId); } else { countersList = new ArrayList(); - countersList.add(Long.valueOf(sequenceId)); + countersList.add(sequenceId); countersList.add(info); groupedThreadIDs.put(threadid, countersList); } @@ -1344,7 +1300,7 @@ public class HARegionQueue implements RegionQueue { Map.Entry element = (Map.Entry) iter.next(); ThreadIdentifier tid = (ThreadIdentifier) element.getKey(); List removedEvents = (List) element.getValue(); - long lastDispatchedId = ((Long) removedEvents.remove(0)).longValue(); + long lastDispatchedId = (Long) removedEvents.remove(0); DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) this.eventsMap.get(tid); if (dace != null && dace.lastDispatchedSequenceId < lastDispatchedId) { try { @@ -1387,7 +1343,7 @@ public class HARegionQueue implements RegionQueue { if (next == null) { break; } - } catch (TimeoutException te) { + } catch (TimeoutException ignore) { throw new InterruptedException(); } object = (Conflatable) this.region.get(next); @@ -1459,8 +1415,6 @@ public class HARegionQueue implements RegionQueue { * @param timeToWait The number of milliseconds to attempt to peek * * @return The list of events peeked - * @throws InterruptedException - * */ public List peek(int batchSize, int timeToWait) throws InterruptedException { long start = System.currentTimeMillis(); @@ -1506,7 +1460,7 @@ public class HARegionQueue implements RegionQueue { boolean interrupted = Thread.interrupted(); try { Thread.sleep(50); // TODO this seems kinda busy IMNSHO -- jason - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; this.region.getCancelCriterion().checkCancelInProgress(null); } finally { @@ -1520,11 +1474,10 @@ public class HARegionQueue implements RegionQueue { /** * This method prepares the batch of events and updates the thread-context with corresponding * counters, so that when remove is called by this thread, these events are destroyed from the - * queue.This method should always be invoked within the rwLock. + * queue.This method should always be invoked within the {@code rwLock}. * * @param batchSize - number of events to be peeked * @return - list of events peeked - * @throws CacheException */ private List getBatchAndUpdateThreadContext(int batchSize) { Iterator itr = this.idsAvailable.iterator(); @@ -1559,26 +1512,22 @@ public class HARegionQueue implements RegionQueue { } public void addCacheListener(CacheListener listener) { - // TODO Auto-generated method stub - + // nothing } public void removeCacheListener() { - // TODO Auto-generated method stub + // nothing } /** * It adds the entry to a static data structure dispatchedMessagesMap which is periodically * operated upon by the QRM thread. - * - *

- * author Asif - * + * * @param tid - the ThreadIdentifier object for this event * @param sequenceId - the sequence id for this event */ public void addDispatchedMessage(ThreadIdentifier tid, long sequenceId) { - Long lastSequenceNumber = Long.valueOf(sequenceId); + Long lastSequenceNumber = sequenceId; boolean wasEmpty = false; Long oldvalue = null; Map internalMap = null; @@ -1734,16 +1683,10 @@ public class HARegionQueue implements RegionQueue { * creates a DACE. Only one QRM operates at a time on a DACE & any other mesasge will be waiting * for the current thread to exit. This is accomplished by taking a lock on QRM_LOCK object in the * DACE. - * - *

- * author Asif - * + * * @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence * Id - * @throws CacheException - * @throws InterruptedException */ - void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException { ThreadIdentifier ti = getThreadIdentifier(lastDispatched); long sequenceID = lastDispatched.getSequenceID(); @@ -1764,11 +1707,9 @@ public class HARegionQueue implements RegionQueue { } else { // Add the recently added ThreadIdentifier to the RegionQueue for // expiry - this.region.put(ti, Long.valueOf(dace.lastDispatchedSequenceId)); + this.region.put(ti, dace.lastDispatchedSequenceId); // update the stats - // if (logger.isDebugEnabled()) { this.stats.incThreadIdentifiers(); - // } } } } @@ -1778,7 +1719,6 @@ public class HARegionQueue implements RegionQueue { * * @return the size of the queue */ - public int size() { acquireReadLock(); try { @@ -1788,12 +1728,8 @@ public class HARegionQueue implements RegionQueue { } } - void decrementTakeSidePutPermits() { - - } - void incrementTakeSidePutPermits() { - + // nothing } // called from dace on a put. @@ -1929,13 +1865,8 @@ public class HARegionQueue implements RegionQueue { /** * Always returns false for a HARegionQueue class. Suitably overridden in BlockingHARegionQueue * class. - * - *

- * author Asif - * + * * @return false for HAREgionQueue as this is a non blocking class - * @throws InterruptedException - * */ boolean waitForData() throws InterruptedException { return false; @@ -1976,12 +1907,8 @@ public class HARegionQueue implements RegionQueue { * @param cache Gemfire Cache instance * @param haRgnQType int identifying whether the HARegionQueue is of type blocking or non blocking * @return an instance of HARegionQueue - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - * @throws CacheException */ - public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache, + public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, final int haRgnQType, final boolean isDurable) throws IOException, ClassNotFoundException, CacheException, InterruptedException { Map container = null; @@ -1993,7 +1920,7 @@ public class HARegionQueue implements RegionQueue { container = new HashMap(); } - return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache, + return getHARegionQueueInstance(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haRgnQType, isDurable, container, null, HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE); } @@ -2009,12 +1936,8 @@ public class HARegionQueue implements RegionQueue { * @param isPrimary whether this is the primary queue for the client * @param canHandleDelta boolean indicating whether the HARegionQueue can handle delta or not * @return an instance of HARegionQueue - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException */ - public static HARegionQueue getHARegionQueueInstance(String regionName, GemFireCacheImpl cache, + public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary, boolean canHandleDelta) @@ -2038,12 +1961,11 @@ public class HARegionQueue implements RegionQueue { default: throw new IllegalArgumentException( LocalizedStrings.HARegionQueue_HARGNQTYPE_CAN_EITHER_BE_BLOCKING_0_OR_NON_BLOCKING_1 - .toLocalizedString(new Object[] {Integer.valueOf(BLOCKING_HA_QUEUE), - Integer.valueOf(NON_BLOCKING_HA_QUEUE)})); + .toLocalizedString(new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE})); } if (!isDurable) { Integer expiryTime = Integer.getInteger(REGION_ENTRY_EXPIRY_TIME, hrqa.getExpiryTime()); - hrqa.setExpiryTime(expiryTime.intValue()); + hrqa.setExpiryTime(expiryTime); ExpirationAttributes ea = new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE); hrq.region.getAttributesMutator().setEntryTimeToLive(ea); @@ -2054,19 +1976,10 @@ public class HARegionQueue implements RegionQueue { /** * Creates a HARegionQueue object with default attributes. used by tests * - * @param regionName - * @param cache - * @param hrqa - * @param haRgnQType - * @param isDurable * @return an instance of HARegionQueue - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException * @since GemFire 5.7 */ - public static HARegionQueue getHARegionQueueInstance(String regionName, Cache cache, + public static HARegionQueue getHARegionQueueInstance(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, final int haRgnQType, final boolean isDurable) throws IOException, ClassNotFoundException, CacheException, InterruptedException { Map container = null; @@ -2078,8 +1991,8 @@ public class HARegionQueue implements RegionQueue { container = new HashMap(); } - return getHARegionQueueInstance(regionName, (GemFireCacheImpl) cache, hrqa, haRgnQType, - isDurable, container, null, HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE); + return getHARegionQueueInstance(regionName, cache, hrqa, haRgnQType, isDurable, container, null, + HandShake.CONFLATION_DEFAULT, false, Boolean.FALSE); } public boolean isEmptyAckList() { @@ -2122,7 +2035,7 @@ public class HARegionQueue implements RegionQueue { if (this.destroyFromAvailableIDsAndRegion(counter)) { stats.incEventsRemoved(); } - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -2134,18 +2047,14 @@ public class HARegionQueue implements RegionQueue { } } - - /** - * This is an implemention of RegionQueue where peek() & take () are blocking operation and will + * This is an implementation of RegionQueue where peek() & take () are blocking operation and will * not return unless it gets some legitimate value The Lock object used by this class is a * ReentrantLock & not a ReadWriteLock as in the base class. This reduces the concurrency of peek * operations, but it enables the condition object of the ReentrantLock used to guard the * idsAvailable Set for notifying blocking peek & take operations. Previously a separate Lock * object was used by the BlockingQueue for wait notify. This class will be performant if there is * a single peek thread. - * - * */ private static class BlockingHARegionQueue extends HARegionQueue { /** @@ -2180,19 +2089,11 @@ public class HARegionQueue implements RegionQueue { protected final StoppableCondition blockCond; /** - * - * @param regionName - * @param cache * @param hrqa HARegionQueueAttributes through which expiry time etc for the HARegionQueue can * be set - * @param haContainer * @param isPrimary whether this is the primary queue for a client - * @throws IOException TODO-javadocs - * @throws ClassNotFoundException TODO-javadocs - * @throws CacheException TODO-javadocs - * @throws InterruptedException */ - protected BlockingHARegionQueue(String regionName, GemFireCacheImpl cache, + protected BlockingHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { @@ -2231,12 +2132,9 @@ public class HARegionQueue implements RegionQueue { * This effectively makes the blocking queue behave like a non-blocking queue which throttles * puts if it reaches its capacity. This was changed in 8.1, see #51400. This function is NOOP * in the HARegionQueue. - * - *

- * author Asif */ @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT") + @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT") void checkQueueSizeConstraint() throws InterruptedException { if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 if (Thread.interrupted()) @@ -2261,8 +2159,8 @@ public class HARegionQueue implements RegionQueue { this.maxQueueSizeHitCount = 0; } ++this.maxQueueSizeHitCount; - // for (;;) { this.region.checkReadiness(); // fix for bug 37581 + // TODO: wait called while holding two locks this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime); this.region.checkReadiness(); // fix for bug 37581 // Fix for #51400. Allow the queue to grow beyond its @@ -2270,15 +2168,13 @@ public class HARegionQueue implements RegionQueue { // drain the queue, either due to a slower client or the // deadlock scenario mentioned in the ticket. reconcilePutPermits(); - // } if ((this.maxQueueSizeHitCount % logFrequency) == 1) { logger.info(LocalizedMessage .create(LocalizedStrings.HARegionQueue_RESUMING_WITH_PROCESSING_PUTS)); } } catch (InterruptedException ex) { - // TODO:Asif: The line below is meaningless. Comment it out - // later - this.permitMon.notify(); + // TODO: The line below is meaningless. Comment it out later + this.permitMon.notifyAll(); throw ex; } } @@ -2292,14 +2188,10 @@ public class HARegionQueue implements RegionQueue { /** * This function should always be called under a lock on putGuard & permitMon obejct - * - *

- * author Asif - * - * @return int currnet Put permits + * + * @return int current Put permits */ private int reconcilePutPermits() { - putPermits += takeSidePutPermits; takeSidePutPermits = 0; return putPermits; @@ -2312,10 +2204,6 @@ public class HARegionQueue implements RegionQueue { * added in case a put operation which has reduced the put permit optmistically but due to some * reason ( most likely because of duplicate event) was not added in the queue. In such case it * will increment take side permit without notifying any waiting thread - * - *

- * author Asif - * */ @Override void incrementTakeSidePutPermitsWithoutNotify() { @@ -2328,24 +2216,19 @@ public class HARegionQueue implements RegionQueue { * Implemented to reduce contention between concurrent take/remove operations and put . The * reconciliation between take side put permits & put side put permits happens only if theput * side put permits are exhausted. In HARehionQueue base class this is a NOOP function - * - *

- * author Asif - * */ @Override void incrementTakeSidePutPermits() { if (this.haContainer instanceof HAContainerMap && isPrimary()) { // Fix for bug 39413 synchronized (this.permitMon) { ++this.takeSidePutPermits; - this.permitMon.notify(); + this.permitMon.notifyAll(); } } } /** * Identical to the acquireReadLock as there is only one type of Lock object in this class. - * */ @Override void acquireWriteLock() { @@ -2378,8 +2261,6 @@ public class HARegionQueue implements RegionQueue { * acquiring the lock on ReentrantLock object. It blocks the thread if the queue is empty or * returns true otherwise . This will always return true indicating that data is available for * retrieval or throw an Exception.It can never return false. - * - * @throws InterruptedException */ @Override boolean waitForData() throws InterruptedException { @@ -2443,7 +2324,7 @@ public class HARegionQueue implements RegionQueue { LinkedList unremovedElements = null; HashMap currDurableMap = null; - protected DurableHARegionQueue(String regionName, GemFireCacheImpl cache, + protected DurableHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, Map haContainer, ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { @@ -2604,11 +2485,11 @@ public class HARegionQueue implements RegionQueue { this.releaseWriteLock(); } } - /** - * ashetkar: Setting this threadlocal variable to null has no use as the current thread never - * uses it. Instead it should really be set null by message dispatcher thread while starting - * or resuming. This was added in revision 20914. Need to check if it really needs to be - * thread local. + /* + * Setting this threadlocal variable to null has no use as the current thread never uses it. + * Instead it should really be set null by message dispatcher thread while starting or + * resuming. This was added in revision 20914. Need to check if it really needs to be thread + * local. */ peekedEventsContext.set(null); this.threadIdToSeqId.list.clear(); @@ -2675,38 +2556,27 @@ public class HARegionQueue implements RegionQueue { * bridge between the user defined HARegionQueue class & the actual class. This class object will * be buggy as it will tend to publish the Object o QRM thread & the expiry thread before the * complete creation of the HARegionQueue instance - * - *

- * author Asif - * */ static class TestOnlyHARegionQueue extends HARegionQueue { /** * Overloaded constructor to accept haContainer. * - * @param regionName - * @param cache - * @param haContainer - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException * @since GemFire 5.7 */ - TestOnlyHARegionQueue(String regionName, Cache cache, Map haContainer) + TestOnlyHARegionQueue(String regionName, InternalCache cache, Map haContainer) throws IOException, ClassNotFoundException, CacheException, InterruptedException { - this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, - haContainer, HandShake.CONFLATION_DEFAULT, false); + this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, haContainer, + HandShake.CONFLATION_DEFAULT, false); this.initialized.set(true); } - TestOnlyHARegionQueue(String regionName, Cache cache) + TestOnlyHARegionQueue(String regionName, InternalCache cache) throws IOException, ClassNotFoundException, CacheException, InterruptedException { - this(regionName, (GemFireCacheImpl) cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, - new HashMap(), HandShake.CONFLATION_DEFAULT, false); + this(regionName, cache, HARegionQueueAttributes.DEFAULT_HARQ_ATTRIBUTES, new HashMap(), + HandShake.CONFLATION_DEFAULT, false); } - TestOnlyHARegionQueue(String regionName, GemFireCacheImpl cache, HARegionQueueAttributes hrqa, + TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa, Map haContainer, final byte clientConflation, boolean isPrimary) throws IOException, ClassNotFoundException, CacheException, InterruptedException { super(regionName, cache, hrqa, haContainer, null, clientConflation, isPrimary); @@ -2718,34 +2588,21 @@ public class HARegionQueue implements RegionQueue { } /** - * Overloaded constructor to pass an HashMap instance as a haContainer. + * Overloaded constructor to pass an {@code HashMap} instance as a haContainer. * - * @param regionName - * @param cache - * @param hrqa - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException * @since GemFire 5.7 */ - TestOnlyHARegionQueue(String regionName, Cache cache, HARegionQueueAttributes hrqa) + TestOnlyHARegionQueue(String regionName, InternalCache cache, HARegionQueueAttributes hrqa) throws IOException, ClassNotFoundException, CacheException, InterruptedException { - this(regionName, (GemFireCacheImpl) cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT, - false); + this(regionName, cache, hrqa, new HashMap(), HandShake.CONFLATION_DEFAULT, false); } } /** * This thread will check for messages which have been dispatched. After a configurable time or - * size is reached, it will create a new QueueRemovalMessage and send it to all the - * nodes in the DistributedSystem - * - *

- * author Mitul Bid - * + * size is reached, it will create a new {@code QueueRemovalMessage} and send it to all the nodes + * in the DistributedSystem */ - private static class QueueRemovalThread extends Thread { /** @@ -2753,14 +2610,14 @@ public class HARegionQueue implements RegionQueue { */ private volatile boolean shutdown = false; - private final GemFireCacheImpl cache; + private final InternalCache cache; /** * Constructor : Creates and initializes the thread */ - public QueueRemovalThread(GemFireCacheImpl c) { + public QueueRemovalThread(InternalCache cache) { this.setDaemon(true); - this.cache = c; + this.cache = cache; } private boolean checkCancelled() { @@ -2775,11 +2632,7 @@ public class HARegionQueue implements RegionQueue { /** * The thread will check the dispatchedMessages map for messages that have been dispatched. It - * will create a new QueueRemovalMessage and send it to the other nodes - */ - /** - * The thread will check the dispatchedMessages map for messages that have been dispatched. It - * will create a new QueueRemovalMessage and send it to the other nodes + * will create a new {@code QueueRemovalMessage} and send it to the other nodes */ @Override public void run() { @@ -2836,7 +2689,7 @@ public class HARegionQueue implements RegionQueue { dm.putOutgoing(qrm); } // messages exist } // be somewhat tolerant of failures - catch (CancelException e) { + catch (CancelException ignore) { if (logger.isDebugEnabled()) { logger.debug("QueueRemovalThread is exiting due to cancellation"); } @@ -2918,14 +2771,14 @@ public class HARegionQueue implements RegionQueue { * threadIdToSequenceIdMap.list.add(internalMap); } } */ // first add the size within the lock - queueRemovalMessageList.add(Integer.valueOf(internalMap.size())); + queueRemovalMessageList.add(internalMap.size()); internalIterator = internalMap.entrySet().iterator(); // then add the event ids to the message list within the lock while (internalIterator.hasNext()) { internalEntry = (Map.Entry) internalIterator.next(); tid = (ThreadIdentifier) internalEntry.getKey(); sequenceId = (Long) internalEntry.getValue(); - eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId.longValue()); + eventId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId); queueRemovalMessageList.add(eventId); } } @@ -2945,7 +2798,7 @@ public class HARegionQueue implements RegionQueue { boolean interrupted = Thread.interrupted(); try { this.join(15 * 1000); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -2960,13 +2813,9 @@ public class HARegionQueue implements RegionQueue { } /** - * Class whick keeps track of the positions ( keys) of underlying Region object for the events + * Class which keeps track of the positions ( keys) of underlying Region object for the events * placed in the Queue. It also keeps track of the last sequence ID dispatched. Thus all the * events with sequence ID less than that dispatched are eligible for removal - * - *

- * author Asif - * */ public static class DispatchedAndCurrentEvents implements DataSerializableFixedID, Serializable { /** @@ -3006,10 +2855,6 @@ public class HARegionQueue implements RegionQueue { /** * Used for debugging purpose to ensure that in no situation , for a given ThreadIdentifier the * order gets violated - * - *

- * author Asif - * */ protected volatile long lastSequenceIDPut = INIT_OF_SEQUENCEID; @@ -3024,15 +2869,9 @@ public class HARegionQueue implements RegionQueue { * @param event Object to be added to the queue * @param sequenceID Sequence ID of the event originating from a unqiue thread identified by its * ThreadIdentifier - * @throws CacheException - * @throws InterruptedException */ protected boolean putObject(Conflatable event, long sequenceID) throws CacheException, InterruptedException { - // logger.debug("BRUCE: putObject() lastSequenceIDPut="+lastSequenceIDPut - // +"; adding sequenceID="+sequenceID + " for " + event); - // logger.info("putObject, sequenceID = " + sequenceID + "; lastSequenceIDPut = " + - // lastSequenceIDPut, new Exception("putObject")); Long oldPosition = null; final boolean isDebugEnabled_BS = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER); if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID @@ -3072,7 +2911,7 @@ public class HARegionQueue implements RegionQueue { } if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) { // Insert the object into the Region - Long position = Long.valueOf(owningQueue.tailKey.incrementAndGet()); + Long position = owningQueue.tailKey.incrementAndGet(); owningQueue.putEventInHARegion(event, position); @@ -3128,15 +2967,11 @@ public class HARegionQueue implements RegionQueue { /** * Destroys the the old entry ( which got replaced by the new entry due to conflation) from the - * availableIDs , Region & Counters set. Since this is executed within a synch block by the new + * availableIDs , Region & Counters set. Since this is executed within a sync block by the new * entry thread, it is guaranteed that the old entry thread will exit first , placing the - * poistion etc in the available IDs set. Also the new entry thraed & old entry thread are - * belonging to diffrenet ThreadIdentifier objects & hence hold different + * position etc in the available IDs set. Also the new entry thread & old entry thread are + * belonging to different ThreadIdentifier objects & hence hold different * DispatchedAndCurrentEvents object. - * - * @param oldPosition - * @throws CacheException - * @throws InterruptedException */ private void removeOldConflatedEntry(Long oldPosition) throws CacheException, InterruptedException { @@ -3152,9 +2987,8 @@ public class HARegionQueue implements RegionQueue { } // // update statistics - // if (logger.isDebugEnabled()) { - // vrao: Fix for bug 39291: + // Fix for bug 39291: // Since markers are always conflated regardless of the conflation // setting and they are not normal (are internal) events, we should // not bump the events-conflated stat for markers. @@ -3163,7 +2997,6 @@ public class HARegionQueue implements RegionQueue { } else { owningQueue.stats.incMarkerEventsConflated(); } - // } } } } @@ -3184,13 +3017,10 @@ public class HARegionQueue implements RegionQueue { ConcurrentMap conflationMap = (ConcurrentMap) owningQueue.indexes.get(rName); Assert.assertTrue(conflationMap != null); conflationMap.remove(key, position); - } /** * Removes the Entry from the Counters Set contained in DACE - * - * @param position */ protected synchronized void destroy(Long position) { if (this.counters != null) { @@ -3227,17 +3057,17 @@ public class HARegionQueue implements RegionQueue { owningQueue.eventsMap.remove(ti); expired = true; this.owningQueue.getStatistics().decThreadIdentifiers(); - } catch (RegionDestroyedException ignore) { + } catch (RegionDestroyedException e) { if (!owningQueue.destroyInProgress && logger.isDebugEnabled()) { logger.debug( "DispatchedAndCurrentEvents::expireOrUpdate: Queue found destroyed while removing expiry entry for ThreadIdentifier={} and expiry value={}", - ti, expVal, ignore); + ti, expVal, e); } } catch (EntryNotFoundException enfe) { if (!owningQueue.destroyInProgress) { logger.error(LocalizedMessage.create( LocalizedStrings.HARegionQueue_DISPATCHEDANDCURRENTEVENTSEXPIREORUPDATE_UNEXPECTEDLY_ENCOUNTERED_EXCEPTION_WHILE_REMOVING_EXPIRY_ENTRY_FOR_THREADIDENTIFIER_0_AND_EXPIRY_VALUE_1, - new Object[] {ti, Long.valueOf(expVal), enfe})); + new Object[] {ti, expVal, enfe})); } } } @@ -3245,7 +3075,7 @@ public class HARegionQueue implements RegionQueue { if (!expired) { try { // Update the entry with latest sequence ID - owningQueue.region.put(ti, Long.valueOf(this.lastDispatchedSequenceId)); + owningQueue.region.put(ti, this.lastDispatchedSequenceId); } catch (CancelException e) { throw e; } catch (Exception e) { @@ -3267,8 +3097,6 @@ public class HARegionQueue implements RegionQueue { * be removed * * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE - * @throws CacheException - * @throws InterruptedException */ protected void setLastDispatchedIDAndRemoveEvents(long lastDispatchedSeqId) throws CacheException, InterruptedException { @@ -3341,8 +3169,6 @@ public class HARegionQueue implements RegionQueue { * Events which have been peeked & are now candidate for removal. It has to be guaranteed * that the sequence IDs of all the other counters is less than the last dispatched * @param lastDispatchedSeqId long indicating the last dispatched ID which gets set in a DACE - * @throws CacheException - * @throws InterruptedException */ protected void setLastDispatchedIDAndRemoveEvents(List removedEventInfoList, long lastDispatchedSeqId) throws CacheException, InterruptedException { @@ -3472,8 +3298,6 @@ public class HARegionQueue implements RegionQueue { /** * destroys the underlying HARegion and removes its reference from the dispatched messages map - * - * @throws CacheWriterException */ public void destroy() throws CacheWriterException { this.destroyInProgress = true; @@ -3484,9 +3308,9 @@ public class HARegionQueue implements RegionQueue { try { try { updateHAContainer(); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // keep going - } catch (CancelException e) { + } catch (CancelException ignore) { // keep going if (logger.isDebugEnabled()) { logger.debug("HARegionQueue#destroy: ignored cancellation!!!!"); @@ -3495,9 +3319,9 @@ public class HARegionQueue implements RegionQueue { try { this.region.destroyRegion(); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // keep going - } catch (CancelException e) { + } catch (CancelException ignore) { // keep going } ((HAContainerWrapper) haContainer).removeProxy(regionName); @@ -3510,8 +3334,6 @@ public class HARegionQueue implements RegionQueue { * If the event is an instance of HAEventWrapper, put it into the haContainer and then into the ha * region. Otherwise, simply put it into the ha region. * - * @param event - * @param position * @since GemFire 5.7 */ protected void putEventInHARegion(Conflatable event, Long position) { @@ -3623,7 +3445,7 @@ public class HARegionQueue implements RegionQueue { * If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its * clientUpdateMessage into the haContainer as . * - * @param haEventWrapper An instance of HAEventWrapper + * @param haEventWrapper An instance of {@code HAEventWrapper} * @since GemFire 5.7 */ protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) { @@ -3690,7 +3512,7 @@ public class HARegionQueue implements RegionQueue { Object[] wrapperArray = null; acquireReadLock(); try { - if (!(this.availableIDsSize() == 0)) { + if (this.availableIDsSize() != 0) { wrapperArray = this.availableIDsArray(); } } finally { @@ -3714,7 +3536,7 @@ public class HARegionQueue implements RegionQueue { HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable); } } - } catch (CancelException e) { + } catch (CancelException ignore) { return; // we're done } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -3751,10 +3573,9 @@ public class HARegionQueue implements RegionQueue { * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present * in the haContainer, then decrements its reference count. If the decremented ref count is zero * and put is not in progress, removes the entry from the haContainer, before returning the - * ClientUpdateMessage instance. + * {@code ClientUpdateMessage} instance. * - * @param conflatable - * @return An instance of ClientUpdateMessage + * @return An instance of {@code ClientUpdateMessage} * @since GemFire 5.7 */ public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) { @@ -3778,7 +3599,6 @@ public class HARegionQueue implements RegionQueue { * Decrements wrapper's reference count by one. If the decremented ref count is zero and put is * not in progress, removes the entry from the haContainer. * - * @param wrapper * @since GemFire 5.7 */ public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) { @@ -3813,7 +3633,7 @@ public class HARegionQueue implements RegionQueue { /** * Set whether the dispatcher of this node is active or not (i.e. primary or secondary node). If - * flag is set to true, disables Entry Expiry Tasks. + * {@code flag} is set to {@code true}, disables Entry Expiry Tasks. * * @param flag the value to set isPrimary to */ @@ -3830,7 +3650,7 @@ public class HARegionQueue implements RegionQueue { } /** - * Disables EntryExpiryTask for the HARegion (this.region). + * Disables EntryExpiryTask for the HARegion ({@code this.region}). * */ private void disableEntryExpiryTasks() { @@ -3842,7 +3662,7 @@ public class HARegionQueue implements RegionQueue { this.region.setCustomEntryTimeToLive(new ThreadIdentifierCustomExpiry()); logger.info(LocalizedMessage.create( LocalizedStrings.HARegionQueue_ENYTRY_EXPIRY_TASKS_DISABLED_BECAUSE_QUEUE_BECAME_PRIMARY_OLD_MSG_TTL_0, - new Object[] {Integer.valueOf(oldTimeToLive)})); + new Object[] {oldTimeToLive})); } } @@ -3882,7 +3702,7 @@ public class HARegionQueue implements RegionQueue { if (r != null && !r.isDestroyed()) { try { r.close(); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { } } } @@ -3895,97 +3715,83 @@ public class HARegionQueue implements RegionQueue { */ public boolean isPeekInitialized() { return HARegionQueue.peekedEventsContext.get() != null; - } -} - - -/** - * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is operating - * on it. This wrapper acts as a means of communication between the QRM thread & the MapWrapper - * object contained in the HARegionQueue - * - *

- * author ashahid - */ - -class MapWrapper { - Map map; + /** + * A wrapper class whose underlying map gets replaced with a fresh one when QRM thread is + * operating on it. This wrapper acts as a means of communication between the QRM thread & the + * MapWrapper object contained in the HARegionQueue + */ + static class MapWrapper { + Map map; - List list; + List list; - boolean keepPrevAcks = false; + boolean keepPrevAcks = false; - public MapWrapper() { - super(); - map = new HashMap(); - list = new LinkedList(); - } + public MapWrapper() { + super(); + map = new HashMap(); + list = new LinkedList(); + } - void put(Object key, Object o) { - synchronized (this.map) { - this.map.put(key, o); + void put(Object key, Object o) { + synchronized (this.map) { + this.map.put(key, o); + } } } -} - -/** - * A wrapper class that has counter, key and the region-name for an event which was peeked and needs - * to be removed. The key and regionName fields will be set only if conflation is true for the - * event. - * - *

- * author dpatel - * - */ - -class RemovedEventInfo { - Long counter; + /** + * A wrapper class that has counter, key and the region-name for an event which was peeked and + * needs to be removed. The key and regionName fields will be set only if conflation is true for + * the event. + */ + static class RemovedEventInfo { + Long counter; - String regionName; + String regionName; - Object key; + Object key; - public RemovedEventInfo(Long counter, String regionName, Object key) { - this.counter = counter; - this.regionName = regionName; - this.key = key; + public RemovedEventInfo(Long counter, String regionName, Object key) { + this.counter = counter; + this.regionName = regionName; + this.key = key; + } } -} + /** this is used to expire thread identifiers, even in primary queues */ + static class ThreadIdentifierCustomExpiry implements CustomExpiry { + private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes( + HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE); + private static volatile ExpirationAttributes testExpAtts = null; -/** this is used to expire thread identifiers, even in primary queues */ -class ThreadIdentifierCustomExpiry implements CustomExpiry { - private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes( - HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE); - private static volatile ExpirationAttributes testExpAtts = null; - - public ExpirationAttributes getExpiry(Region.Entry entry) { - // Use key to determine expiration. - Object key = entry.getKey(); - if (key instanceof ThreadIdentifier) { - final int expTime = HARegionQueue.threadIdExpiryTime; - if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) { - // This should only happen in unit test code - ExpirationAttributes result = testExpAtts; - if (result == null || result.getTimeout() != expTime) { - result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE); - // save the expiration attributes in a static to prevent tests from creating lots of - // instances. - testExpAtts = result; + public ExpirationAttributes getExpiry(Region.Entry entry) { + // Use key to determine expiration. + Object key = entry.getKey(); + if (key instanceof ThreadIdentifier) { + final int expTime = HARegionQueue.threadIdExpiryTime; + if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) { + // This should only happen in unit test code + ExpirationAttributes result = testExpAtts; + if (result == null || result.getTimeout() != expTime) { + result = new ExpirationAttributes(expTime, ExpirationAction.LOCAL_INVALIDATE); + // save the expiration attributes in a static to prevent tests from creating lots of + // instances. + testExpAtts = result; + } + return result; + } else { + return DEFAULT_THREAD_ID_EXP_ATTS; } - return result; } else { - return DEFAULT_THREAD_ID_EXP_ATTS; + return null; } - } else { - return null; } - } - public void close() {} + public void close() {} + } }