Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4EE6118B45 for ; Tue, 23 Feb 2016 20:25:50 +0000 (UTC) Received: (qmail 80665 invoked by uid 500); 23 Feb 2016 20:25:50 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 80629 invoked by uid 500); 23 Feb 2016 20:25:50 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 80620 invoked by uid 99); 23 Feb 2016 20:25:50 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 20:25:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A03211A0287 for ; Tue, 23 Feb 2016 20:25:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.349 X-Spam-Level: X-Spam-Status: No, score=-4.349 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id PDjqDjPKUcY7 for ; Tue, 23 Feb 2016 20:25:35 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 015DF5FB0F for ; Tue, 23 Feb 2016 20:25:30 +0000 (UTC) Received: (qmail 65633 invoked by uid 99); 23 Feb 2016 20:22:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Feb 2016 20:22:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 521B6E8E83; Tue, 23 Feb 2016 20:22:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Tue, 23 Feb 2016 20:24:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [71/94] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java index 0000000,69ae6d8..f650fee mode 000000,100755..100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java @@@ -1,0 -1,3112 +1,3116 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package com.gemstone.gemfire.internal.cache.tier.sockets; + + import java.io.ByteArrayInputStream; + import java.io.DataInputStream; + import java.io.IOException; + import java.net.Socket; + import java.net.SocketException; + import java.nio.ByteBuffer; + import java.util.Arrays; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentLinkedQueue; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + import java.util.regex.Pattern; + + import org.apache.logging.log4j.Logger; + + import com.gemstone.gemfire.CancelException; + import com.gemstone.gemfire.DataSerializer; + import com.gemstone.gemfire.StatisticsFactory; + import com.gemstone.gemfire.cache.Cache; + import com.gemstone.gemfire.cache.CacheClosedException; + import com.gemstone.gemfire.cache.CacheException; + import com.gemstone.gemfire.cache.ClientSession; + import com.gemstone.gemfire.cache.DynamicRegionFactory; + import com.gemstone.gemfire.cache.InterestRegistrationEvent; + import com.gemstone.gemfire.cache.InterestResultPolicy; + import com.gemstone.gemfire.cache.Region; + import com.gemstone.gemfire.cache.RegionDestroyedException; + import com.gemstone.gemfire.cache.RegionExistsException; + import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker; + import com.gemstone.gemfire.cache.operations.DestroyOperationContext; + import com.gemstone.gemfire.cache.operations.InvalidateOperationContext; + import com.gemstone.gemfire.cache.operations.OperationContext; + import com.gemstone.gemfire.cache.operations.PutOperationContext; + import com.gemstone.gemfire.cache.operations.RegionClearOperationContext; + import com.gemstone.gemfire.cache.operations.RegionCreateOperationContext; + import com.gemstone.gemfire.cache.operations.RegionDestroyOperationContext; + import com.gemstone.gemfire.cache.query.CqException; + import com.gemstone.gemfire.cache.query.CqQuery; + import com.gemstone.gemfire.cache.query.internal.cq.CqService; + import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; + import com.gemstone.gemfire.distributed.DistributedMember; + import com.gemstone.gemfire.distributed.internal.DistributionManager; + import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; + import com.gemstone.gemfire.internal.SystemTimer; + import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask; + import com.gemstone.gemfire.internal.Version; + import com.gemstone.gemfire.internal.cache.ClientServerObserver; + import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder; + import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisee; + import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; + import com.gemstone.gemfire.internal.cache.Conflatable; + import com.gemstone.gemfire.internal.cache.DistributedRegion; + import com.gemstone.gemfire.internal.cache.EnumListenerEvent; + import com.gemstone.gemfire.internal.cache.EventID; + import com.gemstone.gemfire.internal.cache.FilterProfile; + import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; + import com.gemstone.gemfire.internal.cache.InterestRegistrationEventImpl; + import com.gemstone.gemfire.internal.cache.LocalRegion; + import com.gemstone.gemfire.internal.cache.PartitionedRegion; + import com.gemstone.gemfire.internal.cache.StateFlushOperation; + import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper; + import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; + import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes; + import com.gemstone.gemfire.internal.cache.ha.HARegionQueueStats; + import com.gemstone.gemfire.internal.cache.tier.InterestType; + import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp; + import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70; + import com.gemstone.gemfire.internal.cache.versions.VersionTag; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.logging.LogService; + import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; + import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + import com.gemstone.gemfire.internal.logging.log4j.LogMarker; + import com.gemstone.gemfire.internal.security.AuthorizeRequestPP; + import com.gemstone.gemfire.security.AccessControl; + import com.gemstone.gemfire.i18n.StringId; + + /** + * Class CacheClientProxy represents the server side of the + * {@link CacheClientUpdater}. It queues messages to be sent from the server to + * the client. It then reads those messages from the queue and sends them to the + * client. + * + * @author Barry Oglesby + * + * @since 4.2 + */ + @SuppressWarnings("synthetic-access") + public class CacheClientProxy implements ClientSession { + private static final Logger logger = LogService.getLogger(); + + /** + * The socket between the server and the client + */ + protected Socket _socket; + + private final AtomicBoolean _socketClosed = new AtomicBoolean(); + + /** + * A communication buffer used by each message we send to the client + */ + protected ByteBuffer _commBuffer; + + /** + * The remote host's IP address string (cached for convenience) + */ + protected String _remoteHostAddress; + + /** + * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock} + */ + protected boolean isMarkedForRemoval = false; + + /** + * @see #isMarkedForRemoval + */ + protected final Object isMarkedForRemovalLock = new Object(); + + /** + * The proxy id of the client represented by this proxy + */ + protected ClientProxyMembershipID proxyID; + + /** + * The GemFire cache + */ + protected final GemFireCacheImpl _cache; + + /** + * The list of keys that the client represented by this proxy is interested in + * (stored by region) + */ + protected final ClientInterestList[] cils = new ClientInterestList[2]; + + /** + * A thread that dispatches messages to the client + */ + protected volatile MessageDispatcher _messageDispatcher; + + /** + * The statistics for this proxy + */ + protected final CacheClientProxyStats _statistics; + + protected final AtomicReference _durableExpirationTask = new AtomicReference(); + + protected SystemTimer durableTimer; + + /** + * Whether this dispatcher is paused + */ + protected volatile boolean _isPaused = true; + + /** + * True if we are connected to a client. + */ + private volatile boolean connected = false; + // /** + // * A string representing interest in all keys + // */ + // protected static final String ALL_KEYS = "ALL_KEYS"; + // + /** + * True if a marker message is still in the ha queue. + */ + private boolean markerEnqueued = false; + + /** + * The number of times to peek on shutdown before giving up and shutting down + */ + protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer.getInteger("gemfire.MAXIMUM_SHUTDOWN_PEEKS",50).intValue(); + + /** + * The number of milliseconds to wait for an offering to the message queue + */ + protected static final int MESSAGE_OFFER_TIME = 0; + + /** + * The default maximum message queue size + */ + // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000; + + /** The message queue size */ + protected final int _maximumMessageCount; + + /** + * The time (in seconds ) after which a message in the client queue will + * expire. + */ + protected final int _messageTimeToLive; + + /** + * The CacheClientNotifier registering this proxy. + */ + protected final CacheClientNotifier _cacheClientNotifier; + + /** + * Defaults to true; meaning do some logging of dropped client notification + * messages. Set the system property to true to cause dropped messages to NOT + * be logged. + */ + protected static final boolean LOG_DROPPED_MSGS = !Boolean + .getBoolean("gemfire.disableNotificationWarnings"); + + /** + * for testing purposes, delays the start of the dispatcher thread + */ + public static boolean isSlowStartForTesting = false; + + /** + * Default value for slow starting time of dispatcher + */ + private static final long DEFAULT_SLOW_STARTING_TIME = 5000; + + /** + * Key in the system property from which the slow starting time value will be + * retrieved + */ + private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting"; + + private boolean isPrimary; + + /** @since 5.7 */ + protected byte clientConflation = HandShake.CONFLATION_DEFAULT; + + /** + * Flag to indicate whether to keep a durable client's queue alive + */ + boolean keepalive = false; + + private AccessControl postAuthzCallback; + + /** + * For multiuser environment.. + */ + private ClientUserAuths clientUserAuths; + + private final Object clientUserAuthsLock = new Object(); + + /** + * The version of the client + */ + private Version clientVersion; + + /** + * A map of region name as key and integer as its value. Basically, it stores + * the names of the regions with DataPolicy as EMPTY. If an + * event's region name is present in this map, it's full value (and not + * delta) is sent to the client represented by this proxy. + * + * @since 6.1 + */ + private volatile Map regionsWithEmptyDataPolicy = new HashMap(); + + /** + * A debug flag used for testing Backward compatibility + */ + public static boolean AFTER_MESSAGE_CREATION_FLAG = false; + + /** + * Notify the region when a client interest registration occurs. This tells + * the region to update access time when an update is to be pushed to a + * client. It is enabled only for PartitionedRegions + * currently. + */ + protected static final boolean NOTIFY_REGION_ON_INTEREST = Boolean + .getBoolean("gemfire.updateAccessTimeOnClientInterest"); + + /** + * The AcceptorImpl identifier to which the proxy is connected. + */ + private final long _acceptorId; + + /** acceptor's setting for notifyBySubscription */ + private final boolean notifyBySubscription; + + /** To queue the events arriving during message dispatcher initialization */ + private volatile ConcurrentLinkedQueue queuedEvents = new ConcurrentLinkedQueue(); + + private final Object queuedEventsSync = new Object(); + + private volatile boolean messageDispatcherInit = false; + + /** + * A counter that keeps track of how many task iterations that have occurred + * since the last ping or message. The + * {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} + * increments it. Normal messages sent to the client reset it. If the counter + * reaches 3, a ping is sent. + */ + private final AtomicInteger pingCounter = new AtomicInteger(); + + + /** Date on which this instances was created */ + private Date creationDate; + + /** true when the durable client associated with this proxy is being + * restarted and prevents cqs from being closed and drained**/ + private boolean drainLocked = false; + private final Object drainLock = new Object(); + + /** number of cq drains that are currently in progress **/ + private int numDrainsInProgress = 0; + private final Object drainsInProgressLock = new Object(); + + /** + * Constructor. + * + * @param ccn + * The CacheClientNotifier registering this proxy + * @param socket + * The socket between the server and the client + * @param proxyID + * representing the Connection Proxy of the clien + * @param isPrimary + * The boolean stating whether this prozxy is primary + * @throws CacheException { + */ + protected CacheClientProxy(CacheClientNotifier ccn, Socket socket, + ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation, + Version clientVersion, long acceptorId, boolean notifyBySubscription) + throws CacheException { + initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion); + this._cacheClientNotifier = ccn; + this._cache = (GemFireCacheImpl)ccn.getCache(); + this._maximumMessageCount = ccn.getMaximumMessageCount(); + this._messageTimeToLive = ccn.getMessageTimeToLive(); + this._acceptorId = acceptorId; + this.notifyBySubscription = notifyBySubscription; + StatisticsFactory factory = this._cache.getDistributedSystem(); + this._statistics = new CacheClientProxyStats(factory, + "id_"+this.proxyID.getDistributedMember().getId()+ "_at_"+ this._remoteHostAddress + ":" + this._socket.getPort()); + + // Create the interest list + this.cils[RegisterInterestTracker.interestListIndex] = + new ClientInterestList(this, this.proxyID); + // Create the durable interest list + this.cils[RegisterInterestTracker.durableInterestListIndex] = + new ClientInterestList(this, this.getDurableId()); + this.postAuthzCallback = null; + this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections(); + this.creationDate = new Date(); + initializeClientAuths(); + } + + private void initializeClientAuths() + { + if(AcceptorImpl.isPostAuthzCallbackPresent()) + this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID); + } + + private void reinitializeClientAuths() + { + if (this.clientUserAuths != null && AcceptorImpl.isPostAuthzCallbackPresent()) { + synchronized (this.clientUserAuthsLock) { + ClientUserAuths newClientAuth = ServerConnection.getClientUserAuths(this.proxyID); + newClientAuth.fillPreviousCQAuth(this.clientUserAuths); + this.clientUserAuths = newClientAuth; + } + } + } + + public void setPostAuthzCallback(AccessControl authzCallback) { + //TODO:hitesh synchronization + synchronized (this.clientUserAuthsLock) { + if (this.postAuthzCallback != null) + this.postAuthzCallback.close(); + this.postAuthzCallback = authzCallback; + } + } + + public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable) + { + if(postAuthzCallback == null) //only for multiuser + { + if(this.clientUserAuths != null) + this.clientUserAuths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable); + } + } + + private void initializeTransientFields(Socket socket, + ClientProxyMembershipID pid, boolean ip, byte cc, Version vers) { + this._socket = socket; + this.proxyID = pid; + this.connected = true; + { + int bufSize = 1024; + try { + bufSize = _socket.getSendBufferSize(); + if (bufSize < 1024) { + bufSize = 1024; + } + } catch (SocketException ignore) { + } + this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket); + } + this._remoteHostAddress = socket.getInetAddress().getHostAddress(); + this.isPrimary = ip; + this.clientConflation = cc; + this.clientVersion = vers; + } + + public boolean isMarkerEnqueued() { + return markerEnqueued; + } + + public void setMarkerEnqueued(boolean bool) { + markerEnqueued = bool; + } + + public long getAcceptorId(){ + return this._acceptorId; + } + + /** + * @return the notifyBySubscription + */ + public boolean isNotifyBySubscription() { + return this.notifyBySubscription; + } + + + /** + * Returns the DistributedMember represented by this proxy + */ + public ClientProxyMembershipID getProxyID() + { + return this.proxyID; + } + + // the following code was commented out simply because it was not used + // /** + // * Determines if the proxy represents the client host (and only the host, not + // * necessarily the exact VM running on the host) + // * + // * @return Whether the proxy represents the client host + // */ + // protected boolean representsClientHost(String clientHost) + // { + // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings + // return this._remoteHostAddress.equals(clientHost); + // } + + // protected boolean representsClientVM(DistributedMember remoteMember) + // { + // // logger.warn("Is input port " + clientPort + " contained in " + + // // logger.warn("Does input host " + clientHost + " equal " + + // // this._remoteHostAddress+ ": " + representsClientHost(clientHost)); + // // logger.warn("representsClientVM: " + + // // (representsClientHost(clientHost) && containsPort(clientPort))); + // return (proxyID.getDistributedMember().equals(remoteMember)); + // } + + // /** + // * Determines if the CacheClientUpdater proxied by this instance is listening + // * on the input clientHost and clientPort + // * + // * @param clientHost + // * The host name of the client to compare + // * @param clientPort + // * The port number of the client to compare + // * + // * @return Whether the CacheClientUpdater proxied by this instance is + // * listening on the input clientHost and clientPort + // */ + // protected boolean representsCacheClientUpdater(String clientHost, + // int clientPort) + // { + // return (clientPort == this._socket.getPort() && representsClientHost(clientHost)); + // } + + protected boolean isMember(ClientProxyMembershipID memberId) + { + return this.proxyID.equals(memberId); + } + + protected boolean isSameDSMember(ClientProxyMembershipID memberId) + { + return this.proxyID.isSameDSMember(memberId); + } + + /** + * Set the queue keepalive option + * + * @param option whether to keep the durable client's queue alive + */ + protected void setKeepAlive(boolean option) { + this.keepalive = option; + } + + /** + * Returns the socket between the server and the client + * + * @return the socket between the server and the client + */ + protected Socket getSocket() + { + return this._socket; + } + + public String getSocketHost() + { + return this._socket.getInetAddress().getHostAddress(); + } + + protected ByteBuffer getCommBuffer() { + return this._commBuffer; + } + + /** + * Returns the remote host's IP address string + * + * @return the remote host's IP address string + */ + protected String getRemoteHostAddress() + { + return this._remoteHostAddress; + } + + /** + * Returns the remote host's port + * + * @return the remote host's port + */ + public int getRemotePort() + { + return this._socket.getPort(); + } + + /** + * Returns whether the proxy is connected to a remote client + * + * @return whether the proxy is connected to a remote client + */ + public boolean isConnected() { + return this.connected; + } + + /** + * Mark the receiver as needing removal + * @return true if it was already marked for removal + */ + protected boolean startRemoval() { + boolean result; + synchronized (this.isMarkedForRemovalLock) { + result = this.isMarkedForRemoval; + this.isMarkedForRemoval = true; + } + return result; + } + + /** + * Wait until the receiver's removal has completed before + * returning. + * @return true if the proxy was initially marked for removal + */ + protected boolean waitRemoval() { + boolean result; + synchronized (this.isMarkedForRemovalLock) { + result = this.isMarkedForRemoval; + boolean interrupted = false; + try { + while (this.isMarkedForRemoval) { + if (logger.isDebugEnabled()) { + logger.debug("Waiting for CacheClientProxy removal: {}", this); + } + try { + this.isMarkedForRemovalLock.wait(); + } + catch (InterruptedException e) { + interrupted = true; + this._cache.getCancelCriterion().checkCancelInProgress(e); + } + } // while + } + finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } // synchronized + return result; + } + + /** + * Indicate that removal has completed on this instance + */ + protected void notifyRemoval() { + synchronized (this.isMarkedForRemovalLock) { + this.isMarkedForRemoval = false; + this.isMarkedForRemovalLock.notifyAll(); + } + } + + /** + * Returns the GemFire cache + * + * @return the GemFire cache + */ + public GemFireCacheImpl getCache() + { + return this._cache; + } + + public Set getInterestRegisteredRegions() { + HashSet regions = new HashSet(); + for(int i=0; i < this.cils.length; i++){ + if (!this.cils[i].regions.isEmpty()) { + regions.addAll(this.cils[i].regions); + } + } + return regions; + } + + /** + * Returns the proxy's statistics + * + * @return the proxy's statistics + */ + public CacheClientProxyStats getStatistics() + { + return this._statistics; + } + + /** + * Returns this proxy's CacheClientNotifier. + * @return this proxy's CacheClientNotifier + */ + protected CacheClientNotifier getCacheClientNotifier() { + return this._cacheClientNotifier; + } + + /** + * Returns the size of the queue for heuristic purposes. This + * size may be changing concurrently if puts/gets are occurring + * at the same time. + */ + public int getQueueSize() { + return this._messageDispatcher == null ? 0 + : this._messageDispatcher.getQueueSize(); + } + + /** + * returns the queue size calculated through stats + */ + public int getQueueSizeStat() { + return this._messageDispatcher == null ? 0 + : this._messageDispatcher.getQueueSizeStat(); + } + + + public boolean drainInProgress() { + synchronized(drainsInProgressLock) { + return numDrainsInProgress > 0; + } + } + + //Called from CacheClientNotifier when attempting to restart paused proxy + //locking the drain lock requires that no drains are in progress + //when the lock was acquired. + public boolean lockDrain() { + synchronized(drainsInProgressLock) { + if (!drainInProgress()) { + synchronized(drainLock) { + if (testHook != null) { + testHook.doTestHook("PRE_ACQUIRE_DRAIN_LOCK_UNDER_SYNC"); + } + //prevent multiple lockings of drain lock + if (!drainLocked) { + drainLocked = true; + return true; + } + } + } + } + return false; + } + + //Called from CacheClientNotifier when completed restart of proxy + public void unlockDrain() { + if (testHook != null) { + testHook.doTestHook("PRE_RELEASE_DRAIN_LOCK"); + } + synchronized(drainLock) { + drainLocked = false; + } + } + + //Only close the client cq if it is paused and no one is attempting to restart the proxy + public boolean closeClientCq(String clientCQName) throws CqException { + if (testHook != null) { + testHook.doTestHook("PRE_DRAIN_IN_PROGRESS"); + } + synchronized(drainsInProgressLock) { + numDrainsInProgress ++; + } + if (testHook != null) { + testHook.doTestHook("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK"); + } + try { + //If the drain lock was acquired, the other thread did so before we could bump up + //the numDrainsInProgress. That means we need to stop. + if (drainLocked) { + // someone is trying to restart a paused proxy + String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId()); + logger.info(msg); + throw new CqException(msg); + } + //isConnected is to protect against the case where a durable client has reconnected + //but has not yet sent a ready for events message + //we can probably remove the isPaused check + if (isPaused() && !isConnected()) { + CqService cqService = getCache().getCqService(); + if (cqService != null) { + InternalCqQuery cqToClose = cqService.getCq(cqService.constructServerCqName( + clientCQName, this.proxyID)); + // close and drain + if (cqToClose != null) { + cqService.closeCq(clientCQName, this.proxyID); + this._messageDispatcher.drainClientCqEvents(this.proxyID, cqToClose); + } + else { + String msg = LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0.toLocalizedString(clientCQName); + logger.info(msg); + throw new CqException(msg); + } + } + } else { + String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_ACTIVE_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId()); + logger.info(msg); + throw new CqException(msg); + } + } finally { + synchronized (drainsInProgressLock) { + numDrainsInProgress--; + } + if (testHook != null) { + testHook.doTestHook("DRAIN_COMPLETE"); + } + + } + return true; + } + + + /** + * Returns whether the proxy is alive. It is alive if its message dispatcher + * is processing messages. + * + * @return whether the proxy is alive + */ + protected boolean isAlive() + { + if (this._messageDispatcher == null) { + return false; + } + return !this._messageDispatcher.isStopped(); + } + + /** + * Returns whether the proxy is paused. It is paused if its message dispatcher + * is paused. This only applies to durable clients. + * + * @return whether the proxy is paused + * + * @since 5.5 + */ + protected boolean isPaused() { + return this._isPaused; + } + + protected void setPaused(boolean isPaused) { + this._isPaused = isPaused; + } + + /** + * Closes the proxy. This method checks the message queue for any unprocessed + * messages and processes them for MAXIMUM_SHUTDOWN_PEEKS. + * + * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS + */ + protected void close() + { + close(true, false); + } + + /** + * Set to true once this proxy starts being closed. + * Remains true for the rest of its existence. + */ + private final AtomicBoolean closing = new AtomicBoolean(false); + + /** + * Close the CacheClientProxy. + * + * @param checkQueue + * Whether to message check the queue and process any contained + * messages (up to MAXIMUM_SHUTDOWN_PEEKS). + * @param stoppedNormally + * Whether client stopped normally + * + * @return whether to keep this CacheClientProxy + * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS + */ + protected boolean close(boolean checkQueue, boolean stoppedNormally) { + boolean pauseDurable = false; + // If the client is durable and either (a) it hasn't stopped normally or (b) it + // has stopped normally but it is configured to be kept alive, set pauseDurable + // to true + if (isDurable() + && (!stoppedNormally || (getDurableKeepAlive() && stoppedNormally))) { + pauseDurable = true; + } + + boolean keepProxy = false; + if (pauseDurable) { + pauseDispatching(); + keepProxy = true; + } else { + terminateDispatching(checkQueue); + closeTransientFields(); + } + + this.connected = false; + + // Close the Authorization callback (if any) + try { + if (!pauseDurable) { + if (this.postAuthzCallback != null) {//for single user + this.postAuthzCallback.close(); + this.postAuthzCallback = null; + }else if(this.clientUserAuths != null) {//for multiple users + this.clientUserAuths.cleanup(true); + this.clientUserAuths = null; + } + } + } + catch (Exception ex) { + if (this._cache.getSecurityLoggerI18n().warningEnabled()) { + this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, new Object[] {this, ex}); + } + } + // Notify the caller whether to keep this proxy. If the proxy is durable + // and should be paused, then return true; otherwise return false. + return keepProxy; + } + + protected void pauseDispatching() { + if (this._messageDispatcher == null){ + return; + } + + // If this is the primary, pause the dispatcher (which closes its transient + // fields. Otherwise, just close the transient fields. + if (logger.isDebugEnabled()) { + logger.debug("{}: Pausing processing", this); + } + //BUGFIX for BUG#38234 + if(!testAndSetPaused(true) && this.isPrimary) { + if (this._messageDispatcher != Thread.currentThread()) { + // don't interrupt ourself to fix bug 40611 + this._messageDispatcher.interrupt(); + } + } + + try { + // Close transient fields + closeTransientFields(); + } finally { + // make sure this gets called if closeTransientFields throws; see bug 40611 + // Start timer + scheduleDurableExpirationTask(); + } + } + + private boolean testAndSetPaused(boolean newValue) { + + synchronized(this._messageDispatcher._pausedLock) { + if (this._isPaused != newValue) { + this._isPaused = newValue; + this._messageDispatcher._pausedLock.notifyAll(); + return !this._isPaused; + } + else { + this._messageDispatcher._pausedLock.notifyAll(); + return this._isPaused; + } + } + } + protected void terminateDispatching(boolean checkQueue) { + if (this._messageDispatcher == null){ + return; + } + + try { + if (logger.isDebugEnabled()) { + logger.debug("{}: Terminating processing", this); + } + if (this._messageDispatcher == Thread.currentThread()) { + // I'm not even sure this is possible but if the dispatcher + // calls us then at least call stopDispatching + // the old code did this (I'm not even sure it is safe to do). + // This needs to be done without testing OR setting "closing". + this._messageDispatcher.stopDispatching(checkQueue); + this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); + this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList(); + // VJR: bug 37487 fix + destroyRQ(); + return; + } + + if (!this.closing.compareAndSet(false, true)) { + // must already be closing so just return + // this is part of the fix for 37684 + return; + } + // Unregister interest in all interests (if necessary) + this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); + this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList(); + + // If the message dispatcher is paused, unpause it. The next bit of + // code will interrupt the waiter. + if (this.testAndSetPaused(false)) { + if (logger.isDebugEnabled()) { + logger.debug("{}: Paused but terminating processing", this); + } + // Cancel the expiration task + cancelDurableExpirationTask(false); + } + + boolean alreadyDestroyed = false; + boolean gotInterrupt = Thread.interrupted(); // clears the flag + try { + // Stop the message dispatcher + this._messageDispatcher.stopDispatching(checkQueue); + + gotInterrupt |= Thread.interrupted(); // clears the flag + + // to fix bug 37684 + // 1. check to see if dispatcher is still alive + if (this._messageDispatcher.isAlive()) { + closeSocket(); + destroyRQ(); + alreadyDestroyed = true; + this._messageDispatcher.interrupt(); + if (this._messageDispatcher.isAlive()) { + try { + this._messageDispatcher.join(1000); + } catch (InterruptedException ex) { + gotInterrupt = true; + } + // if it is still alive then warn and move on + if (this._messageDispatcher.isAlive()) { + //com.gemstone.gemfire.internal.OSProcess.printStacks(com.gemstone.gemfire.internal.OSProcess.getId()); + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_COULD_NOT_STOP_MESSAGE_DISPATCHER_THREAD, this)); + } + } + } + } + finally { + if (gotInterrupt) { + Thread.currentThread().interrupt(); + } + if (!alreadyDestroyed) { + destroyRQ(); + } + } + } finally { + // Close the statistics + this._statistics.close(); // fix for bug 40105 + closeTransientFields(); // make sure this happens + } + } + + private void closeSocket() { + if (this._socketClosed.compareAndSet(false, true)) { + // Close the socket + this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null); + getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections(); + } + } + + private void closeTransientFields() { + closeSocket(); + + // Null out comm buffer, host address, ports and proxy id. All will be + // replaced when the client reconnects. + releaseCommBuffer(); + { + String remoteHostAddress = this._remoteHostAddress; + if (remoteHostAddress != null) { + this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress); + this._remoteHostAddress = null; + } + } + try { + this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); + } catch (CacheClosedException e) { + // ignore if cache is shutting down + } + // Commented to fix bug 40259 + //this.clientVersion = null; + closeNonDurableCqs(); + } + + private void releaseCommBuffer() { + ByteBuffer bb = this._commBuffer; + if (bb != null) { + this._commBuffer = null; + ServerConnection.releaseCommBuffer(bb); + } + } + + private void closeNonDurableCqs(){ + CqService cqService = getCache().getCqService(); + if (cqService != null) { + try { + cqService.closeNonDurableClientCqs(getProxyID()); + } + catch (CqException ex) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_CQEXCEPTION_WHILE_CLOSING_NON_DURABLE_CQS_0, ex.getLocalizedMessage())); + } + } + } + + private void destroyRQ() { + if (this._messageDispatcher == null) { + return; + } + try { + // Using Destroy Region bcoz this method is modified in HARegion so as + // not to distribute. + // For normal Regions , even the localDestroyRegion actually propagates + HARegionQueue rq = this._messageDispatcher._messageQueue; + rq.destroy(); + + // if (!rq.getRegion().isDestroyed()) { + // rq.getRegion().destroyRegion(); + // } + } + catch (RegionDestroyedException rde) { + // throw rde; + } + catch (CancelException e) { + // throw e; + } + catch (Exception warning) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_EXCEPTION_IN_CLOSING_THE_UNDERLYING_HAREGION_OF_THE_HAREGIONQUEUE, this), warning); + } + } + + public void registerInterestRegex(String regionName, String regex, + boolean isDurable) { + registerInterestRegex(regionName, regex, isDurable, true); + } + + public void registerInterestRegex(String regionName, String regex, + boolean isDurable, boolean receiveValues) { + if (this.isPrimary) { + // Notify all secondaries and client of change in interest + notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE, + isDurable, receiveValues, InterestType.REGULAR_EXPRESSION); + } else { + throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); + } + } + + public void registerInterest(String regionName, Object keyOfInterest, + InterestResultPolicy policy, boolean isDurable) { + registerInterest(regionName, keyOfInterest, policy, isDurable, true); + } + + public void registerInterest(String regionName, Object keyOfInterest, + InterestResultPolicy policy, boolean isDurable, + boolean receiveValues) { + if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) { + registerInterestRegex(regionName, ".*", isDurable, receiveValues); + } else if (keyOfInterest instanceof List) { + if (this.isPrimary) { + notifySecondariesAndClient(regionName, keyOfInterest, policy, + isDurable, receiveValues, InterestType.KEY); + } else { + throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); + } + } else { + if (this.isPrimary) { + // Notify all secondaries and client of change in interest + notifySecondariesAndClient(regionName, keyOfInterest, policy, + isDurable, receiveValues, InterestType.KEY); + + // Enqueue the initial value message for the client if necessary + if (policy == InterestResultPolicy.KEYS_VALUES) { + Get70 request = (Get70)Get70.getCommand(); + LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName); + Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, + null); + boolean isObject = entry.isObject; + byte[] value = null; + if (entry.value instanceof byte[]) { + value = (byte[])entry.value; + } else { + try { + value = CacheServerHelper.serialize(entry.value); + } catch (IOException e) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_THE_FOLLOWING_EXCEPTION_OCCURRED_0, entry.value), e); + } + } + VersionTag tag = entry.versionTag; + ClientUpdateMessage updateMessage = new ClientUpdateMessageImpl( + EnumListenerEvent.AFTER_CREATE, lr, keyOfInterest, value, null, + (isObject ? (byte) 0x01 : (byte) 0x00), null, this.proxyID, + new EventID(this._cache.getDistributedSystem()), tag); + CacheClientNotifier.routeSingleClientMessage(updateMessage, this.proxyID); + } + // Add the client to the region's filters + //addFilterRegisteredClients(regionName, keyOfInterest); + } else { + throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); + } + } + } + + private void notifySecondariesAndClient(String regionName, + Object keyOfInterest, InterestResultPolicy policy, boolean isDurable, + boolean receiveValues, int interestType) { + // Create a client interest message for the keyOfInterest + ClientInterestMessageImpl message = new ClientInterestMessageImpl( + new EventID(this._cache.getDistributedSystem()), regionName, + keyOfInterest, interestType, policy.getOrdinal(), isDurable, + !receiveValues, ClientInterestMessageImpl.REGISTER); + + // Notify all secondary proxies of a change in interest + notifySecondariesOfInterestChange(message); + + // Modify interest registration + if (keyOfInterest instanceof List) { + registerClientInterestList(regionName, (List) keyOfInterest, isDurable, + !receiveValues, true); + } else { + registerClientInterest(regionName, keyOfInterest, interestType, + isDurable, !receiveValues, true); + } + + // Enqueue the interest registration message for the client. + // If the client is not 7.0.1 or greater and the key of interest is a list, + // then create an individual message for each entry in the list since the + // client doesn't support a ClientInterestMessageImpl containing a list. + if (Version.GFE_701.compareTo(this.clientVersion) > 0 + && keyOfInterest instanceof List) { + for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) { + this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl( + new EventID(this._cache.getDistributedSystem()), regionName, + i.next(), interestType, policy.getOrdinal(), isDurable, !receiveValues, + ClientInterestMessageImpl.REGISTER)); + } + } else { + this._messageDispatcher.enqueueMessage(message); + } + } + + public void unregisterInterestRegex(String regionName, String regex, + boolean isDurable) { + unregisterInterestRegex(regionName, regex, isDurable, true); + } + + public void unregisterInterestRegex(String regionName, String regex, + boolean isDurable, boolean receiveValues) { + if (this.isPrimary) { + notifySecondariesAndClient(regionName, regex, isDurable, receiveValues, + InterestType.REGULAR_EXPRESSION); + } else { + throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); + } + } + + public void unregisterInterest(String regionName, Object keyOfInterest, + boolean isDurable) { + unregisterInterest(regionName, keyOfInterest, isDurable, true); + } + + public void unregisterInterest(String regionName, Object keyOfInterest, + boolean isDurable, boolean receiveValues) { + if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) { + unregisterInterestRegex(regionName, ".*", isDurable, receiveValues); + } else { + if (this.isPrimary) { + notifySecondariesAndClient(regionName, keyOfInterest, isDurable, + receiveValues, InterestType.KEY); + } else { + throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); + } + } + } + + private void notifySecondariesAndClient(String regionName, + Object keyOfInterest, boolean isDurable, boolean receiveValues, + int interestType) { + // Notify all secondary proxies of a change in interest + ClientInterestMessageImpl message = new ClientInterestMessageImpl( + new EventID(this._cache.getDistributedSystem()), regionName, + keyOfInterest, interestType, (byte) 0, isDurable, !receiveValues, + ClientInterestMessageImpl.UNREGISTER); + notifySecondariesOfInterestChange(message); + + // Modify interest registration + if (keyOfInterest instanceof List) { + unregisterClientInterest(regionName, (List) keyOfInterest, false); + } else { + unregisterClientInterest(regionName, keyOfInterest, interestType, + false); + } + + // Enqueue the interest unregistration message for the client. + // If the client is not 7.0.1 or greater and the key of interest is a list, + // then create an individual message for each entry in the list since the + // client doesn't support a ClientInterestMessageImpl containing a list. + if (Version.GFE_701.compareTo(this.clientVersion) > 0 + && keyOfInterest instanceof List) { + for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) { + this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl( + new EventID(this._cache.getDistributedSystem()), regionName, + i.next(), interestType, (byte) 0, isDurable, !receiveValues, + ClientInterestMessageImpl.UNREGISTER)); + } + } else { + this._messageDispatcher.enqueueMessage(message); + } + } + + protected void notifySecondariesOfInterestChange(ClientInterestMessageImpl message) { + if (logger.isDebugEnabled()) { + StringBuffer subBuffer = new StringBuffer(); + if (message.isRegister()) { + subBuffer + .append("register ") + .append(message.getIsDurable() ? "" : "non-") + .append("durable interest in "); + } else { + subBuffer.append("unregister interest in "); + } + StringBuffer buffer = new StringBuffer(); + buffer + .append(this) + .append(": Notifying secondary proxies to ") + .append(subBuffer.toString()) + .append(message.getRegionName()) + .append("->") + .append(message.getKeyOfInterest()) + .append("->") + .append(InterestType.getString(message.getInterestType())); + logger.debug(buffer.toString()); + } + this._cacheClientNotifier.deliverInterestChange(this.proxyID, message); + } + + /* + protected void addFilterRegisteredClients(String regionName, + Object keyOfInterest) { + try { + this._cacheClientNotifier.addFilterRegisteredClients(regionName, + this.proxyID); + } catch (RegionDestroyedException e) { + logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" + keyOfInterest, e); + } + } + */ + + /** + * Registers interest in the input region name and key + * + * @param regionName + * The fully-qualified name of the region in which to register + * interest + * @param keyOfInterest + * The key in which to register interest + */ + protected void registerClientInterest(String regionName, + Object keyOfInterest, int interestType, boolean isDurable, + boolean sendUpdatesAsInvalidates, boolean flushState) + { + ClientInterestList cil = + this.cils[RegisterInterestTracker.getInterestLookupIndex( + isDurable, false)]; + cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates); + if (flushState) { + flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember()); + } + HARegionQueue queue = getHARegionQueue(); + if (queue != null) { // queue is null during initialization + queue.setHasRegisteredInterest(true); + } + } + + /** + * flush other regions to the given target. This is usually the member + * that is registering the interest. During queue creation it is the + * queue's image provider. + */ + public void flushForInterestRegistration(String regionName, DistributedMember target) { + Region r = this._cache.getRegion(regionName); + if (r == null) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to find region '{}' to flush for interest registration", regionName); + } + } else if (r.getAttributes().getScope().isDistributed()) { + if (logger.isDebugEnabled()){ + logger.debug("Flushing region '{}' for interest registration", regionName); + } + CacheDistributionAdvisee cd = (CacheDistributionAdvisee)r; + final StateFlushOperation sfo; + if (r instanceof PartitionedRegion) { + // need to flush all buckets. SFO should be changed to target buckets + // belonging to a particular PR, but it doesn't have that option right now + sfo = new StateFlushOperation( + this._cache.getDistributedSystem().getDistributionManager()); + } else { + sfo = new StateFlushOperation((DistributedRegion)r); + } + try { + // bug 41681 - we need to flush any member that may have a cache operation + // in progress so that the changes are received there before returning + // from this method + InitialImageAdvice advice = cd.getCacheDistributionAdvisor().adviseInitialImage(null); + HashSet recips = new HashSet(advice.getReplicates()); + recips.addAll(advice.getUninitialized()); + recips.addAll(advice.getEmpties()); + recips.addAll(advice.getPreloaded()); + recips.addAll(advice.getOthers()); + sfo.flush(recips, + target, + DistributionManager.HIGH_PRIORITY_EXECUTOR, true); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + + /** + * Unregisters interest in the input region name and key + * + * @param regionName + * The fully-qualified name of the region in which to unregister + * interest + * @param keyOfInterest + * The key in which to unregister interest + * @param isClosing + * Whether the caller is closing + */ + protected void unregisterClientInterest(String regionName, + Object keyOfInterest, int interestType, boolean isClosing) + { + // only unregister durable interest if isClosing and !keepalive + if (!isClosing /* explicit unregister */ + || !getDurableKeepAlive() /* close and no keepAlive*/) { + this.cils[RegisterInterestTracker.durableInterestListIndex]. + unregisterClientInterest(regionName, keyOfInterest, interestType); + } + // always unregister non durable interest + this.cils[RegisterInterestTracker.interestListIndex]. + unregisterClientInterest(regionName, keyOfInterest, interestType); + } + + /** + * Registers interest in the input region name and list of keys + * + * @param regionName + * The fully-qualified name of the region in which to register + * interest + * @param keysOfInterest + * The list of keys in which to register interest + */ + protected void registerClientInterestList(String regionName, + List keysOfInterest, boolean isDurable, boolean sendUpdatesAsInvalidates, + boolean flushState) + { + // we only use two interest lists to map the non-durable and durable + // identifiers to their interest settings + ClientInterestList cil = + this.cils[RegisterInterestTracker.getInterestLookupIndex( + isDurable, false/*sendUpdatesAsInvalidates*/)]; + cil.registerClientInterestList(regionName, keysOfInterest, sendUpdatesAsInvalidates); + if (getHARegionQueue() != null) { + if (flushState) { + flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember()); + } + getHARegionQueue().setHasRegisteredInterest(true); + } + } + + /** + * Unregisters interest in the input region name and list of keys + * + * @param regionName + * The fully-qualified name of the region in which to unregister + * interest + * @param keysOfInterest + * The list of keys in which to unregister interest + * @param isClosing + * Whether the caller is closing + */ + protected void unregisterClientInterest(String regionName, + List keysOfInterest, boolean isClosing) + { + // only unregister durable interest if isClosing and !keepalive + if (!isClosing /* explicit unregister */ + || !getDurableKeepAlive() /* close and no keepAlive*/) { + this.cils[RegisterInterestTracker.durableInterestListIndex]. + unregisterClientInterestList(regionName, keysOfInterest); + } + // always unregister non durable interest + this.cils[RegisterInterestTracker.interestListIndex]. + unregisterClientInterestList(regionName, keysOfInterest); + } + + + /** sent by the cache client notifier when there is an interest registration change */ + protected void processInterestMessage(ClientInterestMessageImpl message) { + int interestType = message.getInterestType(); + String regionName = message.getRegionName(); + Object key = message.getKeyOfInterest(); + if (message.isRegister()) { + // Register interest in this region->key + if (key instanceof List) { + registerClientInterestList(regionName, (List) key, + message.getIsDurable(), message.getForUpdatesAsInvalidates(), true); + } else { + registerClientInterest(regionName, key, interestType, + message.getIsDurable(), message.getForUpdatesAsInvalidates(), true); + } + + // Add the client to the region's filters + //addFilterRegisteredClients(regionName, key); + + if (logger.isDebugEnabled()) { + StringBuffer buffer = new StringBuffer(); + buffer + .append(this) + .append(": Interest listener registered ") + .append(message.getIsDurable() ? "" : "non-") + .append("durable interest in ") + .append(message.getRegionName()) + .append("->") + .append(message.getKeyOfInterest()) + .append("->") + .append(InterestType.getString(message.getInterestType())); + logger.debug(buffer.toString()); + } + } else { + // Unregister interest in this region->key + if (key instanceof List) { + unregisterClientInterest(regionName, (List) key, false); + } else { + unregisterClientInterest(regionName, key, interestType, false); + } + + if (logger.isDebugEnabled()) { + StringBuffer buffer = new StringBuffer(); + buffer + .append(this) + .append(": Interest listener unregistered interest in ") + .append(message.getRegionName()) + .append("->") + .append(message.getKeyOfInterest()) + .append("->") + .append(InterestType.getString(message.getInterestType())); + logger.debug(buffer.toString()); + } + } + } + + private boolean postDeliverAuthCheckPassed(ClientUpdateMessage clientMessage) { + // Before adding it in the queue for dispatching, check for post + // process authorization + if (AcceptorImpl.isAuthenticationRequired() + && this.postAuthzCallback == null + && AcceptorImpl.isPostAuthzCallbackPresent()) { + // security is on and callback is null: it means multiuser mode. + ClientUpdateMessageImpl cumi = (ClientUpdateMessageImpl)clientMessage; + + CqNameToOp clientCq = cumi.getClientCq(this.proxyID); + + if (clientCq != null && !clientCq.isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug("CCP clientCq size before processing auth {}", clientCq.size()); + } + String[] regionNameHolder = new String[1]; + OperationContext opctxt = getOperationContext(clientMessage, + regionNameHolder); + if (opctxt == null) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, + new Object[] {this, clientMessage})); + return false; + } + + String[] cqNames = clientCq.getNames(); + if (logger.isDebugEnabled()) { + logger.debug("CCP clientCq names array size {}", cqNames.length); + } + for (int i = 0; i < cqNames.length; i++) { + try { + if (logger.isDebugEnabled()) { + logger.debug("CCP clientCq name {}", cqNames[i]); + } + boolean isAuthorized = false; + + if (this.proxyID.isDurable() && this.getDurableKeepAlive() + && this._isPaused) { + // need to take lock as we may be reinitializing proxy cache + synchronized (this.clientUserAuthsLock) { + AuthorizeRequestPP postAuthCallback = this.clientUserAuths + .getUserAuthAttributes(cqNames[i]).getPostAuthzRequest(); + if (logger.isDebugEnabled() && postAuthCallback == null) { + logger.debug("CCP clientCq post callback is null"); + } + if (postAuthCallback != null && postAuthCallback + .getPostAuthzCallback().authorizeOperation( + regionNameHolder[0], opctxt)) { + isAuthorized = true; + } + } + } else { + UserAuthAttributes userAuthAttributes = this.clientUserAuths + .getUserAuthAttributes(cqNames[i]); + + AuthorizeRequestPP postAuthCallback = userAuthAttributes + .getPostAuthzRequest(); + if (postAuthCallback == null && logger.isDebugEnabled()) { + logger.debug("CCP clientCq post callback is null"); + } + if (postAuthCallback != null && postAuthCallback + .getPostAuthzCallback().authorizeOperation( + regionNameHolder[0], opctxt)) { + isAuthorized = true; + } + } + + if (!isAuthorized) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.CacheClientProxy__0_NOT_ADDING_CQ_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, + new Object[] {this, clientMessage})); + clientCq.delete(cqNames[i]); + } + } catch (Exception ex) { + // ignore... + } + if (logger.isDebugEnabled()) { + logger.debug("CCP clientCq size after processing auth {}", clientCq.size()); + } + } + // again need to check as there may be no CQ available + if (!clientMessage.hasCqs(this.proxyID)) { + this._statistics.incMessagesNotQueuedNotInterested(); + if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) { + logger.debug("{}: Not adding message to queue. It is not interested in this region and key: {}", clientMessage); + } + return false; + } + } + } + else if (this.postAuthzCallback != null) { + String[] regionNameHolder = new String[1]; + boolean isAuthorize = false; + OperationContext opctxt = getOperationContext(clientMessage, + regionNameHolder); + if (opctxt == null) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, new Object[] {this, clientMessage})); + return false; + } + if (logger.isTraceEnabled()){ + logger.trace("{}: Invoking authorizeOperation for message: {}", this, clientMessage); + } + + if (this.proxyID.isDurable() && this.getDurableKeepAlive() + && this._isPaused) { + synchronized (this.clientUserAuthsLock) { + isAuthorize = this.postAuthzCallback.authorizeOperation( + regionNameHolder[0], opctxt); + } + } else { + isAuthorize = this.postAuthzCallback.authorizeOperation( + regionNameHolder[0], opctxt); + } + if (!isAuthorize) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, new Object[] {this, clientMessage})); + return false; + } + } + + return true; + } + + /** + * Delivers the message to the client representing this client proxy. + * @param conflatable + */ + protected void deliverMessage(Conflatable conflatable) + { + ClientUpdateMessage clientMessage = null; + if(conflatable instanceof HAEventWrapper) { + clientMessage = ((HAEventWrapper)conflatable).getClientUpdateMessage(); + } else { + clientMessage = (ClientUpdateMessage)conflatable; + } + + this._statistics.incMessagesReceived(); + + if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) { + // If dispatcher is getting initialized, add the event to temporary queue. + if (this.messageDispatcherInit) { + synchronized (this.queuedEventsSync) { + if (this.messageDispatcherInit) { // Check to see value did not changed while getting the synchronize lock. + if (logger.isDebugEnabled()) { + logger.debug("Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.", this); + } + this.queuedEvents.add(conflatable); + return; + } + } + } + + if (this._messageDispatcher != null) { + this._messageDispatcher.enqueueMessage(conflatable); + } else { + this._statistics.incMessagesFailedQueued(); + if (logger.isDebugEnabled()) { + logger.debug("Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.", this); + } + } + } else { + this._statistics.incMessagesFailedQueued(); + } + } + + protected void sendMessageDirectly(ClientMessage message) { + // Send the message directly if the connection exists + // (do not go through the queue). + if (logger.isDebugEnabled()){ + logger.debug("About to send message directly to {}", this); + } + if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) { + // If the socket is open, send the message to it + this._messageDispatcher.sendMessageDirectly(message); + if (logger.isDebugEnabled()){ + logger.debug("Sent message directly to {}", this); + } + } else { + // Otherwise just reset the ping counter + resetPingCounter(); + if (logger.isDebugEnabled()){ + logger.debug("Skipped sending message directly to {}", this); + } + } + } + + private OperationContext getOperationContext(ClientMessage cmsg, + String[] regionNameHolder) { + ClientUpdateMessageImpl cmsgimpl = (ClientUpdateMessageImpl)cmsg; + OperationContext opctxt = null; + // TODO SW: Special handling for DynamicRegions; this should be reworked + // when DynamicRegion API is deprecated + String regionName = cmsgimpl.getRegionName(); + regionNameHolder[0] = regionName; + if (cmsgimpl.isCreate()) { + if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { + regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); + opctxt = new RegionCreateOperationContext(true); + } + else { + PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl + .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.CREATE, + true); + tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); + opctxt = tmp; + } + } + else if (cmsgimpl.isUpdate()) { + if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { + regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); + opctxt = new RegionCreateOperationContext(true); + } + else { + PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl + .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.UPDATE, + true); + tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); + opctxt = tmp; + } + } + else if (cmsgimpl.isDestroy()) { + if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { + regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); + opctxt = new RegionDestroyOperationContext(true); + } + else { + DestroyOperationContext tmp = new DestroyOperationContext(cmsgimpl.getKeyOfInterest(), true); + tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); + opctxt = tmp; + } + } + else if (cmsgimpl.isDestroyRegion()) { + opctxt = new RegionDestroyOperationContext(true); + } + else if (cmsgimpl.isInvalidate()) { + InvalidateOperationContext tmp = new InvalidateOperationContext(cmsgimpl.getKeyOfInterest(), true); + tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); + opctxt = tmp; + } + else if (cmsgimpl.isClearRegion()) { + RegionClearOperationContext tmp = new RegionClearOperationContext(true); + tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); + opctxt = tmp; + } + return opctxt; + } + + /** + * Initializes the message dispatcher thread. The + * MessageDispatcher processes the message queue. + * + * @throws CacheException + */ + public void initializeMessageDispatcher() throws CacheException + { + this.messageDispatcherInit = true; // Initialization process. + try { + if (logger.isDebugEnabled()) { + logger.debug("{}: Initializing message dispatcher with capacity of {} entries", this, _maximumMessageCount); + } + String name = "Client Message Dispatcher for " + + getProxyID().getDistributedMember() + (isDurable()? " (" + getDurableId()+")" : ""); + this._messageDispatcher = new MessageDispatcher(this, name); + + //Fix for 41375 - drain as many of the queued events + //as we can without synchronization. + if (logger.isDebugEnabled()) { + logger.debug("{} draining {} events from init queue into intialized queue", this, this.queuedEvents.size()); + } + Conflatable nextEvent; + while((nextEvent = queuedEvents.poll()) != null) { + this._messageDispatcher.enqueueMessage(nextEvent); + } + + //Now finish emptying the queue with synchronization to make + //sure we don't miss any events. + synchronized (this.queuedEventsSync){ + while((nextEvent = queuedEvents.poll()) != null) { + this._messageDispatcher.enqueueMessage(nextEvent); + } + + this.messageDispatcherInit = false; // Done initialization. + } + } finally { + if (this.messageDispatcherInit) { // If its not successfully completed. + this._statistics.close(); + } + } + } + + protected void startOrResumeMessageDispatcher(boolean processedMarker) { + // Only start or resume the dispatcher if it is Primary + if (this.isPrimary) { + // Add the marker to the queue + if (!processedMarker) { + EventID eventId = new EventID(this._cache.getDistributedSystem()); + this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId)); + } + + // Set the message queue to primary. + this._messageDispatcher._messageQueue.setPrimary(true); + + // Start or resume the dispatcher + synchronized (this._messageDispatcher._pausedLock) { + if (this.isPaused()) { + // It is paused, resume it + this.setPaused(false); + if (this._messageDispatcher.isStopped()) { + if (logger.isDebugEnabled()) { + logger.debug("{}: Starting dispatcher", this); + } + this._messageDispatcher.start(); + } + else { + // ARB: Initialize transient fields. + this._messageDispatcher.initializeTransients(); + if (logger.isDebugEnabled()) { + logger.debug("{}: Resuming dispatcher", this); + } + this._messageDispatcher.resumeDispatching(); + } + } else if (!this._messageDispatcher.isAlive()) { + if (logger.isDebugEnabled()) { + logger.debug("{}: Starting dispatcher", this); + } + this._messageDispatcher.start(); + } + } + } + } + + /* + * Returns whether the client represented by this CacheClientProxy + * has registered interest in anything. @return whether the client + * represented by this CacheClientProxy has registered interest + * in anything + */ + protected boolean hasRegisteredInterested() + { + return + this.cils[RegisterInterestTracker.interestListIndex].hasInterest() || + this.cils[RegisterInterestTracker.durableInterestListIndex].hasInterest(); + } + + /** + * Returns a string representation of the proxy + */ + @Override + public String toString() + { + StringBuffer buffer = new StringBuffer(); + buffer.append("CacheClientProxy[") + // .append("client proxy id=") + .append(this.proxyID) + // .append("; client host name=") + // .append(this._socket.getInetAddress().getCanonicalHostName()) + // .append("; client host address=") + // .append(this._remoteHostAddress) + .append("; port=").append(this._socket.getPort()) + .append("; primary=").append(isPrimary) + .append("; version=").append(clientVersion) + .append("]"); + return buffer.toString(); + } + + public String getState(){ + StringBuffer buffer = new StringBuffer(); + buffer.append("CacheClientProxy[") + // .append("client proxy id=") + .append(this.proxyID) + // .append("; client host name=") + // .append(this._socket.getInetAddress().getCanonicalHostName()) + // .append("; client host address=") + // .append(this._remoteHostAddress) + .append("; port=").append(this._socket.getPort()) + .append("; primary=").append(isPrimary) + .append("; version=").append(clientVersion) + .append("; paused=").append(isPaused()) + .append("; alive=").append(isAlive()) + .append("; connected=").append(isConnected()) + .append("; isMarkedForRemoval=").append(isMarkedForRemoval) + .append("]"); + return buffer.toString(); + } + + public boolean isPrimary() + { + //boolean primary = this._messageDispatcher.isAlive() + // || this._messageDispatcher._messageQueue.isPrimary(); + boolean primary = this.isPrimary; + //System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive()); + //System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " + this._messageDispatcher._messageQueue.isPrimary()); + //System.out.println(this + ": IS PRIMARY: " + primary); + return primary; + // return this.isPrimary ; + } + + protected boolean basicIsPrimary() { + return this.isPrimary; + } + + protected void setPrimary(boolean isPrimary) { + this.isPrimary = isPrimary; + } + + // private static int nextId = 0; + // static protected int getNextId() { + // synchronized (CacheClientProxy.class) { + // return ++nextId; + // } + // } + /* + * Return this client's HA region queue + * @returns - HARegionQueue of the client + */ + public HARegionQueue getHARegionQueue() { + if (this._messageDispatcher != null){ + return _messageDispatcher._messageQueue; + } + return null; + } + + + /** + * Reinitialize a durable CacheClientProxy with a new client. + * @param socket + * The socket between the server and the client + * @param ip + * whether this proxy represents the primary + */ + protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId, + Cache cache, boolean ip, byte cc, Version ver) { + // Re-initialize transient fields + initializeTransientFields(socket, proxyId, ip, cc, ver); + getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections(); + + + // Cancel expiration task + cancelDurableExpirationTask(true); + + // Set the message dispatcher's primary flag. This could go from primary + // to secondary + this._messageDispatcher._messageQueue.setPrimary(ip); + this._messageDispatcher._messageQueue.setClientConflation(cc); + + reinitializeClientAuths(); + this.creationDate = new Date(); + if (logger.isDebugEnabled()) { + logger.debug("{}: Has been reinitialized", this); + } + } + + protected boolean isDurable() { + return getProxyID().isDurable(); + } + + protected String getDurableId() { + return getProxyID().getDurableId(); + } + + protected int getDurableTimeout() { + return getProxyID().getDurableTimeout(); + } + + private boolean getDurableKeepAlive() { + return this.keepalive; + } + + protected String getHARegionName() { + return getProxyID().getHARegionName(); + } + + public Region getHARegion() { + return this._messageDispatcher._messageQueue.getRegion(); + } + + public Version getVersion() { + return this.clientVersion; + } + + protected void scheduleDurableExpirationTask() { + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + _durableExpirationTask.compareAndSet(this, null); + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__THE_EXPIRATION_TASK_HAS_FIRED_SO_THIS_PROXY_IS_BEING_TERMINATED, CacheClientProxy.this)); + // Remove the proxy from the CacheClientNofier's registry + getCacheClientNotifier().removeClientProxy(CacheClientProxy.this); + getCacheClientNotifier().durableClientTimedOut(CacheClientProxy.this.proxyID); + + // Close the proxy + terminateDispatching(false); + _cacheClientNotifier._statistics.incQueueDroppedCount(); + + /** + * Setting the expiration task to null again and cancelling existing + * one, if any. See #50894. + *

+ * The message dispatcher may again set the expiry task in below path: + * + * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.scheduleDurableExpirationTask(CacheClientProxy.java:2020) + * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.pauseDispatching(CacheClientProxy.java:924) + * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.pauseOrUnregisterProxy(CacheClientProxy.java:2813) + * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.run(CacheClientProxy.java:2692) + * + *

+ * This is because message dispatcher may get an IOException with + * "Proxy closing due to socket being closed locally" during/after + * terminateDispatching(false) above. + */ + Object task = _durableExpirationTask.getAndSet(null); + if (task != null) { + ((SystemTimerTask)task).cancel(); + } + } + + }; + if(this._durableExpirationTask.compareAndSet(null, task)) { + _cache.getCCPTimer().schedule(task, + getDurableTimeout()*1000L); + } + } + + protected void cancelDurableExpirationTask(boolean logMessage) { + SystemTimer.SystemTimerTask task = (SystemTimerTask) _durableExpirationTask.getAndSet(null); + if (task != null) { + if (logMessage) { + logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_CANCELLING_EXPIRATION_TASK_SINCE_THE_CLIENT_HAS_RECONNECTED, this)); + } + task.cancel(); + } + } + + /** + * Class ClientInterestList provides a convenient interface + * for manipulating client interest information. + */ + static protected class ClientInterestList + { + + final CacheClientProxy ccp; + + final Object id; + + /** + * An object used for synchronizing the interest lists + */ + final private Object interestListLock = new Object(); + + /** + * Regions that this client is interested in + */ + final protected Set regions = new HashSet(); + + /** + * Constructor. + */ + protected ClientInterestList(CacheClientProxy ccp, Object interestID) { + this.ccp = ccp; + this.id = interestID; + // this.id = getNextId(); + } + + /** + * Registers interest in the input region name and key + */ + protected void registerClientInterest(String regionName, + Object keyOfInterest, int interestType, boolean sendUpdatesAsInvalidates) + { + if (logger.isDebugEnabled()) { + logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest); + } + Set keysRegistered = null; + synchronized(this.interestListLock) { + LocalRegion r = (LocalRegion)this.ccp._cache.getRegion(regionName, true); + if (r == null) { + throw new RegionDestroyedException("Region could not be found for interest registration", regionName); + } + if ( ! (r instanceof CacheDistributionAdvisee) ) { + throw new IllegalArgumentException("region " + regionName + " is not distributed and does not support interest registration"); + } + FilterProfile p = r.getFilterProfile(); + keysRegistered = p.registerClientInterest(id, keyOfInterest, interestType, sendUpdatesAsInvalidates); + regions.add(regionName); + } + // Perform actions if any keys were registered + if ((keysRegistered != null) && containsInterestRegistrationListeners() + && !keysRegistered.isEmpty()) { + handleInterestEvent(regionName, keysRegistered, interestType, true); + } + } + + + protected FilterProfile getProfile(String regionName) { + try { + return this.ccp._cache.getFilterProfile(regionName); + } catch (CacheClosedException e) { + return null; + } + } + + /** + * Unregisters interest in the input region name and key + * + * @param regionName + * The fully-qualified name of the region in which to unregister + * interest + * @param keyOfInterest + * The key in which to unregister interest + */ + protected void unregisterClientInterest(String regionName, + Object keyOfInterest, int interestType) + { + if (logger.isDebugEnabled()) { + logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName, keyOfInterest); + } + FilterProfile p = getProfile(regionName); + Set keysUnregistered = null; + synchronized(this.interestListLock) { + if (p != null) { + keysUnregistered = p.unregisterClientInterest( + id, keyOfInterest, interestType); + if (!p.hasInterestFor(id)) { + this.regions.remove(regionName); + } + } else { + this.regions.remove(regionName); + } + } + if (keysUnregistered != null && !keysUnregistered.isEmpty()) { + handleInterestEvent(regionName, keysUnregistered, interestType, false); + } + } + + /** + * Registers interest in the input region name and list of keys + * + * @param regionName + * The fully-qualified name of the region in which to register + * interest + * @param keysOfInterest + * The list of keys in which to register interest + */ + protected void registerClientInterestList(String regionName, + List keysOfInterest, boolean sendUpdatesAsInvalidates) { + FilterProfile p = getProfile(regionName); + if (p == null) { + throw new RegionDestroyedException("Region not found during client interest registration", regionName); + } + Set keysRegistered = null; + synchronized(this.interestListLock) { + keysRegistered = p.registerClientInterestList(id, keysOfInterest, sendUpdatesAsInvalidates); + regions.add(regionName); + } + // Perform actions if any keys were registered + if (containsInterestRegistrationListeners() && !keysRegistered.isEmpty()) { + handleInterestEvent(regionName, keysRegistered, InterestType.KEY, true); + } + } + + /** + * Unregisters interest in the input region name and list of keys + * + * @param regionName + * The fully-qualified name of the region in which to unregister + * interest + * @param keysOfInterest + * The list of keys in which to unregister interest + */ + protected void unregisterClientInterestList(String regionName, + List keysOfInterest) + { + FilterProfile p = getProfile(regionName); + Set keysUnregistered = null; + synchronized(this.interestListLock) { + if (p != null) { + keysUnregistered = p.unregisterClientInterestList( + id, keysOfInterest); + if (!p.hasInterestFor(id)) { + regions.remove(regionName); + } + } else { + regions.remove(regionName); + } + } + // Perform actions if any keys were unregistered + if (!keysUnregistered.isEmpty()) { + handleInterestEvent(regionName, keysUnregistered, InterestType.KEY,false); + } + } + + /* + * Returns whether this interest list has any keys, patterns or filters of + * interest. It answers the question: Are any clients being notified because + * of this interest list? @return whether this interest list has any keys, + * patterns or filters of interest + */ + protected boolean hasInterest() { + return regions.size() > 0; + } + + protected void clearClientInterestList() { + boolean isClosed = ccp.getCache().isClosed(); + + synchronized(this.interestListLock) { + for (String regionName: regions) { + FilterProfile p = getProfile(regionName); + if (p == null) { + continue; + } + if (!isClosed) { + if (p.hasAllKeysInterestFor(id)) { + Set allKeys = new HashSet(); + allKeys.add(".*"); + allKeys = Collections.unmodifiableSet(allKeys); + handleInterestEvent(regionName, allKeys, + InterestType.REGULAR_EXPRESSION, false); + } + Set keysOfInterest = p.getKeysOfInterestFor(id); + if (keysOfInterest != null && keysOfInterest.size() > 0) { + handleInterestEvent(regionName, keysOfInterest, + InterestType.KEY, false); + } + Map patternsOfInterest = + p.getPatternsOfInterestFor(id); + if (patternsOfInterest != null && patternsOfInterest.size() > 0) { + handleInterestEvent(regionName, patternsOfInterest.keySet(), + InterestType.REGULAR_EXPRESSION, false); + } + } + p.clearInterestFor(id); + } + regions.clear(); + } + } + + + private void handleInterestEvent(String regionName, Set keysOfInterest, + int interestType, boolean isRegister) { + // Notify the region about this register interest event if: + // - the application has requested it + // - this is a primary CacheClientProxy (otherwise multiple notifications + // may occur) + // - it is a key interest type (regex is currently not supported) + InterestRegistrationEvent event = null; + if (NOTIFY_REGION_ON_INTEREST && this.ccp.isPrimary() + && interestType == InterestType.KEY) { + event = new InterestRegistrationEventImpl(this.ccp, regionName, + keysOfInterest, interestType, isRegister); + try { + notifyRegionOfInterest(event); + } + catch (Exception e) { + logger.warn(LocalizedStrings.CacheClientProxy_REGION_NOTIFICATION_OF_INTEREST_FAILED, e); + } + } + // Invoke interest registration listeners + if (containsInterestRegistrationListeners()) { + if (event == null) { + event = new InterestRegistrationEventImpl(this.ccp, regionName, + keysOfInterest, interestType, isRegister); + } + notifyInterestRegistrationListeners(event); + } + } + + private void notifyRegionOfInterest(InterestRegistrationEvent event) { + this.ccp.getCacheClientNotifier().handleInterestEvent(event); + } + + private void notifyInterestRegistrationListeners( + InterestRegistrationEvent event) { + this.ccp.getCacheClientNotifier().notifyInterestRegistrationListeners( + event); + } + + private boolean containsInterestRegistrationListeners() { + return this.ccp.getCacheClientNotifier() + .containsInterestRegistrationListeners(); + } + } + + + /** + * Class MessageDispatcher is a Thread that + * processes messages bound for the client by taking messsages from the + * message queue and sending them to the client over the socket. + */ + static class MessageDispatcher extends Thread + { + + /** + * The queue of messages to be sent to the client + */ + protected final HARegionQueue _messageQueue; + + // /** + // * An int used to keep track of the number of messages dropped for logging + // * purposes. If greater than zero then a warning has been logged about + // * messages being dropped. + // */ + // private int _numberOfMessagesDropped = 0; + + /** + * The proxy for which this dispatcher is processing messages + */ + private final CacheClientProxy _proxy; + + // /** + // * The conflator faciliates message conflation + // */ + // protected BridgeEventConflator _eventConflator; + + /** + * Whether the dispatcher is stopped + */ + private volatile boolean _isStopped = true; + + /** + * @guarded.By _pausedLock + */ + //boolean _isPausedDispatcher = false; + + /** + * A lock object used to control pausing this dispatcher + */ + protected final Object _pausedLock = new Object(); + + /** + * An object used to protect when dispatching is being stopped. + */ + private final Object _stopDispatchingLock = new Object(); + + private final ReadWriteLock socketLock = new ReentrantReadWriteLock(); + + private final Lock socketWriteLock = socketLock.writeLock(); + // /** + // * A boolean verifying whether a warning has already been issued if the + // * message queue has reached its capacity. + // */ + // private boolean _messageQueueCapacityReachedWarning = false; + + /** + * Constructor. + * + * @param proxy + * The CacheClientProxy for which this dispatcher is + * processing messages + * @param name thread name for this dispatcher +