Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A891E200CB7 for ; Tue, 23 May 2017 04:23:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A7825160BD5; Tue, 23 May 2017 02:23:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 67341160BDE for ; Tue, 23 May 2017 04:23:18 +0200 (CEST) Received: (qmail 78107 invoked by uid 500); 23 May 2017 02:23:17 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 77417 invoked by uid 99); 23 May 2017 02:23:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 May 2017 02:23:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7240FE968C; Tue, 23 May 2017 02:23:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Tue, 23 May 2017 02:23:26 -0000 Message-Id: In-Reply-To: <018987527bbb400193565032b11a6552@git.apache.org> References: <018987527bbb400193565032b11a6552@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/22] geode git commit: Cleanup BaseCommand archived-at: Tue, 23 May 2017 02:23:22 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java index 9ed00be..f09c854 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java @@ -23,21 +23,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.regex.Pattern; +import edu.umd.cs.findbugs.annotations.SuppressWarnings; import org.apache.logging.log4j.Logger; -import org.apache.geode.CancelException; import org.apache.geode.CopyException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SerializationException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheLoaderException; import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.InterestResultPolicy; @@ -86,24 +85,12 @@ import org.apache.geode.security.GemFireSecurityException; public abstract class BaseCommand implements Command { protected static final Logger logger = LogService.getLogger(); - /** - * Whether zipped values are being passed to/from the client. Can be modified using the system - * property Message.ZIP_VALUES ? This does not appear to happen anywhere - */ - protected static final boolean zipValues = false; - - protected static final boolean APPLY_RETRIES = - Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "gateway.ApplyRetries"); - - public static final byte[] OK_BYTES = new byte[] {0}; - - public static final int maximumChunkSize = - Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100).intValue(); + private static final byte[] OK_BYTES = new byte[] {0}; - /** Maximum number of entries in each chunked response chunk */ + public static final int MAXIMUM_CHUNK_SIZE = Integer.getInteger("BridgeServer.MAXIMUM_CHUNK_SIZE", 100); /** Whether to suppress logging of IOExceptions */ - private static boolean suppressIOExceptionLogging = + private static final boolean SUPPRESS_IO_EXCEPTION_LOGGING = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "bridge.suppressIOExceptionLogging"); /** @@ -112,86 +99,88 @@ public abstract class BaseCommand implements Command { * of them completes or fails. The bytes are computed based in the size sent in the incoming msg * header. */ - private static final int MAX_INCOMING_DATA = - Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1).intValue(); + private static final int MAX_INCOMING_DATA = Integer.getInteger("BridgeServer.MAX_INCOMING_DATA", -1); /** * Maximum number of concurrent incoming client messages that a bridge server will allow. Once a * server is working on this number additional incoming client messages will wait until one of * them completes or fails. */ - private static final int MAX_INCOMING_MSGS = - Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1).intValue(); + private static final int MAX_INCOMING_MESSAGES = Integer.getInteger("BridgeServer.MAX_INCOMING_MSGS", -1); - private static final Semaphore incomingDataLimiter; + private static final Semaphore INCOMING_DATA_LIMITER; + + private static final Semaphore INCOMING_MSG_LIMITER; + + protected SecurityService securityService = IntegratedSecurityService.getSecurityService(); - private static final Semaphore incomingMsgLimiter; static { - Semaphore tmp; + Semaphore semaphore; if (MAX_INCOMING_DATA > 0) { // backport requires that this is fair since we inc by values > 1 - tmp = new Semaphore(MAX_INCOMING_DATA, true); + semaphore = new Semaphore(MAX_INCOMING_DATA, true); } else { - tmp = null; + semaphore = null; } - incomingDataLimiter = tmp; - if (MAX_INCOMING_MSGS > 0) { - tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best - // performance + INCOMING_DATA_LIMITER = semaphore; + if (MAX_INCOMING_MESSAGES > 0) { + // unfair for best performance + semaphore = new Semaphore(MAX_INCOMING_MESSAGES, false); } else { - tmp = null; + semaphore = null; } - incomingMsgLimiter = tmp; - + INCOMING_MSG_LIMITER = semaphore; } - protected SecurityService securityService = IntegratedSecurityService.getSecurityService(); + protected static byte[] okBytes() { + return OK_BYTES; + } - public void execute(Message msg, ServerConnection servConn) { + @Override + public void execute(Message clientMessage, ServerConnection serverConnection) { // Read the request and update the statistics long start = DistributionStats.getStatTime(); - // servConn.resetTransientData(); - if (EntryLogger.isEnabled() && servConn != null) { - EntryLogger.setSource(servConn.getMembershipID(), "c2s"); + if (EntryLogger.isEnabled() && serverConnection != null) { + EntryLogger.setSource(serverConnection.getMembershipID(), "c2s"); } - boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn); + boolean shouldMasquerade = shouldMasqueradeForTx(clientMessage, serverConnection); try { if (shouldMasquerade) { - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); InternalDistributedMember member = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); TXManagerImpl txMgr = cache.getTxManager(); TXStateProxy tx = null; try { - tx = txMgr.masqueradeAs(msg, member, false); - cmdExecute(msg, servConn, start); + tx = txMgr.masqueradeAs(clientMessage, member, false); + cmdExecute(clientMessage, serverConnection, start); tx.updateProxyServer(txMgr.getMemberId()); } finally { txMgr.unmasquerade(tx); } } else { - cmdExecute(msg, servConn, start); + cmdExecute(clientMessage, serverConnection, start); } } catch (TransactionException | CopyException | SerializationException | CacheWriterException | CacheLoaderException | GemFireSecurityException | PartitionOfflineException | MessageTooLargeException e) { - handleExceptionNoDisconnect(msg, servConn, e); + handleExceptionNoDisconnect(clientMessage, serverConnection, e); } catch (EOFException eof) { - BaseCommand.handleEOFException(msg, servConn, eof); + BaseCommand.handleEOFException(clientMessage, serverConnection, eof); } catch (InterruptedIOException e) { // Solaris only - BaseCommand.handleInterruptedIOException(msg, servConn, e); + BaseCommand.handleInterruptedIOException(serverConnection, e); } catch (IOException e) { - BaseCommand.handleIOException(msg, servConn, e); + BaseCommand.handleIOException(clientMessage, serverConnection, e); } catch (DistributedSystemDisconnectedException e) { - BaseCommand.handleShutdownException(msg, servConn, e); + BaseCommand.handleShutdownException(clientMessage, serverConnection, e); } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); // If this ever returns, rethrow the error. We're poisoned // now, so don't let this thread continue. throw err; } catch (Throwable e) { - BaseCommand.handleThrowable(msg, servConn, e); + BaseCommand.handleThrowable(clientMessage, serverConnection, e); } finally { EntryLogger.clearSource(); } @@ -201,16 +190,10 @@ public abstract class BaseCommand implements Command { * checks to see if this thread needs to masquerade as a transactional thread. clients after * GFE_66 should be able to start a transaction. * - * @param msg - * @param servConn * @return true if thread should masquerade as a transactional thread. */ - protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) { - if (servConn.getClientVersion().compareTo(Version.GFE_66) >= 0 - && msg.getTransactionId() > TXManagerImpl.NOTX) { - return true; - } - return false; + protected boolean shouldMasqueradeForTx(Message clientMessage, ServerConnection serverConnection) { + return serverConnection.getClientVersion().compareTo(Version.GFE_66) >= 0 && clientMessage.getTransactionId() > TXManagerImpl.NOTX; } /** @@ -221,13 +204,11 @@ public abstract class BaseCommand implements Command { *

* The client event should have the event identifier from the client and the region affected by * the operation. - * - * @param clientEvent */ public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) { LocalRegion r = clientEvent.getRegion(); - VersionTag tag = null; - if ((clientEvent.getVersionTag() != null) && (clientEvent.getVersionTag().isGatewayTag())) { + VersionTag tag; + if (clientEvent.getVersionTag() != null && clientEvent.getVersionTag().isGatewayTag()) { tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId()); } else { tag = r.findVersionTagForClientEvent(clientEvent.getEventId()); @@ -246,7 +227,7 @@ public abstract class BaseCommand implements Command { } clientEvent.setVersionTag(tag); } - return (tag != null); + return tag != null; } /** @@ -258,18 +239,18 @@ public abstract class BaseCommand implements Command { * The client event should have the event identifier from the client and the region affected by * the operation. */ - protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion r, EventID eventID) { - VersionTag tag = r.findVersionTagForClientBulkOp(eventID); + protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) { + VersionTag tag = region.findVersionTagForClientBulkOp(eventID); if (tag != null) { if (logger.isDebugEnabled()) { logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); } return tag; } - if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { + if (region instanceof DistributedRegion || region instanceof PartitionedRegion) { // TODO this could be optimized for partitioned regions by sending the key // so that the PR could look at an individual bucket for the event - tag = FindVersionTagOperation.findVersionTag(r, eventID, true); + tag = FindVersionTagOperation.findVersionTag(region, eventID, true); } if (tag != null) { if (logger.isDebugEnabled()) { @@ -279,285 +260,231 @@ public abstract class BaseCommand implements Command { return tag; } - abstract public void cmdExecute(Message msg, ServerConnection servConn, long start) + public abstract void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException; - protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException { - Message replyMsg = servConn.getReplyMessage(); - servConn.getCache().getCancelCriterion().checkCancelInProgress(null); + protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { + Message replyMsg = serverConnection.getReplyMessage(); + serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(1); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(OK_BYTES); - replyMsg.send(servConn); + replyMsg.addBytesPart(okBytes()); + replyMsg.send(serverConnection); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl tx: {}", servConn.getName(), origMsg.getTransactionId()); + logger.trace("{}: rpl tx: {}", serverConnection.getName(), origMsg.getTransactionId()); } } - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { - Message replyMsg = servConn.getReplyMessage(); - servConn.getCache().getCancelCriterion().checkCancelInProgress(null); + Message replyMsg = serverConnection.getReplyMessage(); + serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(1); replyMsg.setTransactionId(origMsg.getTransactionId()); replyMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); - replyMsg.send(servConn); + replyMsg.send(serverConnection); pr.getPrStats().incPRMetaDataSentCount(); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), + logger.trace("{}: rpl with REFRESH_METADATA tx: {}", serverConnection.getName(), origMsg.getTransactionId()); } } - private static void handleEOFException(Message msg, ServerConnection servConn, Exception eof) { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - boolean potentialModification = servConn.getPotentialModification(); + private static void handleEOFException(Message msg, ServerConnection serverConnection, Exception eof) { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + boolean potentialModification = serverConnection.getPotentialModification(); if (!crHelper.isShutdown()) { if (potentialModification) { stats.incAbandonedWriteRequests(); } else { stats.incAbandonedReadRequests(); } - if (!suppressIOExceptionLogging) { + if (!SUPPRESS_IO_EXCEPTION_LOGGING) { if (potentialModification) { - int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE; + int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_EOFEXCEPTION_DURING_A_WRITE_OPERATION_ON_REGION__1_KEY_2_MESSAGEID_3, - new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), - Integer.valueOf(transId)})); + new Object[] {serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId })); } else { logger.debug("EOF exception", eof); logger.info(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_CONNECTION_DISCONNECT_DETECTED_BY_EOF, - servConn.getName())); + serverConnection.getName())); } } } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(eof); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(eof); } - private static void handleInterruptedIOException(Message msg, ServerConnection servConn, - Exception e) { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - if (!crHelper.isShutdown() && servConn.isOpen()) { - if (!suppressIOExceptionLogging) { + private static void handleInterruptedIOException(ServerConnection serverConnection, Exception e) { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + if (!crHelper.isShutdown() && serverConnection.isOpen()) { + if (!SUPPRESS_IO_EXCEPTION_LOGGING) { if (logger.isDebugEnabled()) logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e); } } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); } - private static void handleIOException(Message msg, ServerConnection servConn, Exception e) { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - boolean potentialModification = servConn.getPotentialModification(); + private static void handleIOException(Message msg, ServerConnection serverConnection, Exception e) { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + boolean potentialModification = serverConnection.getPotentialModification(); - if (!crHelper.isShutdown() && servConn.isOpen()) { - if (!suppressIOExceptionLogging) { + if (!crHelper.isShutdown() && serverConnection.isOpen()) { + if (!SUPPRESS_IO_EXCEPTION_LOGGING) { if (potentialModification) { - int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE; + int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION_DURING_OPERATION_FOR_REGION_1_KEY_2_MESSID_3, - new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), - Integer.valueOf(transId)}), + new Object[] {serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId }), e); } else { logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } } } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); } - private static void handleShutdownException(Message msg, ServerConnection servConn, Exception e) { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - boolean potentialModification = servConn.getPotentialModification(); + private static void handleShutdownException(Message msg, ServerConnection serverConnection, Exception e) { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + boolean potentialModification = serverConnection.getPotentialModification(); if (!crHelper.isShutdown()) { if (potentialModification) { - int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE; + int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, - new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), - Integer.valueOf(transId)}), + new Object[] {serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId }), e); } else { logger.warn(LocalizedMessage.create( - LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION, servConn.getName()), e); + LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION, serverConnection.getName()), e); } } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); } - // Handle GemfireSecurityExceptions separately since the connection should not - // be terminated (by setting processMessages to false) unlike in - // handleThrowable. Fixes bugs #38384 and #39392. - // private static void handleGemfireSecurityException(Message msg, - // ServerConnection servConn, GemFireSecurityException e) { - // - // boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); - // boolean responded = servConn.getTransientFlag(RESPONDED); - // boolean requiresChunkedResponse = servConn - // .getTransientFlag(REQUIRES_CHUNKED_RESPONSE); - // boolean potentialModification = servConn.getPotentialModification(); - // - // try { - // try { - // if (requiresResponse && !responded) { - // if (requiresChunkedResponse) { - // writeChunkedException(msg, e, false, servConn); - // } - // else { - // writeException(msg, e, false, servConn); - // } - // servConn.setAsTrue(RESPONDED); - // } - // } - // finally { // inner try-finally to ensure proper ordering of logging - // if (potentialModification) { - // int transId = (msg != null) ? msg.getTransactionId() - // : Integer.MIN_VALUE; - // } - // } - // } - // catch (IOException ioe) { - // if (logger.isDebugEnabled()) { - // logger.fine(servConn.getName() - // + ": Unexpected IOException writing security exception: ", ioe); - // } - // } - // } - - private static void handleExceptionNoDisconnect(Message msg, ServerConnection servConn, + private static void handleExceptionNoDisconnect(Message msg, ServerConnection serverConnection, Exception e) { - boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); - boolean responded = servConn.getTransientFlag(RESPONDED); - boolean requiresChunkedResponse = servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); - boolean potentialModification = servConn.getPotentialModification(); - boolean wroteExceptionResponse = false; + boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE); + boolean responded = serverConnection.getTransientFlag(RESPONDED); + boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); + boolean potentialModification = serverConnection.getPotentialModification(); try { + boolean wroteExceptionResponse = false; try { if (requiresResponse && !responded) { if (requiresChunkedResponse) { - writeChunkedException(msg, e, false, servConn); + writeChunkedException(msg, e, serverConnection); } else { - writeException(msg, e, false, servConn); + writeException(msg, e, false, serverConnection); } wroteExceptionResponse = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } finally { // inner try-finally to ensure proper ordering of logging if (potentialModification) { - int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE; + int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; if (!wroteExceptionResponse) { logger.warn(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, - new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), - Integer.valueOf(transId)}), + new Object[] {serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId }), e); } else { if (logger.isDebugEnabled()) { logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}", - servConn.getName(), servConn.getModRegion(), servConn.getModKey(), transId, e); + serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId, e); } } } else { if (!wroteExceptionResponse) { logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } else { if (logger.isDebugEnabled()) { - logger.debug("{}: Exception: {}", servConn.getName(), e.getMessage(), e); + logger.debug("{}: Exception: {}", serverConnection.getName(), e.getMessage(), e); } } } } } catch (IOException ioe) { if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), + logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(), ioe.getMessage(), ioe); } } } - private static void handleThrowable(Message msg, ServerConnection servConn, Throwable th) { - boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE); - boolean responded = servConn.getTransientFlag(RESPONDED); - boolean requiresChunkedResponse = servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); - boolean potentialModification = servConn.getPotentialModification(); + private static void handleThrowable(Message msg, ServerConnection serverConnection, Throwable th) { + boolean requiresResponse = serverConnection.getTransientFlag(REQUIRES_RESPONSE); + boolean responded = serverConnection.getTransientFlag(RESPONDED); + boolean requiresChunkedResponse = serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE); + boolean potentialModification = serverConnection.getPotentialModification(); try { try { if (th instanceof Error) { logger.fatal(LocalizedMessage.create( - LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER, servConn.getName()), th); + LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER, serverConnection.getName()), th); } if (requiresResponse && !responded) { if (requiresChunkedResponse) { - writeChunkedException(msg, th, false, servConn); + writeChunkedException(msg, th, serverConnection); } else { - writeException(msg, th, false, servConn); + writeException(msg, th, false, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } finally { // inner try-finally to ensure proper ordering of logging - if (th instanceof Error) { - // log nothing - } else if (th instanceof CancelException) { - // log nothing - } else { + if (!(th instanceof Error || th instanceof CacheLoaderException)) { if (potentialModification) { - int transId = (msg != null) ? msg.getTransactionId() : Integer.MIN_VALUE; + int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(LocalizedMessage.create( LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3, - new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), - Integer.valueOf(transId)}), + new Object[] {serverConnection.getName(), serverConnection.getModRegion(), serverConnection.getModKey(), transId }), th); } else { logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, - servConn.getName()), th); + serverConnection.getName()), th); } } } } catch (IOException ioe) { if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), + logger.debug("{}: Unexpected IOException writing exception: {}", serverConnection.getName(), ioe.getMessage(), ioe); } } finally { - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(th); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(th); } } - protected static void writeChunkedException(Message origMsg, Throwable e, boolean isSevere, - ServerConnection servConn) throws IOException { - writeChunkedException(origMsg, e, isSevere, servConn, servConn.getChunkedResponseMessage()); + protected static void writeChunkedException(Message origMsg, Throwable e, ServerConnection serverConnection) throws IOException { + writeChunkedException(origMsg, e, serverConnection, serverConnection.getChunkedResponseMessage()); } - protected static void writeChunkedException(Message origMsg, Throwable e, boolean isSevere, - ServerConnection servConn, ChunkedMessage originalReponse) throws IOException { - writeChunkedException(origMsg, e, isSevere, servConn, originalReponse, 2); + protected static void writeChunkedException(Message origMsg, Throwable e, ServerConnection serverConnection, ChunkedMessage originalResponse) throws IOException { + writeChunkedException(origMsg, e, serverConnection, originalResponse, 2); } - protected static void writeChunkedException(Message origMsg, Throwable exception, - boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse, int numOfParts) + private static void writeChunkedException(Message origMsg, Throwable exception, ServerConnection serverConnection, ChunkedMessage originalResponse, int numOfParts) throws IOException { - Throwable e = getClientException(servConn, exception); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); - chunkedResponseMsg.setServerConnection(servConn); - if (originalReponse.headerHasBeenSent()) { - // chunkedResponseMsg = originalReponse; - // fix for bug 35442 + Throwable e = getClientException(serverConnection, exception); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); + chunkedResponseMsg.setServerConnection(serverConnection); + if (originalResponse.headerHasBeenSent()) { chunkedResponseMsg.setNumberOfParts(numOfParts); chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts); chunkedResponseMsg.addObjPart(e); @@ -565,7 +492,7 @@ public abstract class BaseCommand implements Command { chunkedResponseMsg.addStringPart(getExceptionTrace(e)); } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), + logger.debug("{}: Sending exception chunk while reply in progress: {}", serverConnection.getName(), e.getMessage(), e); } } else { @@ -579,10 +506,10 @@ public abstract class BaseCommand implements Command { chunkedResponseMsg.addStringPart(getExceptionTrace(e)); } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); + logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), e); } } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } // Get the exception stacktrace for native clients @@ -595,26 +522,25 @@ public abstract class BaseCommand implements Command { } protected static void writeException(Message origMsg, Throwable e, boolean isSevere, - ServerConnection servConn) throws IOException { - writeException(origMsg, MessageType.EXCEPTION, e, isSevere, servConn); + ServerConnection serverConnection) throws IOException { + writeException(origMsg, MessageType.EXCEPTION, e, isSevere, serverConnection); } - private static Throwable getClientException(ServerConnection servConn, Throwable e) { - Cache cache = servConn.getCache(); - if (cache instanceof InternalCache) { - InternalCache icache = (InternalCache) servConn.getCache(); - OldClientSupportService svc = icache.getService(OldClientSupportService.class); + private static Throwable getClientException(ServerConnection serverConnection, Throwable e) { + InternalCache cache = serverConnection.getCache(); + if (cache != null) { + OldClientSupportService svc = cache.getService(OldClientSupportService.class); if (svc != null) { - return svc.getThrowable(e, servConn.getClientVersion()); + return svc.getThrowable(e, serverConnection.getClientVersion()); } } return e; } protected static void writeException(Message origMsg, int msgType, Throwable e, boolean isSevere, - ServerConnection servConn) throws IOException { - Throwable theException = getClientException(servConn, e); - Message errorMsg = servConn.getErrorResponseMessage(); + ServerConnection serverConnection) throws IOException { + Throwable theException = getClientException(serverConnection, e); + Message errorMsg = serverConnection.getErrorResponseMessage(); errorMsg.setMessageType(msgType); errorMsg.setNumberOfParts(2); errorMsg.setTransactionId(origMsg.getTransactionId()); @@ -628,9 +554,9 @@ public abstract class BaseCommand implements Command { } errorMsg.addObjPart(theException); errorMsg.addStringPart(getExceptionTrace(theException)); - errorMsg.send(servConn); + errorMsg.send(serverConnection); if (logger.isDebugEnabled()) { - logger.debug("{}: Wrote exception: {}", servConn.getName(), e.getMessage(), e); + logger.debug("{}: Wrote exception: {}", serverConnection.getName(), e.getMessage(), e); } if (e instanceof MessageTooLargeException) { throw (IOException) e; @@ -638,41 +564,41 @@ public abstract class BaseCommand implements Command { } protected static void writeErrorResponse(Message origMsg, int messageType, - ServerConnection servConn) throws IOException { - Message errorMsg = servConn.getErrorResponseMessage(); + ServerConnection serverConnection) throws IOException { + Message errorMsg = serverConnection.getErrorResponseMessage(); errorMsg.setMessageType(messageType); errorMsg.setNumberOfParts(1); errorMsg.setTransactionId(origMsg.getTransactionId()); errorMsg.addStringPart( LocalizedStrings.BaseCommand_INVALID_DATA_RECEIVED_PLEASE_SEE_THE_CACHE_SERVER_LOG_FILE_FOR_ADDITIONAL_DETAILS .toLocalizedString()); - errorMsg.send(servConn); + errorMsg.send(serverConnection); } protected static void writeErrorResponse(Message origMsg, int messageType, String msg, - ServerConnection servConn) throws IOException { - Message errorMsg = servConn.getErrorResponseMessage(); + ServerConnection serverConnection) throws IOException { + Message errorMsg = serverConnection.getErrorResponseMessage(); errorMsg.setMessageType(messageType); errorMsg.setNumberOfParts(1); errorMsg.setTransactionId(origMsg.getTransactionId()); errorMsg.addStringPart(msg); - errorMsg.send(servConn); + errorMsg.send(serverConnection); } protected static void writeRegionDestroyedEx(Message msg, String regionName, String title, - ServerConnection servConn) throws IOException { - String reason = servConn.getName() + ": Region named " + regionName + title; + ServerConnection serverConnection) throws IOException { + String reason = serverConnection.getName() + ": Region named " + regionName + title; RegionDestroyedException ex = new RegionDestroyedException(reason, regionName); - if (servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) { - writeChunkedException(msg, ex, false, servConn); + if (serverConnection.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) { + writeChunkedException(msg, ex, serverConnection); } else { - writeException(msg, ex, false, servConn); + writeException(msg, ex, false, serverConnection); } } protected static void writeResponse(Object data, Object callbackArg, Message origMsg, - boolean isObject, ServerConnection servConn) throws IOException { - Message responseMsg = servConn.getResponseMessage(); + boolean isObject, ServerConnection serverConnection) throws IOException { + Message responseMsg = serverConnection.getResponseMessage(); responseMsg.setMessageType(MessageType.RESPONSE); responseMsg.setTransactionId(origMsg.getTransactionId()); @@ -686,20 +612,20 @@ public abstract class BaseCommand implements Command { responseMsg.addRawPart((byte[]) data, isObject); } else { Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]"); - responseMsg.addObjPart(data, zipValues); + responseMsg.addObjPart(data, false); } if (callbackArg != null) { responseMsg.addObjPart(callbackArg); } - servConn.getCache().getCancelCriterion().checkCancelInProgress(null); - responseMsg.send(servConn); + serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); + responseMsg.send(serverConnection); origMsg.clearParts(); } protected static void writeResponseWithRefreshMetadata(Object data, Object callbackArg, - Message origMsg, boolean isObject, ServerConnection servConn, PartitionedRegion pr, + Message origMsg, boolean isObject, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { - Message responseMsg = servConn.getResponseMessage(); + Message responseMsg = serverConnection.getResponseMessage(); responseMsg.setMessageType(MessageType.RESPONSE); responseMsg.setTransactionId(origMsg.getTransactionId()); @@ -713,32 +639,32 @@ public abstract class BaseCommand implements Command { responseMsg.addRawPart((byte[]) data, isObject); } else { Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]"); - responseMsg.addObjPart(data, zipValues); + responseMsg.addObjPart(data, false); } if (callbackArg != null) { responseMsg.addObjPart(callbackArg); } responseMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); - servConn.getCache().getCancelCriterion().checkCancelInProgress(null); - responseMsg.send(servConn); + serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); + responseMsg.send(serverConnection); origMsg.clearParts(); } protected static void writeResponseWithFunctionAttribute(byte[] data, Message origMsg, - ServerConnection servConn) throws IOException { - Message responseMsg = servConn.getResponseMessage(); + ServerConnection serverConnection) throws IOException { + Message responseMsg = serverConnection.getResponseMessage(); responseMsg.setMessageType(MessageType.RESPONSE); responseMsg.setTransactionId(origMsg.getTransactionId()); responseMsg.setNumberOfParts(1); responseMsg.addBytesPart(data); - servConn.getCache().getCancelCriterion().checkCancelInProgress(null); - responseMsg.send(servConn); + serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); + responseMsg.send(serverConnection); origMsg.clearParts(); } - static protected void checkForInterrupt(ServerConnection servConn, Exception e) + protected static void checkForInterrupt(ServerConnection serverConnection, Exception e) throws InterruptedException, InterruptedIOException { - servConn.getCachedRegionHelper().checkCancelInProgress(e); + serverConnection.getCachedRegionHelper().checkCancelInProgress(e); if (e instanceof InterruptedException) { throw (InterruptedException) e; } @@ -747,37 +673,35 @@ public abstract class BaseCommand implements Command { } } - protected static void writeQueryResponseChunk(Object queryResponseChunk, - CollectionType collectionType, boolean lastChunk, ServerConnection servConn) + static void writeQueryResponseChunk(Object queryResponseChunk, CollectionType collectionType, boolean lastChunk, ServerConnection serverConnection) throws IOException { - ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage(); + ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage(); queryResponseMsg.setNumberOfParts(2); queryResponseMsg.setLastChunk(lastChunk); - queryResponseMsg.addObjPart(collectionType, zipValues); - queryResponseMsg.addObjPart(queryResponseChunk, zipValues); - queryResponseMsg.sendChunk(servConn); + queryResponseMsg.addObjPart(collectionType, false); + queryResponseMsg.addObjPart(queryResponseChunk, false); + queryResponseMsg.sendChunk(serverConnection); } - protected static void writeQueryResponseException(Message origMsg, Throwable exception, - boolean isSevere, ServerConnection servConn) throws IOException { - Throwable e = getClientException(servConn, exception); - ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage(); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + protected static void writeQueryResponseException(Message origMsg, Throwable exception, ServerConnection serverConnection) throws IOException { + Throwable e = getClientException(serverConnection, exception); + ChunkedMessage queryResponseMsg = serverConnection.getQueryResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); if (queryResponseMsg.headerHasBeenSent()) { // fix for bug 35442 // This client is expecting 2 parts in this message so send 2 parts - queryResponseMsg.setServerConnection(servConn); + queryResponseMsg.setServerConnection(serverConnection); queryResponseMsg.setNumberOfParts(2); queryResponseMsg.setLastChunkAndNumParts(true, 2); queryResponseMsg.addObjPart(e); queryResponseMsg.addStringPart(getExceptionTrace(e)); if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), + logger.debug("{}: Sending exception chunk while reply in progress: {}", serverConnection.getName(), e.getMessage(), e); } - queryResponseMsg.sendChunk(servConn); + queryResponseMsg.sendChunk(serverConnection); } else { - chunkedResponseMsg.setServerConnection(servConn); + chunkedResponseMsg.setServerConnection(serverConnection); chunkedResponseMsg.setMessageType(MessageType.EXCEPTION); chunkedResponseMsg.setNumberOfParts(2); chunkedResponseMsg.setLastChunkAndNumParts(true, 2); @@ -786,19 +710,18 @@ public abstract class BaseCommand implements Command { chunkedResponseMsg.addObjPart(e); chunkedResponseMsg.addStringPart(getExceptionTrace(e)); if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); + logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), e); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } } protected static void writeChunkedErrorResponse(Message origMsg, int messageType, String message, - ServerConnection servConn) throws IOException { + ServerConnection serverConnection) throws IOException { // Send chunked response header identifying error message - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Sending error message header type: " + messageType - + " transaction: " + origMsg.getTransactionId()); + logger.debug("{}: Sending error message header type: {} transaction: {}", serverConnection.getName(), messageType, origMsg.getTransactionId()); } chunkedResponseMsg.setMessageType(messageType); chunkedResponseMsg.setTransactionId(origMsg.getTransactionId()); @@ -806,32 +729,31 @@ public abstract class BaseCommand implements Command { // Send actual error if (logger.isDebugEnabled()) { - logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message); + logger.debug("{}: Sending error message chunk: {}", serverConnection.getName(), message); } chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(true); chunkedResponseMsg.addStringPart(message); - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } - protected static void writeFunctionResponseException(Message origMsg, int messageType, - String message, ServerConnection servConn, Throwable exception) throws IOException { - Throwable e = getClientException(servConn, exception); - ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage(); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + protected static void writeFunctionResponseException(Message origMsg, int messageType, ServerConnection serverConnection, Throwable exception) throws IOException { + Throwable e = getClientException(serverConnection, exception); + ChunkedMessage functionResponseMsg = serverConnection.getFunctionResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); if (functionResponseMsg.headerHasBeenSent()) { - functionResponseMsg.setServerConnection(servConn); + functionResponseMsg.setServerConnection(serverConnection); functionResponseMsg.setNumberOfParts(2); functionResponseMsg.setLastChunkAndNumParts(true, 2); functionResponseMsg.addObjPart(e); functionResponseMsg.addStringPart(getExceptionTrace(e)); if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), + logger.debug("{}: Sending exception chunk while reply in progress: {}", serverConnection.getName(), e.getMessage(), e); } - functionResponseMsg.sendChunk(servConn); + functionResponseMsg.sendChunk(serverConnection); } else { - chunkedResponseMsg.setServerConnection(servConn); + chunkedResponseMsg.setServerConnection(serverConnection); chunkedResponseMsg.setMessageType(messageType); chunkedResponseMsg.setNumberOfParts(2); chunkedResponseMsg.setLastChunkAndNumParts(true, 2); @@ -840,9 +762,9 @@ public abstract class BaseCommand implements Command { chunkedResponseMsg.addObjPart(e); chunkedResponseMsg.addStringPart(getExceptionTrace(e)); if (logger.isDebugEnabled()) { - logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e); + logger.debug("{}: Sending exception chunk: {}", serverConnection.getName(), e.getMessage(), e); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } } @@ -898,14 +820,14 @@ public abstract class BaseCommand implements Command { Message requestMsg = null; try { requestMsg = servConn.getRequestMessage(); - requestMsg.recv(servConn, MAX_INCOMING_DATA, incomingDataLimiter, incomingMsgLimiter); + requestMsg.recv(servConn, MAX_INCOMING_DATA, INCOMING_DATA_LIMITER, INCOMING_MSG_LIMITER); return requestMsg; } catch (EOFException eof) { handleEOFException(null, servConn, eof); - // TODO:Asif: Check if there is any need for explicitly returning + // TODO: Check if there is any need for explicitly returning } catch (InterruptedIOException e) { // Solaris only - handleInterruptedIOException(null, servConn, e); + handleInterruptedIOException(servConn, e); } catch (IOException e) { handleIOException(null, servConn, e); @@ -930,7 +852,7 @@ public abstract class BaseCommand implements Command { fillAndSendRegisterInterestResponseChunks(region, riKey, interestType, false, policy, servConn); } - /* + /** * serializeValues is unused for clients < GFE_80 */ protected static void fillAndSendRegisterInterestResponseChunks(LocalRegion region, Object riKey, @@ -959,20 +881,20 @@ public abstract class BaseCommand implements Command { // Not supported yet throw new InternalGemFireError( LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); + case InterestType.FILTER_CLASS: throw new InternalGemFireError( LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); - // handleFilter(region, (String)riKey, policy); - // break; - case InterestType.REGULAR_EXPRESSION: { + + case InterestType.REGULAR_EXPRESSION: String regEx = (String) riKey; if (regEx.equals(".*")) { handleAllKeys(region, policy, servConn); } else { handleRegEx(region, regEx, policy, servConn); } - } break; + case InterestType.KEY: if (riKey.equals("ALL_KEYS")) { handleAllKeys(region, policy, servConn); @@ -980,13 +902,13 @@ public abstract class BaseCommand implements Command { handleSingleton(region, riKey, policy, servConn); } break; + default: throw new InternalGemFireError( LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString()); } } - @SuppressWarnings("rawtypes") private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, int interestType, boolean serializeValues, ServerConnection servConn) throws IOException { if (riKey instanceof List) { @@ -1002,9 +924,11 @@ public abstract class BaseCommand implements Command { case InterestType.OQL_QUERY: throw new InternalGemFireError( LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); + case InterestType.FILTER_CLASS: throw new InternalGemFireError( LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString()); + case InterestType.REGULAR_EXPRESSION: String regEx = (String) riKey; if (regEx.equals(".*")) { @@ -1013,6 +937,7 @@ public abstract class BaseCommand implements Command { handleKVAllKeys(region, regEx, serializeValues, servConn); } break; + case InterestType.KEY: if (riKey.equals("ALL_KEYS")) { handleKVAllKeys(region, null, serializeValues, servConn); @@ -1020,6 +945,7 @@ public abstract class BaseCommand implements Command { handleKVSingleton(region, riKey, serializeValues, servConn); } break; + default: throw new InternalGemFireError( LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString()); @@ -1029,18 +955,14 @@ public abstract class BaseCommand implements Command { /** * @param list is a List of entry keys */ - protected static void sendRegisterInterestResponseChunk(Region region, Object riKey, - ArrayList list, boolean lastChunk, ServerConnection servConn) throws IOException { + private static void sendRegisterInterestResponseChunk(Region region, Object riKey, List list, boolean lastChunk, ServerConnection servConn) throws IOException { ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); - String regionName = (region == null) ? " null " : region.getFullPath(); + chunkedResponseMsg.addObjPart(list, false); + String regionName = region == null ? " null " : region.getFullPath(); if (logger.isDebugEnabled()) { - String str = servConn.getName() + ": Sending" + (lastChunk ? " last " : " ") - + "register interest response chunk for region: " + regionName + " for keys: " + riKey - + " chunk=<" + chunkedResponseMsg + ">"; - logger.debug(str); + logger.debug("{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>", servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg); } chunkedResponseMsg.sendChunk(servConn); @@ -1050,14 +972,12 @@ public abstract class BaseCommand implements Command { * Determines whether keys for destroyed entries (tombstones) should be sent to clients in * register-interest results. * - * @param servConn - * @param policy * @return true if tombstones should be sent to the client */ private static boolean sendTombstonesInRIResults(ServerConnection servConn, InterestResultPolicy policy) { - return (policy == InterestResultPolicy.KEYS_VALUES) - && (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0); + return policy == InterestResultPolicy.KEYS_VALUES + && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0; } /** @@ -1066,7 +986,6 @@ public abstract class BaseCommand implements Command { * @param region the region * @param keyList the list of keys * @param policy the policy - * @throws IOException */ private static void handleList(LocalRegion region, List keyList, InterestResultPolicy policy, ServerConnection servConn) throws IOException { @@ -1075,15 +994,13 @@ public abstract class BaseCommand implements Command { handleListPR((PartitionedRegion) region, keyList, policy, servConn); return; } - ArrayList newKeyList = new ArrayList(maximumChunkSize); + List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); // Handle list of keys if (region != null) { - for (Iterator it = keyList.iterator(); it.hasNext();) { - Object entryKey = it.next(); - if (region.containsKey(entryKey) || (sendTombstonesInRIResults(servConn, policy) - && region.containsTombstone(entryKey))) { + for (Object entryKey : keyList) { + if (region.containsKey(entryKey) || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) { - appendInterestResponseKey(region, keyList, entryKey, newKeyList, "list", servConn); + appendInterestResponseKey(region, keyList, entryKey, newKeyList, servConn); } } } @@ -1095,13 +1012,11 @@ public abstract class BaseCommand implements Command { /** * Handles both RR and PR cases */ - @SuppressWarnings("rawtypes") - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_PARAM_DEREF", + @SuppressWarnings(value = "NP_NULL_PARAM_DEREF", justification = "Null value handled in sendNewRegisterInterestResponseChunk()") private static void handleKVSingleton(LocalRegion region, Object entryKey, boolean serializeValues, ServerConnection servConn) throws IOException { - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true, - region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(), + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); if (region != null) { @@ -1126,15 +1041,14 @@ public abstract class BaseCommand implements Command { * @param region the region * @param entryKey the key * @param policy the policy - * @throws IOException */ private static void handleSingleton(LocalRegion region, Object entryKey, InterestResultPolicy policy, ServerConnection servConn) throws IOException { - ArrayList keyList = new ArrayList(1); + List keyList = new ArrayList(1); if (region != null) { if (region.containsKey(entryKey) - || (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) { - appendInterestResponseKey(region, entryKey, entryKey, keyList, "individual", servConn); + || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) { + appendInterestResponseKey(region, entryKey, entryKey, keyList, servConn); } } // Send the last chunk (the only chunk for individual and list keys) @@ -1147,15 +1061,13 @@ public abstract class BaseCommand implements Command { * * @param region the region * @param policy the policy - * @throws IOException */ private static void handleAllKeys(LocalRegion region, InterestResultPolicy policy, ServerConnection servConn) throws IOException { - ArrayList keyList = new ArrayList(maximumChunkSize); + List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); if (region != null) { - for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it - .hasNext();) { - appendInterestResponseKey(region, "ALL_KEYS", it.next(), keyList, "ALL_KEYS", servConn); + for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) { + appendInterestResponseKey(region, "ALL_KEYS", entryKey, keyList, servConn); } } // Send the last chunk (the only chunk for individual and list keys) @@ -1163,30 +1075,19 @@ public abstract class BaseCommand implements Command { sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true, servConn); } - /** - * @param region - * @param regex - * @param serializeValues - * @param servConn - * @throws IOException - */ private static void handleKVAllKeys(LocalRegion region, String regex, boolean serializeValues, ServerConnection servConn) throws IOException { - if (region != null && region instanceof PartitionedRegion) { + if (region instanceof PartitionedRegion) { handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn); return; } - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true, - region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(), + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); if (region != null) { - VersionTag versionTag = null; - Object data = null; - Pattern keyPattern = null; if (regex != null) { keyPattern = Pattern.compile(regex); @@ -1207,11 +1108,11 @@ public abstract class BaseCommand implements Command { } ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); - data = region.get(key, null, true, true, true, id, versionHolder, true); - versionTag = versionHolder.getVersionTag(); + Object data = region.get(key, null, true, true, true, id, versionHolder, true); + VersionTag versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, false, servConn); values.clear(); @@ -1227,20 +1128,18 @@ public abstract class BaseCommand implements Command { private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo, boolean serializeValues, ServerConnection servConn) throws IOException { - int id = 0; - HashMap bucketKeys = null; - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true, + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, region.getConcurrencyChecksEnabled(), serializeValues); - if (keyInfo != null && keyInfo instanceof List) { - bucketKeys = new HashMap(); + if (keyInfo instanceof List) { + HashMap bucketKeys = new HashMap<>(); for (Object key : (List) keyInfo) { - id = PartitionedRegionHelper.getHashKey(region, null, key, null, null); + int id = PartitionedRegionHelper.getHashKey(region, null, key, null, null); if (bucketKeys.containsKey(id)) { bucketKeys.get(id).add(key); } else { - HashSet keys = new HashSet(); + HashSet keys = new HashSet<>(); keys.add(key); bucketKeys.put(id, keys); } @@ -1259,8 +1158,6 @@ public abstract class BaseCommand implements Command { /** * Copied from Get70.getValueAndIsObject(), except a minor change. (Make the method static instead * of copying it here?) - * - * @param value */ private static void updateValues(VersionedObjectList values, Object key, Object value, VersionTag versionTag) { @@ -1274,8 +1171,7 @@ public abstract class BaseCommand implements Command { boolean wasInvalid = false; if (value instanceof CachedDeserializable) { value = ((CachedDeserializable) value).getValue(); - } else if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2 - || value == Token.DESTROYED || value == Token.TOMBSTONE) { + } else if (isRemovalToken(value)) { value = null; } else if (value == Token.INVALID || value == Token.LOCAL_INVALID) { value = null; // fix for bug 35884 @@ -1292,46 +1188,39 @@ public abstract class BaseCommand implements Command { } } + private static boolean isRemovalToken(final Object value) { + return value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2 + || value == Token.DESTROYED || value == Token.TOMBSTONE; + } + public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region, VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn) throws IOException { ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID(); - for (Iterator it = keySet.iterator(); it.hasNext();) { - Object key = it.next(); + for (Object key : keySet) { VersionTagHolder versionHolder = createVersionTagHolder(); Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true); updateValues(values, key, value, versionHolder.getVersionTag()); - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list // values.setKeys(null); // Now we need to send keys too. - sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, - false, servConn); + sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn); values.clear(); } } // for } - /** - * - * @param region - * @param values {@link VersionedObjectList} - * @param riKeys - * @param set set of entries - * @param servConn - * @throws IOException - */ public static void appendNewRegisterInterestResponseChunk(LocalRegion region, - VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn) + VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn) throws IOException { - for (Iterator it = set.iterator(); it.hasNext();) { - Map.Entry entry = it.next(); // Region.Entry or Map.Entry + for (Entry entry : set) { if (entry instanceof Region.Entry) { // local entries - VersionTag vt = null; - Object key = null; - Object value = null; + VersionTag vt; + Object key; + Object value; if (entry instanceof EntrySnapshot) { vt = ((EntrySnapshot) entry).getVersionTag(); key = ((EntrySnapshot) entry).getRegionEntry().getKey(); @@ -1349,16 +1238,14 @@ public abstract class BaseCommand implements Command { } } } else { // Map.Entry (remote entries) - ArrayList list = (ArrayList) entry.getValue(); + List list = (List) entry.getValue(); Object value = list.get(0); VersionTag tag = (VersionTag) list.get(1); updateValues(values, entry.getKey(), value, tag); } - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list - // values.setKeys(null); // Now we need to send keys too. - sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, - false, servConn); + sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn); values.clear(); } } // for @@ -1369,25 +1256,18 @@ public abstract class BaseCommand implements Command { ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); - String regionName = (region == null) ? " null " : region.getFullPath(); + chunkedResponseMsg.addObjPart(list, false); + String regionName = region == null ? " null " : region.getFullPath(); if (logger.isDebugEnabled()) { - String str = servConn.getName() + ": Sending" + (lastChunk ? " last " : " ") - + "register interest response chunk for region: " + regionName + " for keys: " + riKey - + " chunk=<" + chunkedResponseMsg + ">"; - logger.debug(str); + logger.debug("{}: Sending{}register interest response chunk for region: {} for keys: {} chunk=<{}>", + servConn.getName(), lastChunk ? " last " : " ", regionName, riKey, chunkedResponseMsg + ); } - chunkedResponseMsg.sendChunk(servConn); } /** * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} - * - * @param region the region - * @param regex the regex - * @param policy the policy - * @throws IOException */ private static void handleRegEx(LocalRegion region, String regex, InterestResultPolicy policy, ServerConnection servConn) throws IOException { @@ -1396,13 +1276,11 @@ public abstract class BaseCommand implements Command { handleRegExPR((PartitionedRegion) region, regex, policy, servConn); return; } - ArrayList keyList = new ArrayList(maximumChunkSize); + List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); // Handle the regex pattern - Pattern keyPattern = Pattern.compile(regex); if (region != null) { - for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it - .hasNext();) { - Object entryKey = it.next(); + Pattern keyPattern = Pattern.compile(regex); + for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) { if (!(entryKey instanceof String)) { // key is not a String, cannot apply regex to this entry continue; @@ -1412,7 +1290,7 @@ public abstract class BaseCommand implements Command { continue; } - appendInterestResponseKey(region, regex, entryKey, keyList, "regex", servConn); + appendInterestResponseKey(region, regex, entryKey, keyList, servConn); } } // Send the last chunk (the only chunk for individual and list keys) @@ -1422,19 +1300,15 @@ public abstract class BaseCommand implements Command { /** * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION} - * - * @param region the region - * @param regex the regex - * @param policy the policy - * @throws IOException */ private static void handleRegExPR(final PartitionedRegion region, final String regex, final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { - final ArrayList keyList = new ArrayList(maximumChunkSize); + final List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() { + @Override public void receiveSet(Set theSet) throws IOException { - appendInterestResponseKeys(region, regex, theSet, keyList, "regex", servConn); + appendInterestResponseKeys(region, regex, theSet, keyList, servConn); } }); // Send the last chunk (the only chunk for individual and list keys) @@ -1444,19 +1318,15 @@ public abstract class BaseCommand implements Command { /** * Process an interest request involving a list of keys - * - * @param region the region - * @param keyList the list of keys - * @param policy the policy - * @throws IOException */ private static void handleListPR(final PartitionedRegion region, final List keyList, final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { - final ArrayList newKeyList = new ArrayList(maximumChunkSize); + final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() { + @Override public void receiveSet(Set theSet) throws IOException { - appendInterestResponseKeys(region, keyList, theSet, newKeyList, "list", servConn); + appendInterestResponseKeys(region, keyList, theSet, newKeyList, servConn); } }); // Send the last chunk (the only chunk for individual and list keys) @@ -1464,34 +1334,29 @@ public abstract class BaseCommand implements Command { sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn); } - @SuppressWarnings("rawtypes") private static void handleKVList(final LocalRegion region, final List keyList, boolean serializeValues, final ServerConnection servConn) throws IOException { - if (region != null && region instanceof PartitionedRegion) { + if (region instanceof PartitionedRegion) { handleKVKeysPR((PartitionedRegion) region, keyList, serializeValues, servConn); return; } - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, true, - region == null ? true : region.getAttributes().getConcurrencyChecksEnabled(), + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, true, region == null || region.getAttributes().getConcurrencyChecksEnabled(), serializeValues); // Handle list of keys if (region != null) { - VersionTag versionTag = null; - Object data = null; - for (Iterator it = keyList.iterator(); it.hasNext();) { - Object key = it.next(); + for (Object key : keyList) { if (region.containsKey(key) || region.containsTombstone(key)) { VersionTagHolder versionHolder = createVersionTagHolder(); ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); - data = region.get(key, null, true, true, true, id, versionHolder, true); - versionTag = versionHolder.getVersionTag(); + Object data = region.get(key, null, true, true, true, id, versionHolder, true); + VersionTag versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list // values.setKeys(null); // Now we need to send keys too. sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn); @@ -1518,27 +1383,24 @@ public abstract class BaseCommand implements Command { * @param riKey the registerInterest "key" (what the client is interested in) * @param entryKey key we're responding to * @param list list to append to - * @param kind for debugging */ - private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey, - ArrayList list, String kind, ServerConnection servConn) throws IOException { + private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey, List list, ServerConnection servConn) throws IOException { list.add(entryKey); if (logger.isDebugEnabled()) { logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}", servConn.getName(), entryKey, list.size(), region.getFullPath()); } - if (list.size() == maximumChunkSize) { + if (list.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendRegisterInterestResponseChunk(region, riKey, list, false, servConn); list.clear(); } } - protected static void appendInterestResponseKeys(LocalRegion region, Object riKey, - Collection entryKeys, ArrayList collector, String riDescr, ServerConnection servConn) + private static void appendInterestResponseKeys(LocalRegion region, Object riKey, Collection entryKeys, List collector, ServerConnection servConn) throws IOException { - for (Iterator it = entryKeys.iterator(); it.hasNext();) { - appendInterestResponseKey(region, riKey, it.next(), collector, riDescr, servConn); + for (final Object entryKey : entryKeys) { + appendInterestResponseKey(region, riKey, entryKey, collector, servConn); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/92bc5159/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java index 5f7a8ef..adf702a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java @@ -193,11 +193,11 @@ public abstract class BaseCommandQuery extends BaseCommand { } } - int numberOfChunks = (int) Math.ceil(selectResults.size() * 1.0 / maximumChunkSize); + int numberOfChunks = (int) Math.ceil(selectResults.size() * 1.0 / MAXIMUM_CHUNK_SIZE); if (logger.isTraceEnabled()) { logger.trace("{}: Query results size: {}: Entries in chunk: {}: Number of chunks: {}", - servConn.getName(), selectResults.size(), maximumChunkSize, numberOfChunks); + servConn.getName(), selectResults.size(), MAXIMUM_CHUNK_SIZE, numberOfChunks); } long oldStart = start; @@ -262,7 +262,7 @@ public abstract class BaseCommandQuery extends BaseCommand { QueryInvalidException qie = new QueryInvalidException(LocalizedStrings.BaseCommand_0_QUERYSTRING_IS_1 .toLocalizedString(new Object[] {e.getLocalizedMessage(), queryString})); - writeQueryResponseException(msg, qie, false, servConn); + writeQueryResponseException(msg, qie, servConn); return false; } catch (DistributedSystemDisconnectedException se) { if (msg != null && logger.isDebugEnabled()) { @@ -282,7 +282,7 @@ public abstract class BaseCommandQuery extends BaseCommand { if ((defaultQuery).isCanceled()) { e = new QueryException(defaultQuery.getQueryCanceledException().getMessage(), e.getCause()); } - writeQueryResponseException(msg, e, false, servConn); + writeQueryResponseException(msg, e, servConn); return false; } finally { // Since the query object is being shared in case of bind queries, @@ -375,8 +375,8 @@ public abstract class BaseCommandQuery extends BaseCommand { if (logger.isTraceEnabled()) { logger.trace("{}: Creating chunk: {}", servConn.getName(), j); } - Object[] results = new Object[maximumChunkSize]; - for (int i = 0; i < maximumChunkSize; i++) { + Object[] results = new Object[MAXIMUM_CHUNK_SIZE]; + for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) { if ((resultIndex) == selectResults.size()) { incompleteArray = true; break; @@ -427,9 +427,9 @@ public abstract class BaseCommandQuery extends BaseCommand { if (incompleteArray) { Object[] newResults; if (cqQuery != null) { - newResults = new Object[cqResultIndex % maximumChunkSize]; + newResults = new Object[cqResultIndex % MAXIMUM_CHUNK_SIZE]; } else { - newResults = new Object[resultIndex % maximumChunkSize]; + newResults = new Object[resultIndex % MAXIMUM_CHUNK_SIZE]; } for (int i = 0; i < newResults.length; i++) { newResults[i] = results[i]; @@ -463,8 +463,8 @@ public abstract class BaseCommandQuery extends BaseCommand { if (logger.isTraceEnabled()) { logger.trace("{}: Creating chunk: {}", servConn.getName(), j); } - ObjectPartList serializedObjs = new ObjectPartList(maximumChunkSize, false); - for (int i = 0; i < maximumChunkSize; i++) { + ObjectPartList serializedObjs = new ObjectPartList(MAXIMUM_CHUNK_SIZE, false); + for (int i = 0; i < MAXIMUM_CHUNK_SIZE; i++) { if ((resultIndex) == objs.size()) { break; }