Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4195D200C4D for ; Wed, 5 Apr 2017 19:25:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 401DD160B76; Wed, 5 Apr 2017 17:25:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 194F4160BA3 for ; Wed, 5 Apr 2017 19:25:51 +0200 (CEST) Received: (qmail 36145 invoked by uid 500); 5 Apr 2017 17:25:51 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 36026 invoked by uid 99); 5 Apr 2017 17:25:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 17:25:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D21C6E00AF; Wed, 5 Apr 2017 17:25:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Wed, 05 Apr 2017 17:25:51 -0000 Message-Id: <6ae40768640f479895e3497f50164ca7@git.apache.org> In-Reply-To: <6b795d17ad95463d9c249bc63310e671@git.apache.org> References: <6b795d17ad95463d9c249bc63310e671@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] geode git commit: WIP refactoring archived-at: Wed, 05 Apr 2017 17:25:54 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java 
---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index d7b923c..5370e2f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -43,7 +43,6 @@ import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.StatisticsFactory; import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.ClientSession; import org.apache.geode.cache.DynamicRegionFactory; @@ -119,155 +118,128 @@ public class CacheClientProxy implements ClientSession { private static final Logger logger = LogService.getLogger(); /** - * The socket between the server and the client - */ - protected Socket _socket; - - private final AtomicBoolean _socketClosed = new AtomicBoolean(); - - /** - * A communication buffer used by each message we send to the client - */ - protected ByteBuffer _commBuffer; - - /** - * The remote host's IP address string (cached for convenience) - */ - protected String _remoteHostAddress; - - /** - * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock} + * Notify the region when a client interest registration occurs. This tells the region to update + * access time when an update is to be pushed to a client. It is enabled only for + * PartitionedRegions currently. 
*/ - protected volatile boolean isMarkedForRemoval = false; + private static final boolean NOTIFY_REGION_ON_INTEREST = + Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest"); /** - * @see #isMarkedForRemoval + * The number of times to peek on shutdown before giving up and shutting down */ - protected final Object isMarkedForRemovalLock = new Object(); + private static final int MAXIMUM_SHUTDOWN_PEEKS = Integer + .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50); /** - * The proxy id of the client represented by this proxy + * Default value for slow starting time of dispatcher */ - protected ClientProxyMembershipID proxyID; + private static final long DEFAULT_SLOW_STARTING_TIME = 5000; /** - * The GemFire cache + * Key in the system property from which the slow starting time value will be retrieved */ - protected final GemFireCacheImpl _cache; + private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting"; /** - * The list of keys that the client represented by this proxy is interested in (stored by region) + * TODO: delete this and rewrite the tests that use this + * NOTE: this is NOT thread safe */ - protected final ClientInterestList[] cils = new ClientInterestList[2]; + private static TestHook testHook; /** - * A thread that dispatches messages to the client + * TODO: delete this and rewrite the test that uses this + * A debug flag used for testing Backward compatibility */ - protected volatile MessageDispatcher _messageDispatcher; + private static boolean afterMessageCreationForTesting = false; /** - * The statistics for this proxy + * TODO: delete this and rewrite the test that uses this + * for testing purposes, delays the start of the dispatcher thread */ - protected final CacheClientProxyStats _statistics; + private static boolean isSlowStartForTesting = false; - protected final AtomicReference _durableExpirationTask = new AtomicReference(); - - protected 
SystemTimer durableTimer; + private final AtomicBoolean socketClosed = new AtomicBoolean(); /** - * Whether this dispatcher is paused + * @see #isMarkedForRemoval */ - protected volatile boolean _isPaused = true; + private final Object isMarkedForRemovalLock = new Object(); /** - * True if we are connected to a client. - */ - private volatile boolean connected = false; - // /** - // * A string representing interest in all keys - // */ - // protected static final String ALL_KEYS = "ALL_KEYS"; - // - /** - * True if a marker message is still in the ha queue. + * The GemFire cache */ - private boolean markerEnqueued = false; + private final GemFireCacheImpl cache; /** - * The number of times to peek on shutdown before giving up and shutting down + * The list of keys that the client represented by this proxy is interested in (stored by region) */ - protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer - .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue(); + private final ClientInterestList[] cils = new ClientInterestList[2]; /** - * The number of milliseconds to wait for an offering to the message queue + * The statistics for this proxy */ - protected static final int MESSAGE_OFFER_TIME = 0; + private final CacheClientProxyStats _statistics; - /** - * The default maximum message queue size - */ - // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000; + private final AtomicReference _durableExpirationTask = new AtomicReference(); /** The message queue size */ - protected final int _maximumMessageCount; + private final int _maximumMessageCount; /** * The time (in seconds ) after which a message in the client queue will expire. */ - protected final int _messageTimeToLive; + private final int _messageTimeToLive; /** * The CacheClientNotifier registering this proxy. 
*/ - protected final CacheClientNotifier _cacheClientNotifier; + private final CacheClientNotifier cacheClientNotifier; - /** - * Defaults to true; meaning do some logging of dropped client notification messages. Set the - * system property to true to cause dropped messages to NOT be logged. - */ - protected static final boolean LOG_DROPPED_MSGS = - !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings"); + private final Object clientUserAuthsLock = new Object(); /** - * for testing purposes, delays the start of the dispatcher thread + * The AcceptorImpl identifier to which the proxy is connected. */ - public static boolean isSlowStartForTesting = false; + private final long _acceptorId; - /** - * Default value for slow starting time of dispatcher - */ - private static final long DEFAULT_SLOW_STARTING_TIME = 5000; + /** acceptor's setting for notifyBySubscription */ + private final boolean notifyBySubscription; + + private final Object queuedEventsSync = new Object(); /** - * Key in the system property from which the slow starting time value will be retrieved + * A counter that keeps track of how many task iterations that have occurred since the last ping + * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments + * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent. 
*/ - private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting"; + private final AtomicInteger pingCounter = new AtomicInteger(); - private boolean isPrimary; + private final Object drainLock = new Object(); - /** @since GemFire 5.7 */ - protected byte clientConflation = HandShake.CONFLATION_DEFAULT; + private final Object drainsInProgressLock = new Object(); + + private final SecurityService securityService; /** - * Flag to indicate whether to keep a durable client's queue alive + * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock} */ - boolean keepalive = false; - - private AccessControl postAuthzCallback; - private Subject subject; + private volatile boolean isMarkedForRemoval = false; /** - * For multiuser environment.. + * A thread that dispatches messages to the client */ - private ClientUserAuths clientUserAuths; + private volatile MessageDispatcher _messageDispatcher; - private final Object clientUserAuthsLock = new Object(); + /** + * Whether this dispatcher is paused + */ + private volatile boolean _isPaused = true; /** - * The version of the client + * True if we are connected to a client. */ - private Version clientVersion; + private volatile boolean connected = false; /** * A map of region name as key and integer as its value. Basically, it stores the names of the @@ -278,42 +250,60 @@ public class CacheClientProxy implements ClientSession { */ private volatile Map regionsWithEmptyDataPolicy = new HashMap(); + /** To queue the events arriving during message dispatcher initialization */ + private volatile ConcurrentLinkedQueue queuedEvents = + new ConcurrentLinkedQueue(); + + private volatile boolean messageDispatcherInit = false; + /** - * A debug flag used for testing Backward compatibility + * The socket between the server and the client */ - public static boolean AFTER_MESSAGE_CREATION_FLAG = false; + private Socket socket; /** - * Notify the region when a client interest registration occurs. 
This tells the region to update - * access time when an update is to be pushed to a client. It is enabled only for - * PartitionedRegions currently. + * A communication buffer used by each message we send to the client */ - protected static final boolean NOTIFY_REGION_ON_INTEREST = - Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest"); + private ByteBuffer _commBuffer; /** - * The AcceptorImpl identifier to which the proxy is connected. + * The remote host's IP address string (cached for convenience) */ - private final long _acceptorId; + private String _remoteHostAddress; - /** acceptor's setting for notifyBySubscription */ - private final boolean notifyBySubscription; + /** + * The proxy id of the client represented by this proxy + */ + private ClientProxyMembershipID proxyID; - /** To queue the events arriving during message dispatcher initialization */ - private volatile ConcurrentLinkedQueue queuedEvents = - new ConcurrentLinkedQueue(); + /** + * True if a marker message is still in the ha queue. + */ + private boolean markerEnqueued = false; - private final Object queuedEventsSync = new Object(); + private boolean isPrimary; - private volatile boolean messageDispatcherInit = false; + /** @since GemFire 5.7 */ + private byte clientConflation = HandShake.CONFLATION_DEFAULT; /** - * A counter that keeps track of how many task iterations that have occurred since the last ping - * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments - * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent. + * Flag to indicate whether to keep a durable client's queue alive */ - private final AtomicInteger pingCounter = new AtomicInteger(); + private boolean keepalive = false; + private AccessControl postAuthzCallback; + + private Subject subject; + + /** + * For multiuser environment.. 
+ */ + private ClientUserAuths clientUserAuths; + + /** + * The version of the client + */ + private Version clientVersion; /** Date on which this instances was created */ private Date creationDate; @@ -321,52 +311,87 @@ public class CacheClientProxy implements ClientSession { /** * true when the durable client associated with this proxy is being restarted and prevents cqs * from being closed and drained - **/ + */ private boolean drainLocked = false; - private final Object drainLock = new Object(); /** number of cq drains that are currently in progress **/ private int numDrainsInProgress = 0; - private final Object drainsInProgressLock = new Object(); - private SecurityService securityService = SecurityService.getSecurityService(); + static CacheClientProxy createCacheClientProxy(final CacheClientNotifier ccn, + final GemFireCacheImpl cache, + final StatisticsFactory statsFactory, + final SecurityService securityService, + final Socket socket, + final ClientProxyMembershipID proxyID, + final boolean isPrimary, + final byte clientConflation, + final Version clientVersion, + final long acceptorId, + final boolean notifyBySubscription) { + + CacheClientProxy cacheClientProxy = new CacheClientProxy( + ccn, cache, statsFactory, securityService, socket, proxyID, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); + + // Create the interest list + cacheClientProxy.cils[RegisterInterestTracker.interestListIndex] = + new ClientInterestList(cacheClientProxy, cacheClientProxy.proxyID); + // Create the durable interest list + cacheClientProxy.cils[RegisterInterestTracker.durableInterestListIndex] = + new ClientInterestList(cacheClientProxy, cacheClientProxy.getDurableId()); + + return cacheClientProxy; + } /** * Constructor. 
* * @param ccn The CacheClientNotifier registering this proxy + * @param cache * @param socket The socket between the server and the client * @param proxyID representing the Connection Proxy of the clien * @param isPrimary The boolean stating whether this prozxy is primary - * @throws CacheException { - */ - protected CacheClientProxy(CacheClientNotifier ccn, Socket socket, - ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation, - Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException { + * @param clientConflation + * @param clientVersion + */ + private CacheClientProxy(final CacheClientNotifier ccn, + final GemFireCacheImpl cache, + final StatisticsFactory statsFactory, + final SecurityService securityService, + final Socket socket, + final ClientProxyMembershipID proxyID, + final boolean isPrimary, + final byte clientConflation, + final Version clientVersion, + final long acceptorId, + final boolean notifyBySubscription) + throws CacheException { initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion); - this._cacheClientNotifier = ccn; - this._cache = (GemFireCacheImpl) ccn.getCache(); + this.cacheClientNotifier = ccn; + this.cache = cache; + this.securityService = securityService; this._maximumMessageCount = ccn.getMaximumMessageCount(); this._messageTimeToLive = ccn.getMessageTimeToLive(); this._acceptorId = acceptorId; this.notifyBySubscription = notifyBySubscription; - StatisticsFactory factory = this._cache.getDistributedSystem(); + this._statistics = - new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId() - + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort()); + new CacheClientProxyStats(statsFactory, "id_" + this.proxyID.getDistributedMember().getId() + + "_at_" + this._remoteHostAddress + ":" + this.socket.getPort()); - // Create the interest list - this.cils[RegisterInterestTracker.interestListIndex] = - new 
ClientInterestList(this, this.proxyID); - // Create the durable interest list - this.cils[RegisterInterestTracker.durableInterestListIndex] = - new ClientInterestList(this, this.getDurableId()); this.postAuthzCallback = null; - this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections(); + this.cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections(); this.creationDate = new Date(); initializeClientAuths(); } + boolean isClientConflationOn() { + return this.clientConflation == HandShake.CONFLATION_ON; + } + + boolean isClientConflationDefault() { + return this.clientConflation == HandShake.CONFLATION_ON; + } + private void initializeClientAuths() { if (AcceptorImpl.isPostAuthzCallbackPresent()) this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID); @@ -411,13 +436,13 @@ public class CacheClientProxy implements ClientSession { private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip, byte cc, Version vers) { - this._socket = socket; + this.socket = socket; this.proxyID = pid; this.connected = true; { int bufSize = 1024; try { - bufSize = _socket.getSendBufferSize(); + bufSize = this.socket.getSendBufferSize(); if (bufSize < 1024) { bufSize = 1024; } @@ -450,7 +475,6 @@ public class CacheClientProxy implements ClientSession { return this.notifyBySubscription; } - /** * Returns the DistributedMember represented by this proxy */ @@ -458,47 +482,6 @@ public class CacheClientProxy implements ClientSession { return this.proxyID; } - // the following code was commented out simply because it was not used - // /** - // * Determines if the proxy represents the client host (and only the host, not - // * necessarily the exact VM running on the host) - // * - // * @return Whether the proxy represents the client host - // */ - // protected boolean representsClientHost(String clientHost) - // { - // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings - // return 
this._remoteHostAddress.equals(clientHost); - // } - - // protected boolean representsClientVM(DistributedMember remoteMember) - // { - // // logger.warn("Is input port " + clientPort + " contained in " + - // // logger.warn("Does input host " + clientHost + " equal " + - // // this._remoteHostAddress+ ": " + representsClientHost(clientHost)); - // // logger.warn("representsClientVM: " + - // // (representsClientHost(clientHost) && containsPort(clientPort))); - // return (proxyID.getDistributedMember().equals(remoteMember)); - // } - - // /** - // * Determines if the CacheClientUpdater proxied by this instance is listening - // * on the input clientHost and clientPort - // * - // * @param clientHost - // * The host name of the client to compare - // * @param clientPort - // * The port number of the client to compare - // * - // * @return Whether the CacheClientUpdater proxied by this instance is - // * listening on the input clientHost and clientPort - // */ - // protected boolean representsCacheClientUpdater(String clientHost, - // int clientPort) - // { - // return (clientPort == this._socket.getPort() && representsClientHost(clientHost)); - // } - protected boolean isMember(ClientProxyMembershipID memberId) { return this.proxyID.equals(memberId); } @@ -522,11 +505,11 @@ public class CacheClientProxy implements ClientSession { * @return the socket between the server and the client */ protected Socket getSocket() { - return this._socket; + return this.socket; } public String getSocketHost() { - return this._socket.getInetAddress().getHostAddress(); + return this.socket.getInetAddress().getHostAddress(); } protected ByteBuffer getCommBuffer() { @@ -548,7 +531,7 @@ public class CacheClientProxy implements ClientSession { * @return the remote host's port */ public int getRemotePort() { - return this._socket.getPort(); + return this.socket.getPort(); } /** @@ -593,7 +576,7 @@ public class CacheClientProxy implements ClientSession { this.isMarkedForRemovalLock.wait(); 
} catch (InterruptedException e) { interrupted = true; - this._cache.getCancelCriterion().checkCancelInProgress(e); + this.cache.getCancelCriterion().checkCancelInProgress(e); } } // while } finally { @@ -621,7 +604,7 @@ public class CacheClientProxy implements ClientSession { * @return the GemFire cache */ public GemFireCacheImpl getCache() { - return this._cache; + return this.cache; } public Set getInterestRegisteredRegions() { @@ -649,7 +632,7 @@ public class CacheClientProxy implements ClientSession { * @return this proxy's CacheClientNotifier */ protected CacheClientNotifier getCacheClientNotifier() { - return this._cacheClientNotifier; + return this.cacheClientNotifier; } /** @@ -852,8 +835,8 @@ public class CacheClientProxy implements ClientSession { } } } catch (Exception ex) { - if (this._cache.getSecurityLoggerI18n().warningEnabled()) { - this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, + if (this.cache.getSecurityLoggerI18n().warningEnabled()) { + this.cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, new Object[] {this, ex}); } } @@ -991,9 +974,9 @@ public class CacheClientProxy implements ClientSession { } private void closeSocket() { - if (this._socketClosed.compareAndSet(false, true)) { + if (this.socketClosed.compareAndSet(false, true)) { // Close the socket - this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, + this.cacheClientNotifier.getSocketCloser().asyncClose(this.socket, this._remoteHostAddress, null); getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections(); } @@ -1008,7 +991,7 @@ public class CacheClientProxy implements ClientSession { { String remoteHostAddress = this._remoteHostAddress; if (remoteHostAddress != null) { - this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress); + this.cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress); this._remoteHostAddress = null; } 
} @@ -1124,7 +1107,7 @@ public class CacheClientProxy implements ClientSession { InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) { // Create a client interest message for the keyOfInterest ClientInterestMessageImpl message = new ClientInterestMessageImpl( - new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType, + new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType, policy.getOrdinal(), isDurable, !receiveValues, ClientInterestMessageImpl.REGISTER); // Notify all secondary proxies of a change in interest @@ -1146,7 +1129,7 @@ public class CacheClientProxy implements ClientSession { String regionName, Object keyOfInterest) { // Get the initial value Get70 request = (Get70) Get70.getCommand(); - LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName); + LocalRegion lr = (LocalRegion) this.cache.getRegion(regionName); Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, null); boolean isObject = entry.isObject; byte[] value = null; @@ -1170,7 +1153,7 @@ public class CacheClientProxy implements ClientSession { EventID eventId = null; if (clientInterestMessage == null) { // If the clientInterestMessage is null, create a new event id - eventId = new EventID(this._cache.getDistributedSystem()); + eventId = new EventID(this.cache.getDistributedSystem()); } else { // If the clientInterestMessage is not null, base the event id off its event id to fix // GEM-794. 
@@ -1239,7 +1222,7 @@ public class CacheClientProxy implements ClientSession { boolean isDurable, boolean receiveValues, int interestType) { // Notify all secondary proxies of a change in interest ClientInterestMessageImpl message = new ClientInterestMessageImpl( - new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType, + new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType, (byte) 0, isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER); notifySecondariesOfInterestChange(message); @@ -1269,17 +1252,9 @@ public class CacheClientProxy implements ClientSession { .append("->").append(InterestType.getString(message.getInterestType())); logger.debug(buffer.toString()); } - this._cacheClientNotifier.deliverInterestChange(this.proxyID, message); + this.cacheClientNotifier.deliverInterestChange(this.proxyID, message); } - /* - * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try { - * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch - * (RegionDestroyedException e) { - * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" + - * keyOfInterest, e); } } - */ - /** * Registers interest in the input region name and key * @@ -1293,7 +1268,7 @@ public class CacheClientProxy implements ClientSession { cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates); if (flushState) { flushForInterestRegistration(regionName, - this._cache.getDistributedSystem().getDistributedMember()); + this.cache.getDistributedSystem().getDistributedMember()); } HARegionQueue queue = getHARegionQueue(); if (queue != null) { // queue is null during initialization @@ -1306,7 +1281,7 @@ public class CacheClientProxy implements ClientSession { * interest. During queue creation it is the queue's image provider. 
*/ public void flushForInterestRegistration(String regionName, DistributedMember target) { - Region r = this._cache.getRegion(regionName); + Region r = this.cache.getRegion(regionName); if (r == null) { if (logger.isDebugEnabled()) { logger.debug("Unable to find region '{}' to flush for interest registration", regionName); @@ -1320,7 +1295,7 @@ public class CacheClientProxy implements ClientSession { if (r instanceof PartitionedRegion) { // need to flush all buckets. SFO should be changed to target buckets // belonging to a particular PR, but it doesn't have that option right now - sfo = new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager()); + sfo = new StateFlushOperation(this.cache.getDistributedSystem().getDistributionManager()); } else { sfo = new StateFlushOperation((DistributedRegion) r); } @@ -1378,7 +1353,7 @@ public class CacheClientProxy implements ClientSession { if (getHARegionQueue() != null) { if (flushState) { flushForInterestRegistration(regionName, - this._cache.getDistributedSystem().getDistributedMember()); + this.cache.getDistributedSystem().getDistributedMember()); } getHARegionQueue().setHasRegisteredInterest(true); } @@ -1643,7 +1618,7 @@ public class CacheClientProxy implements ClientSession { if (logger.isDebugEnabled()) { logger.debug("About to send message directly to {}", this); } - if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) { + if (this._messageDispatcher != null && this.socket != null && !this.socket.isClosed()) { // If the socket is open, send the message to it this._messageDispatcher.sendMessageDirectly(message); if (logger.isDebugEnabled()) { @@ -1759,7 +1734,7 @@ public class CacheClientProxy implements ClientSession { if (this.isPrimary) { // Add the marker to the queue if (!processedMarker) { - EventID eventId = new EventID(this._cache.getDistributedSystem()); + EventID eventId = new EventID(this.cache.getDistributedSystem()); 
this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId)); } @@ -1811,13 +1786,8 @@ public class CacheClientProxy implements ClientSession { public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append("CacheClientProxy[") - // .append("client proxy id=") .append(this.proxyID) - // .append("; client host name=") - // .append(this._socket.getInetAddress().getCanonicalHostName()) - // .append("; client host address=") - // .append(this._remoteHostAddress) - .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary) + .append("; port=").append(this.socket.getPort()).append("; primary=").append(isPrimary) .append("; version=").append(clientVersion).append("]"); return buffer.toString(); } @@ -1825,13 +1795,8 @@ public class CacheClientProxy implements ClientSession { public String getState() { StringBuffer buffer = new StringBuffer(); buffer.append("CacheClientProxy[") - // .append("client proxy id=") .append(this.proxyID) - // .append("; client host name=") - // .append(this._socket.getInetAddress().getCanonicalHostName()) - // .append("; client host address=") - // .append(this._remoteHostAddress) - .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary) + .append("; port=").append(this.socket.getPort()).append("; primary=").append(isPrimary) .append("; version=").append(clientVersion).append("; paused=").append(isPaused()) .append("; alive=").append(isAlive()).append("; connected=").append(isConnected()) .append("; isMarkedForRemoval=").append(isMarkedForRemoval).append("]"); @@ -1844,15 +1809,7 @@ public class CacheClientProxy implements ClientSession { } public boolean isPrimary() { - // boolean primary = this._messageDispatcher.isAlive() - // || this._messageDispatcher._messageQueue.isPrimary(); - boolean primary = this.isPrimary; - // System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive()); - // System.out.println(this + ": 
DISPATCHER QUEUE IS PRIMARY: " + - // this._messageDispatcher._messageQueue.isPrimary()); - // System.out.println(this + ": IS PRIMARY: " + primary); - return primary; - // return this.isPrimary ; + return this.isPrimary; } protected boolean basicIsPrimary() { @@ -1863,13 +1820,7 @@ public class CacheClientProxy implements ClientSession { this.isPrimary = isPrimary; } - // private static int nextId = 0; - // static protected int getNextId() { - // synchronized (CacheClientProxy.class) { - // return ++nextId; - // } - // } - /* + /** * Return this client's HA region queue * * @returns - HARegionQueue of the client @@ -1881,7 +1832,6 @@ public class CacheClientProxy implements ClientSession { return null; } - /** * Reinitialize a durable CacheClientProxy with a new client. * @@ -1952,7 +1902,7 @@ public class CacheClientProxy implements ClientSession { // Close the proxy terminateDispatching(false); - _cacheClientNotifier._statistics.incQueueDroppedCount(); + cacheClientNotifier._statistics.incQueueDroppedCount(); /** * Setting the expiration task to null again and cancelling existing one, if any. 
See @@ -1976,7 +1926,7 @@ public class CacheClientProxy implements ClientSession { }; if (this._durableExpirationTask.compareAndSet(null, task)) { - _cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L); + cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L); } } @@ -1992,11 +1942,131 @@ public class CacheClientProxy implements ClientSession { } } + public static void setTestHook(TestHook value) { + testHook = value; + } + + public static void unsetTestHook() { + testHook = null; + } + + public static TestHook getTestHook() { + return testHook; + } + + static void setSlowStartForTesting() { + isSlowStartForTesting = true; + } + + static void unsetSlowStartForTesting() { + isSlowStartForTesting = false; + } + + static void setAfterMessageCreationForTesting() { + afterMessageCreationForTesting = true; + } + + static void unsetAfterMessageCreationForTesting() { + afterMessageCreationForTesting = false; + } + + Socket getSocketForTesting() { + return this.socket; + } + + ClientInterestList[] getClientInterestListForTesting() { + return this.cils; + } + + MessageDispatcher getMessageDispatcherForTesting() { + return this._messageDispatcher; + } + + /** + * Returns the current number of CQS the client installed. 
+ * + * @return int the current count of CQs for this client + */ + public int getCqCount() { + synchronized (this) { + return this._statistics.getCqCount(); + } + } + + /** + * Increment the number of CQs the client installed + * + */ + public void incCqCount() { + synchronized (this) { + this._statistics.incCqCount(); + } + } + + /** + * Decrement the number of CQs the client installed + * + */ + public synchronized void decCqCount() { + synchronized (this) { + this._statistics.decCqCount(); + } + } + + /** + * Returns true if the client has one CQ + * + * @return true if the client has one CQ + */ + public boolean hasOneCq() { + synchronized (this) { + return this._statistics.getCqCount() == 1; + } + } + + /** + * Returns true if the client has no CQs + * + * @return true if the client has no CQs + */ + public boolean hasNoCq() { + synchronized (this) { + return this._statistics.getCqCount() == 0; + } + } + + /** + * Get map of regions with empty data policy + * + * @since GemFire 6.1 + */ + public Map getRegionsWithEmptyDataPolicy() { + return regionsWithEmptyDataPolicy; + } + + public int incrementAndGetPingCounter() { + int pingCount = this.pingCounter.incrementAndGet(); + return pingCount; + } + + public void resetPingCounter() { + this.pingCounter.set(0); + } + + /** + * Returns the number of seconds that have elapsed since the Client proxy created. + * + * @since GemFire 7.0 + */ + public long getUpTime() { + return (System.currentTimeMillis() - this.creationDate.getTime()) / 1000; + } + /** * Class ClientInterestList provides a convenient interface for manipulating client * interest information. 
*/ - static protected class ClientInterestList { + static class ClientInterestList { final CacheClientProxy ccp; @@ -2031,7 +2101,7 @@ public class CacheClientProxy implements ClientSession { } Set keysRegistered = null; synchronized (this.interestListLock) { - LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true); + LocalRegion r = (LocalRegion) this.ccp.cache.getRegion(regionName, true); if (r == null) { throw new RegionDestroyedException("Region could not be found for interest registration", regionName); @@ -2055,7 +2125,7 @@ public class CacheClientProxy implements ClientSession { protected FilterProfile getProfile(String regionName) { try { - return this.ccp._cache.getFilterProfile(regionName); + return this.ccp.cache.getFilterProfile(regionName); } catch (CancelException e) { return null; } @@ -2221,7 +2291,6 @@ public class CacheClientProxy implements ClientSession { } } - /** * Class MessageDispatcher is a Thread that processes messages bound for * the client by taking messsages from the message queue and sending them to the client over the @@ -2234,34 +2303,17 @@ public class CacheClientProxy implements ClientSession { */ protected final HARegionQueue _messageQueue; - // /** - // * An int used to keep track of the number of messages dropped for logging - // * purposes. If greater than zero then a warning has been logged about - // * messages being dropped. 
- // */ - // private int _numberOfMessagesDropped = 0; - /** * The proxy for which this dispatcher is processing messages */ private final CacheClientProxy _proxy; - // /** - // * The conflator faciliates message conflation - // */ - // protected BridgeEventConflator _eventConflator; - /** * Whether the dispatcher is stopped */ private volatile boolean _isStopped = true; /** - * guarded.By _pausedLock - */ - // boolean _isPausedDispatcher = false; - - /** * A lock object used to control pausing this dispatcher */ protected final Object _pausedLock = new Object(); @@ -2274,11 +2326,6 @@ public class CacheClientProxy implements ClientSession { private final ReadWriteLock socketLock = new ReentrantReadWriteLock(); private final Lock socketWriteLock = socketLock.writeLock(); - // /** - // * A boolean verifying whether a warning has already been issued if the - // * message queue has reached its capacity. - // */ - // private boolean _messageQueueCapacityReachedWarning = false; /** * Constructor. 
@@ -2303,7 +2350,7 @@ public class CacheClientProxy implements ClientSession { HARegionQueueAttributes harq = new HARegionQueueAttributes(); harq.setBlockingQueueCapacity(proxy._maximumMessageCount); harq.setExpiryTime(proxy._messageTimeToLive); - ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer()) + ((HAContainerWrapper) proxy.cacheClientNotifier.getHaContainer()) .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy()); boolean createDurableQueue = proxy.proxyID.isDurable(); boolean canHandleDelta = (proxy.clientVersion.compareTo(Version.GFE_61) >= 0) @@ -2314,7 +2361,7 @@ public class CacheClientProxy implements ClientSession { } this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(), getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue, - proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(), + proxy.cacheClientNotifier.getHaContainer(), proxy.getProxyID(), this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta); // Check if interests were registered during HARegion GII. 
if (this._proxy.hasRegisteredInterested()) { @@ -2405,10 +2452,6 @@ public class CacheClientProxy implements ClientSession { Thread.sleep(500); } catch (InterruptedException e) { interrupted = true; - /* - * GemFireCache c = (GemFireCache)_cache; - * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e); - */ } catch (CancelException e) { break; } catch (CacheException e) { @@ -2503,7 +2546,7 @@ public class CacheClientProxy implements ClientSession { ClientMessage clientMessage = null; while (!isStopped()) { // SystemFailure.checkFailure(); DM's stopper does this - if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) { + if (this._proxy.cache.getCancelCriterion().isCancelInProgress()) { break; } try { @@ -2752,9 +2795,6 @@ public class CacheClientProxy implements ClientSession { } Message message = null; - // byte[] latestValue = - // this._eventConflator.getLatestValue(clientMessage); - if (clientMessage instanceof ClientUpdateMessage) { byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue(); if (logger.isTraceEnabled()) { @@ -2771,7 +2811,7 @@ public class CacheClientProxy implements ClientSession { message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue); - if (AFTER_MESSAGE_CREATION_FLAG) { + if (afterMessageCreationForTesting) { ClientServerObserver bo = ClientServerObserverHolder.getInstance(); bo.afterMessageCreation(message); } @@ -2779,37 +2819,9 @@ public class CacheClientProxy implements ClientSession { message = clientMessage.getMessage(getProxy(), true /* notify */); } - // ////////////////////////////// - // TEST CODE BEGIN (Throws exception to test closing proxy) - // if (true) throw new IOException("test"); - // TEST CODE END - // ////////////////////////////// - // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID, - // latestValue); - // Message message = clientMessage.getMessage(); removed during merge. 
- // BugFix for BUG#38206 and BUG#37791 if (!this._proxy.isPaused()) { sendMessage(message); - // ////////////////////////////// - // TEST CODE BEGIN (Throws exception to test closing proxy) - // if (true) throw new IOException("test"); - // TEST CODE END - // ////////////////////////////// - // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID, - // latestValue); - // Message message = clientMessage.getMessage(); removed during merge. - // message.setComms(getSocket(), getCommBuffer(), getStatistics()); - // message.send(); - - // ////////////////////////////// - // TEST CODE BEGIN (Introduces random wait in client) - // Sleep a random number of ms - // java.util.Random rand = new java.util.Random(); - // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {} - // TEST CODE END - // ////////////////////////////// - if (logger.isTraceEnabled()) { logger.trace("{}: Dispatched {}", this, clientMessage); } @@ -2851,7 +2863,7 @@ public class CacheClientProxy implements ClientSession { try { this._messageQueue.put(clientMessage); if (this._proxy.isPaused() && this._proxy.isDurable()) { - this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount(); + this._proxy.cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount(); if (logger.isDebugEnabled()) { logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage); } @@ -2955,7 +2967,7 @@ public class CacheClientProxy implements ClientSession { this._pausedLock.notifyAll(); } - protected Object deserialize(byte[] serializedBytes) { + private Object deserialize(byte[] serializedBytes) { Object deserializedObject = serializedBytes; // This is a debugging method so ignore all exceptions like // ClassNotFoundException @@ -2979,89 +2991,7 @@ public class CacheClientProxy implements ClientSession { } } - /** - * Returns the current number of CQS the client installed. 
- * - * @return int the current count of CQs for this client - */ - public int getCqCount() { - synchronized (this) { - return this._statistics.getCqCount(); - } - } - - /** - * Increment the number of CQs the client installed - * - */ - public void incCqCount() { - synchronized (this) { - this._statistics.incCqCount(); - } - } - - /** - * Decrement the number of CQs the client installed - * - */ - public synchronized void decCqCount() { - synchronized (this) { - this._statistics.decCqCount(); - } - } - - /** - * Returns true if the client has one CQ - * - * @return true if the client has one CQ - */ - public boolean hasOneCq() { - synchronized (this) { - return this._statistics.getCqCount() == 1; - } - } - - /** - * Returns true if the client has no CQs - * - * @return true if the client has no CQs - */ - public boolean hasNoCq() { - synchronized (this) { - return this._statistics.getCqCount() == 0; - } - } - - /** - * Get map of regions with empty data policy - * - * @since GemFire 6.1 - */ - public Map getRegionsWithEmptyDataPolicy() { - return regionsWithEmptyDataPolicy; - } - - public int incrementAndGetPingCounter() { - int pingCount = this.pingCounter.incrementAndGet(); - return pingCount; - } - - public void resetPingCounter() { - this.pingCounter.set(0); - } - - /** - * Returns the number of seconds that have elapsed since the Client proxy created. 
- * - * @since GemFire 7.0 - */ - public long getUpTime() { - return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000); - } - public interface TestHook { - public void doTestHook(String spot); + void doTestHook(String spot); } - - public static TestHook testHook; } http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index e21a834..6e8f9ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -22,7 +22,11 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.CacheClientStatus; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.IncomingGatewayStatus; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -32,7 +36,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.logging.log4j.Logger; import java.net.InetAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicIntegerArray; /** http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index 6bbe7b8..7d1603d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -352,8 +352,8 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N byte[] serializedValue = null; Message message = null; boolean conflation = false; - conflation = (proxy.clientConflation == HandShake.CONFLATION_ON) - || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && this.shouldBeConflated()); + conflation = (proxy.isClientConflationOn()) + || (proxy.isClientConflationDefault() && this.shouldBeConflated()); if (latestValue != null) { serializedValue = latestValue; http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java index 6e119c0..d43244a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java @@ -62,6 
+62,7 @@ import org.apache.geode.CancelCriterion; import org.apache.geode.DataSerializer; import org.apache.geode.GemFireConfigException; import org.apache.geode.InternalGemFireException; +import org.apache.geode.LogWriter; import org.apache.geode.cache.GatewayConfigurationException; import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.ServerRefusedConnectionException; @@ -1669,8 +1670,7 @@ public class HandShake implements ClientHandShake { * not */ public static Object verifyCredentials(String authenticatorMethod, Properties credentials, - Properties securityProperties, InternalLogWriter logWriter, - InternalLogWriter securityLogWriter, DistributedMember member) + Properties securityProperties, LogWriter logWriter, LogWriter securityLogWriter, DistributedMember member) throws AuthenticationRequiredException, AuthenticationFailedException { if (!AcceptorImpl.isAuthenticationRequired()) { @@ -1702,8 +1702,7 @@ public class HandShake implements ClientHandShake { String methodName = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR); return verifyCredentials(methodName, this.credentials, this.system.getSecurityProperties(), - (InternalLogWriter) this.system.getLogWriter(), - (InternalLogWriter) this.system.getSecurityLogWriter(), this.id.getDistributedMember()); + this.system.getLogWriter(), this.system.getSecurityLogWriter(), this.id.getDistributedMember()); } public void sendCredentialsForWan(OutputStream out, InputStream in) { @@ -1731,8 +1730,7 @@ public class HandShake implements ClientHandShake { String authenticator = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR); Properties peerWanProps = readCredentials(dis, dos, this.system); verifyCredentials(authenticator, peerWanProps, this.system.getSecurityProperties(), - (InternalLogWriter) this.system.getLogWriter(), - (InternalLogWriter) this.system.getSecurityLogWriter(), member); + this.system.getLogWriter(), 
this.system.getSecurityLogWriter(), member); } private static int getKeySize(String skAlgo) { http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java index 1f8a564..34e6020 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java @@ -152,6 +152,16 @@ public class LogService extends LogManager { } /** + * Returns a Logger with the name of @{link SECURITY_LOGGER_NAME}. + * + * @return The security Logger. + */ + public static Logger getSecurityLogger() { + return new FastLogger( + LogManager.getLogger(SECURITY_LOGGER_NAME, GemFireParameterizedMessageFactory.INSTANCE)); + } + + /** * Returns a LogWriterLogger that is decorated with the LogWriter and LogWriterI18n methods. *

* This is the bridge to LogWriter and LogWriterI18n that we need to eventually stop using in http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java index 7aa11b7..6cb28ee 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java @@ -14,21 +14,14 @@ */ package org.apache.geode.internal.cache.tier.sockets; -import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.client.ServerRefusedConnectionException; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.test.junit.categories.ClientServerTest; import org.apache.geode.test.junit.categories.IntegrationTest; import org.junit.After; @@ -39,7 +32,6 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.net.BindException; -import java.net.Socket; import java.util.Collections; import 
java.util.Properties; @@ -85,7 +77,7 @@ public class AcceptorImplJUnitTest { CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, - null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); + null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion()); fail("Expected an IllegalArgumentExcption due to max conns < min pool size"); } catch (IllegalArgumentException expected) { } @@ -95,7 +87,7 @@ public class AcceptorImplJUnitTest { CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, - CacheServer.DEFAULT_TCP_NO_DELAY); + CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion()); fail("Expected an IllegalArgumentExcption due to max conns of zero"); } catch (IllegalArgumentException expected) { } @@ -105,12 +97,12 @@ public class AcceptorImplJUnitTest { CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, - null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); + null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion()); a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, - null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); + null, null, false, 
Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion()); fail("Expecetd a BindException while attaching to the same port"); } catch (BindException expected) { } @@ -119,7 +111,7 @@ public class AcceptorImplJUnitTest { CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, - null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); + null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion()); assertEquals(port2, a3.getPort()); InternalDistributedSystem isystem = (InternalDistributedSystem) this.cache.getDistributedSystem(); http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java new file mode 100644 index 0000000..a61f790 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.wan.GatewayTransportFilter; +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.net.SocketCreatorFactory; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +@Category(UnitTest.class) +public class AcceptorImplTest { + + @Before + public void before() throws Exception { + DistributionConfigImpl distributionConfig = new DistributionConfigImpl(new Properties()); + SocketCreatorFactory.setDistributionConfig(distributionConfig); + } + + @After + public void after() throws Exception { + SocketCreatorFactory.close(); + } + + @Test + public void constructWithDefaults() throws Exception { + /* + Problems: + + this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount, + messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver); + + this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings, + this.clientNotifier.getStats()); + + LoggingThreadGroup / ThreadFactory / ThreadPoolExecutor + + 
isAuthenticationRequired = this.securityService.isClientSecurityRequired(); + + isIntegratedSecurity = this.securityService.isIntegratedSecurity(); + + + String postAuthzFactoryName = + this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); + + */ + + int port = 0; + String bindHostName = SocketCreator.getLocalHost().getHostName(); + boolean notifyBySubscription = false; + int socketBufferSize = 1; + int maximumTimeBetweenPings = 0; + InternalCache internalCache = null; + int maxConnections = 0; + int maxThreads = 0; + int maximumMessageCount = 0; + int messageTimeToLive = 0; + ConnectionListener listener = null; + List overflowAttributesList = null; + boolean isGatewayReceiver = false; + List transportFilter = Collections.emptyList(); + boolean tcpNoDelay = false; + CancelCriterion cancelCriterion = null; + + AcceptorImpl acceptor = new AcceptorImpl( + port, + bindHostName, + notifyBySubscription, + socketBufferSize, + maximumTimeBetweenPings, + internalCache, + maxConnections, + maxThreads, + maximumMessageCount, + messageTimeToLive, + listener, + overflowAttributesList, + isGatewayReceiver, + transportFilter, + tcpNoDelay, + cancelCriterion + ); + + assertThat(acceptor).isNotNull(); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java index 31f67aa..f4a8cc8 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java @@ -99,7 +99,7 @@ public class ClientConflationDUnitTest 
extends JUnit4DistributedTestCase { * */ public static void setIsSlowStart() { - CacheClientProxy.isSlowStartForTesting = true; + CacheClientProxy.setSlowStartForTesting(); System.setProperty("slowStartTimeForTesting", "15000"); } http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java index 1a76daa..efc0367 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java @@ -166,7 +166,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase { } private static void installObserver() { - CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true; + CacheClientProxy.setAfterMessageCreationForTesting(); ClientServerObserverHolder.setInstance(new DelaySendingEvent()); } @@ -176,7 +176,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase { } private static void cleanupObserver() { - CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false; + CacheClientProxy.unsetAfterMessageCreationForTesting(); ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter()); } http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java index b4f3185..5b340d1 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java @@ -62,13 +62,9 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.standalone.VersionManager; import org.apache.geode.test.junit.categories.ClientServerTest; import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import java.util.Collection; import java.util.Iterator; import java.util.Properties; import java.util.Set; @@ -1014,7 +1010,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase { while (iter_prox.hasNext()) { CacheClientProxy ccp = (CacheClientProxy) iter_prox.next(); // CCP should not contain region1 - Set akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions; + Set akr = ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].regions; assertNotNull(akr); assertTrue(!akr.contains(Region.SEPARATOR + REGION_NAME1)); // CCP should contain region2 @@ -1352,7 +1348,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase { * */ public static void unsetSlowDispatcherFlag() { - CacheClientProxy.isSlowStartForTesting = false; + CacheClientProxy.unsetSlowStartForTesting(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java index b1e16ee..275e458 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java @@ -112,7 +112,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase { * */ public static void setIsSlowStart(String milis) { - CacheClientProxy.isSlowStartForTesting = true; + CacheClientProxy.setSlowStartForTesting(); System.setProperty("slowStartTimeForTesting", milis); } @@ -121,7 +121,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase { * */ public static void unsetIsSlowStart() { - CacheClientProxy.isSlowStartForTesting = false; + CacheClientProxy.unsetSlowStartForTesting(); } /** http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java index 544f732..07e2220 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java @@ -459,7 +459,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase { iter_prox = ccn.getClientProxies().iterator(); if (iter_prox.hasNext()) { proxy = (CacheClientProxy) iter_prox.next(); - return proxy._messageDispatcher.isAlive(); + return proxy.getMessageDispatcherForTesting().isAlive(); } else { return false; } @@ -510,7 +510,7 @@ public class 
HAInterestTestCase extends JUnit4DistributedTestCase { if (iter_prox.hasNext()) { CacheClientProxy proxy = (CacheClientProxy) iter_prox.next(); assertFalse("Dispatcher on secondary should not be alive", - proxy._messageDispatcher.isAlive()); + proxy.getMessageDispatcherForTesting().isAlive()); } } @@ -818,7 +818,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase { wc = new WaitCriterion() { @Override public boolean done() { - Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); return keysMap != null && keysMap.size() == 2; } @@ -830,7 +830,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase { }; Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); - Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); assertNotNull(keysMap); assertEquals(2, keysMap.size()); @@ -879,7 +879,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase { wc = new WaitCriterion() { @Override public boolean done() { - Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); return keysMap != null; } @@ -891,7 +891,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase { }; Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true); - Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = (Set) 
ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); assertNotNull(keysMap); assertEquals(1, keysMap.size()); http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java index 6aea509..3585c3e 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java @@ -471,7 +471,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase { String excuse; public boolean done() { - return proxy._messageDispatcher.isAlive(); + return proxy.getMessageDispatcherForTesting().isAlive(); } public String description() { @@ -529,7 +529,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase { if (iter_prox.hasNext()) { CacheClientProxy proxy = (CacheClientProxy) iter_prox.next(); assertFalse("Dispatcher on secondary should not be alive", - proxy._messageDispatcher.isAlive()); + proxy.getMessageDispatcherForTesting().isAlive()); } } catch (Exception ex) { fail("while setting verifyDispatcherIsNotAlive " + ex); http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java index 041cd38..e9982b2 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java @@ -434,7 +434,7 @@ public class InterestListRecoveryDUnitTest extends JUnit4DistributedTestCase { public static Set getKeysOfInterestMap(CacheClientProxy proxy, String regionName) { // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]); // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]._keysOfInterest); - return proxy.cils[RegisterInterestTracker.interestListIndex].getProfile(regionName) + return proxy.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].getProfile(regionName) .getKeysOfInterestFor(proxy.getProxyID()); } http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java index 4a98298..826bba9 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java @@ -189,10 +189,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase { String excuse; public boolean done() { - if (proxy._messageDispatcher == null) { + if (proxy.getMessageDispatcherForTesting() == null) { return false; } - return proxy._messageDispatcher.isAlive(); + return proxy.getMessageDispatcherForTesting().isAlive(); } public String 
description() { @@ -245,7 +245,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase { if (iter_prox.hasNext()) { CacheClientProxy proxy = (CacheClientProxy) iter_prox.next(); assertFalse("Dispatcher on secondary should not be alive", - proxy._messageDispatcher.isAlive()); + proxy.getMessageDispatcherForTesting().isAlive()); } } catch (Exception ex) { @@ -427,7 +427,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase { String excuse; public boolean done() { - Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); if (keysMap == null) { excuse = "keys of interest is null"; @@ -446,7 +446,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase { }; Wait.waitForCriterion(wc, 180 * 1000, 2 * 1000, true); - Set keysMap = ccp.cils[RegisterInterestTracker.interestListIndex] + Set keysMap = ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex] .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); assertTrue(keysMap.contains(k1)); assertTrue(keysMap.contains(k2)); http://git-wip-us.apache.org/repos/asf/geode/blob/135d674a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java new file mode 100644 index 0000000..485ee5e --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/Put65BenchTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.SystemFailure.loadEmergencyClasses;
+import static org.apache.geode.internal.cache.TXManagerImpl.NOTX;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(UnitTest.class)
+public class Put65BenchTest {
+
+  public Command put65Command;
+  public ServerConnection mockServerConnection;
+  public Message mockMessage;
+
+  /**
+   * Obtains the Put65 command and mocks the server connection and the eight-part request message
+   * that {@link #benchmark()} passes to it.
+   */
+  @Before
+  public void setup() throws Exception {
+    loadEmergencyClasses();
+
+    put65Command = Put65.getCommand();
+
+    // Server connection and the collaborators it is stubbed to hand out.
+    mockServerConnection = namedMock(ServerConnection.class, "mockServerConnection");
+    when(mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT);
+
+    GemFireCacheImpl cache = namedMock(GemFireCacheImpl.class, "mockCache");
+    when(mockServerConnection.getCache()).thenReturn(cache);
+
+    TXManagerImpl txManager = namedMock(TXManagerImpl.class, "mockTxManager");
+    when(cache.getTxManager()).thenReturn(txManager);
+
+    CacheServerStats serverStats = namedMock(CacheServerStats.class, "mockCacheServerStats");
+    when(mockServerConnection.getCacheServerStats()).thenReturn(serverStats);
+
+    ClientProxyMembershipID proxyId = namedMock(ClientProxyMembershipID.class, "mockProxyId");
+    when(mockServerConnection.getProxyID()).thenReturn(proxyId);
+
+    Message errorResponse = namedMock(Message.class, "mockErrorResponseMessage");
+    when(mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponse);
+
+    // Message parts, created in the order of the part index they are later stubbed at.
+    Part regionNamePart = namedMock(Part.class, "mockRegionNamePart");
+    when(regionNamePart.getString()).thenReturn("mockRegionNamePart");
+
+    Part operationPart = objectPart("mockOperationPart", Operation.UPDATE);
+
+    Part flagsPart = namedMock(Part.class, "mockFlagsPart");
+    when(flagsPart.getInt()).thenReturn(0);
+
+    Part keyPart = objectPart("mockKeyPart", "mockKeyPart");
+    when(keyPart.getStringOrObject()).thenReturn("mockKeyPart");
+
+    Part isDeltaPart = objectPart("mockIsDeltaPart", Boolean.FALSE);
+    Part valuePart = objectPart("mockValuePart", "mockValuePart");
+    Part eventPart = objectPart("mockEventPart", "mockEventPart");
+    Part callbackArgPart = objectPart("mockCallbackArgPart", "mockCallbackArgPart");
+
+    mockMessage = namedMock(Message.class, "mockMessage");
+    when(mockMessage.getTransactionId()).thenReturn(NOTX);
+
+    Part[] requestParts = {regionNamePart, operationPart, flagsPart, keyPart, isDeltaPart,
+        valuePart, eventPart, callbackArgPart};
+    for (int index = 0; index < requestParts.length; index++) {
+      when(mockMessage.getPart(index)).thenReturn(requestParts[index]);
+    }
+  }
+
+  /** Executes the command once against the mocks; the reply is not inspected. */
+  @Test
+  public void benchmark() {
+    put65Command.execute(mockMessage, mockServerConnection);
+  }
+
+  /** Creates a Mockito mock of {@code type} that carries {@code name} in its settings. */
+  private static <T> T namedMock(Class<T> type, String name) {
+    return mock(type, withSettings().name(name));
+  }
+
+  /** Creates a named {@link Part} mock whose {@code getObject()} is stubbed to return value. */
+  private static Part objectPart(String name, Object value) throws Exception {
+    Part part = namedMock(Part.class, name);
+    when(part.getObject()).thenReturn(value);
+    return part;
+  }
+}