Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8804200CA8 for ; Thu, 1 Jun 2017 01:12:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D6FAE160BCB; Wed, 31 May 2017 23:12:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EACB7160BE3 for ; Thu, 1 Jun 2017 01:12:50 +0200 (CEST) Received: (qmail 78641 invoked by uid 500); 31 May 2017 23:12:50 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 78533 invoked by uid 99); 31 May 2017 23:12:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 May 2017 23:12:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DCC7EDFC8B; Wed, 31 May 2017 23:12:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Wed, 31 May 2017 23:12:53 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/14] geode git commit: GEODE-2632: refactoring preparations for SecurityService and BaseCommand changes archived-at: Wed, 31 May 2017 23:12:53 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java index f7baba4..6f97d31 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Request.java @@ -52,15 +52,16 @@ public class Request extends BaseCommand { Request() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { Part regionNamePart = null, keyPart = null, valuePart = null; String regionName = null; Object callbackArg = null, key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // requiresResponse = true; { long oldStart = start; @@ -68,18 +69,18 @@ public class Request extends BaseCommand { stats.incReadGetRequestTime(start - oldStart); } // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + int parts = clientMessage.getNumberOfParts(); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); // valuePart = null; (redundant assignment) if (parts > 2) { - valuePart = msg.getPart(2); + valuePart = clientMessage.getPart(2); try { callbackArg = valuePart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -87,15 +88,15 @@ public class Request extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received get request ({} bytes) from {} for region {} key {} txId {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key, - msg.getTransactionId()); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key, clientMessage.getTransactionId()); } // Process the get request @@ -109,31 +110,31 @@ public class Request extends BaseCommand { errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } else { - Region region = servConn.getCache().getRegion(regionName); + Region region = serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { GetOperationContext getContext = null; try { this.securityService.authorizeRegionRead(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { getContext = authzRequest.getAuthorize(regionName, key, callbackArg); callbackArg = getContext.getCallbackArg(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -141,10 +142,10 @@ public class Request extends BaseCommand { // the value if it is a byte[]. Object[] valueAndIsObject = new Object[3]; try { - getValueAndIsObject(region, key, callbackArg, servConn, valueAndIsObject); + getValueAndIsObject(region, key, callbackArg, serverConnection, valueAndIsObject); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -154,7 +155,7 @@ public class Request extends BaseCommand { try { - AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); + AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest(); if (postAuthzRequest != null) { getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext); byte[] serializedValue = getContext.getSerializedValue(); @@ -166,8 +167,8 @@ public class Request extends BaseCommand { isObject = getContext.isObject(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } { @@ -179,20 +180,21 @@ public class Request extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr, - pr.getNetworkHopType()); + writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject, + serverConnection, pr, pr.getNetworkHopType()); pr.clearNetworkHopData(); } else { - writeResponse(data, callbackArg, msg, isObject, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); } } else { - writeResponse(data, callbackArg, msg, isObject, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { logger.debug("{}: Wrote get response back to {} for region {} key {} value: {}", - servConn.getName(), servConn.getSocketString(), regionName, key, data); + serverConnection.getName(), serverConnection.getSocketString(), regionName, key, + data); } stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java index 3fd84d6..a6d6578 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java @@ -49,57 +49,60 @@ public class RequestEventValue extends BaseCommand { private RequestEventValue() {} - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { Part eventIDPart = null, valuePart = null; EventID event = null; Object callbackArg = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); StringBuffer errMessage = new StringBuffer(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - eventIDPart = msg.getPart(0); + int parts = clientMessage.getNumberOfParts(); + eventIDPart = clientMessage.getPart(0); if (eventIDPart == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.RequestEventValue_0_THE_EVENT_ID_FOR_THE_GET_EVENT_VALUE_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage.append(" The event id for the get event value request is null."); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, errMessage.toString(), + serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { try { event = (EventID) eventIDPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (parts > 1) { - valuePart = msg.getPart(1); + valuePart = clientMessage.getPart(1); try { if (valuePart != null) { callbackArg = valuePart.getObject(); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } if (logger.isTraceEnabled()) { - logger.trace("{}: Received get event value request ({} bytes) from {}", servConn.getName(), - msg.getPayloadLength(), servConn.getSocketString()); + logger.trace("{}: Received get event value request ({} bytes) from {}", + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString()); } - CacheClientNotifier ccn = servConn.getAcceptor().getCacheClientNotifier(); + CacheClientNotifier ccn = serverConnection.getAcceptor().getCacheClientNotifier(); // Get the ha container. HAContainerWrapper haContainer = (HAContainerWrapper) ccn.getHaContainer(); if (haContainer == null) { String reason = " was not found during get event value request"; - writeRegionDestroyedEx(msg, "ha container", reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, "ha container", reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); } else { Object[] valueAndIsObject = new Object[2]; try { @@ -110,8 +113,9 @@ public class RequestEventValue extends BaseCommand { LocalizedStrings.RequestEventValue_UNABLE_TO_FIND_A_CLIENT_UPDATE_MESSAGE_FOR_0, event)); String msgStr = "No value found for " + event + " in " + haContainer.getName(); - writeErrorResponse(msg, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.REQUEST_EVENT_VALUE_ERROR, msgStr, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } else { if (logger.isDebugEnabled()) { @@ -130,20 +134,22 @@ public class RequestEventValue extends BaseCommand { valueAndIsObject[1] = Boolean.valueOf(((ClientUpdateMessageImpl) data).valueIsObject()); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } Object data = valueAndIsObject[0]; boolean isObject = (Boolean) valueAndIsObject[1]; - writeResponse(data, callbackArg, msg, isObject, servConn); - servConn.setAsTrue(RESPONDED); - ccn.getClientProxy(servConn.getProxyID()).getStatistics().incDeltaFullMessagesSent(); + writeResponse(data, callbackArg, clientMessage, isObject, serverConnection); + serverConnection.setAsTrue(RESPONDED); + ccn.getClientProxy(serverConnection.getProxyID()).getStatistics() + .incDeltaFullMessagesSent(); if (logger.isDebugEnabled()) { logger.debug("{}: Wrote get event value response back to {} for ha container {}", - servConn.getName(), servConn.getSocketString(), haContainer.getName()); + serverConnection.getName(), serverConnection.getSocketString(), + haContainer.getName()); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java index a579170..cd12ea7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java @@ -39,18 +39,18 @@ public class RollbackCommand extends BaseCommand { private RollbackCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); - TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); TXId txId = new TXId(client, uniqId); if (txMgr.isHostedTxRecentlyCompleted(txId)) { if (logger.isDebugEnabled()) { logger.debug("TX: found a recently rolled back tx: {}", txId); - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); txMgr.removeHostedTXState(txId); return; } @@ -60,16 +60,16 @@ public class RollbackCommand extends BaseCommand { if (txState != null) { txId = txState.getTxId(); txMgr.rollback(); - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); } else { // could not find TxState in the host server. // Protect against a failover command received so late, // and it is removed from the failoverMap due to capacity. - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } finally { if (logger.isDebugEnabled()) { logger.debug("TX: removing tx state for {}", txId); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java index c78f4d9..c4515ab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java @@ -56,18 +56,18 @@ public class Size extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { StringBuilder errMessage = new StringBuilder(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadSizeRequestTime(start - oldStart); // Retrieve the data from the message parts - Part regionNamePart = msg.getPart(0); + Part regionNamePart = clientMessage.getPart(0); String regionName = regionNamePart.getString(); if (regionName == null) { @@ -76,8 +76,9 @@ public class Size extends BaseCommand { errMessage .append(LocalizedStrings.BaseCommand__THE_INPUT_REGION_NAME_FOR_THE_0_REQUEST_IS_NULL .toLocalizedString("size")); - writeErrorResponse(msg, MessageType.SIZE_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.SIZE_ERROR, errMessage.toString(), + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -85,38 +86,39 @@ public class Size extends BaseCommand { if (region == null) { String reason = LocalizedStrings.BaseCommand__0_WAS_NOT_FOUND_DURING_1_REQUEST .toLocalizedString(regionName, "size"); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Size the entry try { this.securityService.authorizeRegionRead(regionName); - writeSizeResponse(region.size(), msg, servConn); + writeSizeResponse(region.size(), clientMessage, serverConnection); } catch (RegionDestroyedException rde) { - writeException(msg, rde, false, servConn); + writeException(clientMessage, rde, false, serverConnection); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the destroy, preserve the connection - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); if (e instanceof GemFireSecurityException) { // Fine logging for security exceptions since these are already // logged by the security logger if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected Security exception", servConn.getName(), e); + logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e); } } else { logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } } finally { if (logger.isDebugEnabled()) { - logger.debug("{}: Sent size response for region {}", servConn.getName(), regionName); + logger.debug("{}: Sent size response for region {}", serverConnection.getName(), + regionName); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); stats.incWriteSizeResponseTime(DistributionStats.getStatTime() - start); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java index 72eab50..9fc3fd1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java @@ -49,23 +49,23 @@ public class TXFailoverCommand extends BaseCommand { private TXFailoverCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // Build the TXId for the transaction InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); if (logger.isDebugEnabled()) { logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client); } TXId txId = new TXId(client, uniqId); - TXManagerImpl mgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another // thread if (mgr.isHostedTxRecentlyCompleted(txId)) { - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); mgr.removeHostedTXState(txId); return; } @@ -75,7 +75,7 @@ public class TXFailoverCommand extends BaseCommand { if (!tx.isRealDealLocal()) { // send message to all peers to find out who hosts the transaction FindRemoteTXMessageReplyProcessor processor = - FindRemoteTXMessage.send(servConn.getCache(), txId); + FindRemoteTXMessage.send(serverConnection.getCache(), txId); try { processor.waitForRepliesUninterruptibly(); } catch (ReplyException e) { @@ -96,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand { // bug #42228 and bug #43504 - this cannot return until the current view // has been installed by all members, so that dlocks are released and // the same keys can be used in a new transaction by the same client thread - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); try { WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager()); } catch (InterruptedException e) { @@ -110,9 +110,9 @@ public class TXFailoverCommand extends BaseCommand { } mgr.saveTXCommitMessageForClientFailover(txId, processor.getTxCommitMessage()); } else { - writeException(msg, new TransactionDataNodeHasDepartedException( - "Could not find transaction host for " + txId), false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, new TransactionDataNodeHasDepartedException( + "Could not find transaction host for " + txId), false, serverConnection); + serverConnection.setAsTrue(RESPONDED); mgr.removeHostedTXState(txId); return; } @@ -121,8 +121,8 @@ public class TXFailoverCommand extends BaseCommand { if (!wasInProgress) { mgr.setInProgress(false); } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java index 8cedd2c..03270d6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java @@ -15,7 +15,6 @@ package org.apache.geode.internal.cache.tier.sockets.command; -import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ReplyException; @@ -54,7 +53,8 @@ public class TXSynchronizationCommand extends BaseCommand { * org.apache.geode.internal.cache.tier.sockets.ServerConnection) */ @Override - protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) { + protected boolean shouldMasqueradeForTx(Message clientMessage, + ServerConnection serverConnection) { // masquerading is done in the waiting thread pool return false; } @@ -68,26 +68,28 @@ public class TXSynchronizationCommand extends BaseCommand { * long) */ @Override - public void cmdExecute(final Message msg, final ServerConnection servConn, long start) - throws IOException, ClassNotFoundException, InterruptedException { + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, + long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); - CompletionType type = CompletionType.values()[msg.getPart(0).getInt()]; - /* int txIdInt = */ msg.getPart(1).getInt(); // [bruce] not sure if we need to transmit this + CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()]; + /* int txIdInt = */ clientMessage.getPart(1).getInt(); // [bruce] not sure if we need to + // transmit this final Part statusPart; if (type == CompletionType.AFTER_COMPLETION) { - statusPart = msg.getPart(2); + statusPart = clientMessage.getPart(2); } else { statusPart = null; } - final TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + final TXManagerImpl txMgr = + (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); final InternalDistributedMember member = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); // get the tx state without associating it with this thread. That's done later - final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true); + final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true); // we have to run beforeCompletion and afterCompletion in the same thread // because beforeCompletion obtains locks for the thread and afterCompletion @@ -102,21 +104,21 @@ public class TXSynchronizationCommand extends BaseCommand { TXStateProxy txState = null; Throwable failureException = null; try { - txState = txMgr.masqueradeAs(msg, member, false); + txState = txMgr.masqueradeAs(clientMessage, member, false); if (isDebugEnabled) { logger.debug("Executing beforeCompletion() notification for transaction {}", - msg.getTransactionId()); + clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.beforeCompletion(); try { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } catch (IOException e) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", e); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (ReplyException e) { failureException = e.getCause(); } catch (InterruptedException e) { @@ -128,13 +130,13 @@ public class TXSynchronizationCommand extends BaseCommand { } if (failureException != null) { try { - writeException(msg, failureException, false, servConn); + writeException(clientMessage, failureException, false, serverConnection); } catch (IOException ioe) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", ioe); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } }; @@ -150,11 +152,11 @@ public class TXSynchronizationCommand extends BaseCommand { public void run() { TXStateProxy txState = null; try { - txState = txMgr.masqueradeAs(msg, member, false); + txState = txMgr.masqueradeAs(clientMessage, member, false); int status = statusPart.getInt(); if (isDebugEnabled) { logger.debug("Executing afterCompletion({}) notification for transaction {}", - status, msg.getTransactionId()); + status, clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.afterCompletion(status); @@ -162,7 +164,7 @@ public class TXSynchronizationCommand extends BaseCommand { // where it can be applied to the local cache TXCommitMessage cmsg = txState.getCommitMessage(); try { - CommitCommand.writeCommitResponse(cmsg, msg, servConn); + CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection); txMgr.removeHostedTXState(txState.getTxId()); } catch (IOException e) { // not much can be done here @@ -170,16 +172,16 @@ public class TXSynchronizationCommand extends BaseCommand { logger.warn("Problem writing reply to client", e); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (RuntimeException e) { try { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } catch (IOException ioe) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", ioe); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { @@ -195,12 +197,12 @@ public class TXSynchronizationCommand extends BaseCommand { sync.runSecondRunnable(afterCompletion); } else { if (statusPart.getInt() == Status.STATUS_COMMITTED) { - TXStateProxy txState = txMgr.masqueradeAs(msg, member, false); + TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false); try { if (isDebugEnabled) { logger.debug( "Executing beforeCompletion() notification for transaction {} after failover", - msg.getTransactionId()); + clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.beforeCompletion(); @@ -212,8 +214,8 @@ public class TXSynchronizationCommand extends BaseCommand { } } } catch (Exception e) { - writeException(msg, MessageType.EXCEPTION, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } if (isDebugEnabled) { logger.debug("Sent tx synchronization response"); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java index 7dbb78f..199ac18 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java @@ -45,43 +45,44 @@ public class UnregisterInterest extends BaseCommand { UnregisterInterest() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws ClassNotFoundException, IOException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; int interestType = 0; StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); - regionNamePart = msg.getPart(0); - interestType = msg.getPart(1).getInt(); - keyPart = msg.getPart(2); - Part isClosingPart = msg.getPart(3); + regionNamePart = clientMessage.getPart(0); + interestType = clientMessage.getPart(1).getInt(); + keyPart = clientMessage.getPart(2); + Part isClosingPart = clientMessage.getPart(3); byte[] isClosingPartBytes = (byte[]) isClosingPart.getObject(); boolean isClosing = isClosingPartBytes[0] == 0x01; regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean keepalive = false; try { - Part keepalivePart = msg.getPart(4); + Part keepalivePart = clientMessage.getPart(4); byte[] keepaliveBytes = (byte[]) keepalivePart.getObject(); keepalive = keepaliveBytes[0] != 0x00; } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug( "{}: Received unregister interest request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key); } // Process the unregister interest request @@ -95,9 +96,10 @@ public class UnregisterInterest extends BaseCommand { errMessage = LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL; String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -108,12 +110,12 @@ public class UnregisterInterest extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { try { @@ -121,8 +123,8 @@ public class UnregisterInterest extends BaseCommand { authzRequest.unregisterInterestAuthorize(regionName, key, interestType); key = unregisterContext.getKey(); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -141,18 +143,18 @@ public class UnregisterInterest extends BaseCommand { */ // Unregister interest irrelevent of whether the region is present it or // not - servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key, - interestType, isClosing, servConn.getProxyID(), keepalive); + serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, + key, interestType, isClosing, serverConnection.getProxyID(), keepalive); // Update the statistics and write the reply // bserverStats.incLong(processDestroyTimeId, // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent unregister interest response for region {} key {}", servConn.getName(), - regionName, key); + logger.debug("{}: Sent unregister interest response for region {} key {}", + serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java index 7369587..1968bff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java @@ -46,48 +46,48 @@ public class UnregisterInterestList extends BaseCommand { private UnregisterInterestList() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; List keys = null; int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); - Part isClosingListPart = msg.getPart(1); + Part isClosingListPart = clientMessage.getPart(1); byte[] isClosingListPartBytes = (byte[]) isClosingListPart.getObject(); boolean isClosingList = isClosingListPartBytes[0] == 0x01; boolean keepalive = false; try { - Part keepalivePart = msg.getPart(2); + Part keepalivePart = clientMessage.getPart(2); byte[] keepalivePartBytes = (byte[]) keepalivePart.getObject(); keepalive = keepalivePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -95,8 +95,8 @@ public class UnregisterInterestList extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received unregister interest request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, - regionName, keys); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), numberOfKeys, regionName, keys); } // Process the unregister interest request @@ -113,22 +113,23 @@ public class UnregisterInterestList extends BaseCommand { LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { try { @@ -136,8 +137,8 @@ public class UnregisterInterestList extends BaseCommand { authzRequest.unregisterInterestListAuthorize(regionName, keys); keys = (List) unregisterContext.getKey(); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -155,20 +156,20 @@ public class UnregisterInterestList extends BaseCommand { * responded = true; } else { */ // Register interest - servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys, - isClosingList, servConn.getProxyID(), keepalive); + serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, + keys, isClosingList, serverConnection.getProxyID(), keepalive); // Update the statistics and write the reply // bserverStats.incLong(processDestroyTimeId, // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); WHY ARE GETTING START AND NOT // USING IT? - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { logger.debug( "{}: Sent unregister interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java index 57aca22..2f434fb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java @@ -35,8 +35,9 @@ public class UpdateClientNotification extends BaseCommand { private UpdateClientNotification() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); { long oldStart = start; start = DistributionStats.getStatTime(); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 4e450c7..1afe6ff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -2149,7 +2149,7 @@ public class Connection implements Runnable { logger.fatal(LocalizedMessage .create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex); } - } else /* (msgType == END_CHUNKED_MSG_TYPE) */ { + } else /* (messageType == END_CHUNKED_MSG_TYPE) */ { MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); try { http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java index 031f827..80b16fc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java +++ b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java @@ -30,8 +30,7 @@ import java.io.ObjectStreamClass; /** * Reusable Input/Output operation utility methods. - *

- * + * * @since GemFire 6.6 */ @SuppressWarnings("unused") @@ -44,8 +43,7 @@ public abstract class IOUtils { * File.separator character. If the pathname is unspecified (null, empty or blank) then path * elements are considered relative to file system root, beginning with File.separator. If array * of path elements are null, then the pathname is returned as is. - *

- * + * * @param pathname a String value indicating the base pathname. * @param pathElements the path elements to append to pathname. * @return the path elements appended to the pathname. http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java new file mode 100644 index 0000000..017e0f5 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed; + +import org.apache.geode.cache.Cache; + +/** + * Provides tests a way to access non-public state in ServerLauncher + */ +public class ServerLauncherUtils { + + /** + * Returns the Cache from an online in-process ServerLauncher instance + */ + public static Cache getCache(final ServerLauncher serverLauncher) { + return serverLauncher.getCache(); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java index 39aa1e6..b529f0c 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java @@ -14,166 +14,141 @@ */ package org.apache.geode.internal.cache.ha; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; -import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.awaitility.Awaitility; - -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.cache.CacheException; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; import org.apache.geode.test.junit.categories.IntegrationTest; /** * Test runs all tests of HARegionQueueJUnitTest using BlockingHARegionQueue instead of * HARegionQueue. - * - * */ @Category({IntegrationTest.class, ClientSubscriptionTest.class}) public class BlockingHARegionQueueJUnitTest extends HARegionQueueJUnitTest { - /** - * Creates Blocking HA region-queue object - * - * @return Blocking HA region-queue object - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException - */ - protected HARegionQueue createHARegionQueue(String name) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = - HARegionQueue.getHARegionQueueInstance(name, cache, HARegionQueue.BLOCKING_HA_QUEUE, false); - return regionqueue; - } - - /** - * Creates Blocking HA region-queue object - * - * @return Blocking HA region-queue object - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException - */ - protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs, - HARegionQueue.BLOCKING_HA_QUEUE, false); - return regionqueue; + @Override + protected int queueType() { + return HARegionQueue.BLOCKING_HA_QUEUE; } /** * Tests the effect of a put which is blocked because of capacity constraint & subsequent passage * because of take operation - * */ @Test - public void testBlockingPutAndTake() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndTake() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", hrqa); - hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa); + hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] {1}, 1, 1); hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (InterruptedException e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + Conflatable conf = (Conflatable) hrq.take(); - assertNotNull(conf); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); + assertThat(conf, notNullValue()); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } /** * Test Scenario : BlockingQueue capacity is 1. The first put should be successful. The second put * should block till a peek/remove happens. - * */ @Test - public void testBlockingPutAndPeekRemove() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndPeekRemove() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndPeekRemove", hrqa); + + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa); hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] {1}, 1, 1); hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (Exception e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + Conflatable conf = (Conflatable) hrq.peek(); - assertNotNull(conf); + assertThat(conf, notNullValue()); + hrq.remove(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); - assertFalse("Exception occurred in put-thread", encounteredException); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } /** * Test Scenario :Blocking Queue capacity is 1. The first put should be successful.The second put * should block till the first put expires. - * + *

+ * fix for 40314 - capacity constraint is checked for primary only and expiry is not applicable on + * primary so marking this test as invalid. */ - // fix for 40314 - capacity constraint is checked for primary only and - // expiry is not applicable on primary so marking this test as invalid. - @Ignore @Test - public void testBlockingPutAndExpiry() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndExpiry() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); hrqa.setExpiryTime(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndExpiry", hrqa); + + HARegionQueue hrq = this.createHARegionQueue(this.testName.getMethodName(), hrqa); EventID id1 = new EventID(new byte[] {1}, 1, 1); - long start = System.currentTimeMillis(); + hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (Exception e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); - waitAtLeast(1000, start, () -> { - assertFalse("Put-thread blocked unexpectedly", t1.isAlive()); - }); - assertFalse("Exception occurred in put-thread", encounteredException); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } }