geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [080/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop
Date Mon, 22 Feb 2016 21:44:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index 0000000,dd13f19..bef4bf1
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@@ -1,0 -1,1625 +1,1611 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ /**
+  *
+  */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+ 
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InterruptedIOException;
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Semaphore;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.CopyException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.InterestResultPolicy;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+ import com.gemstone.gemfire.cache.query.types.CollectionType;
+ import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+ import com.gemstone.gemfire.distributed.internal.DistributionStats;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+ import com.gemstone.gemfire.internal.cache.DistributedRegion;
+ import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+ import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+ import com.gemstone.gemfire.internal.cache.EventID;
+ import com.gemstone.gemfire.internal.cache.FindVersionTagOperation;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+ import com.gemstone.gemfire.internal.cache.LocalRegion;
+ import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+ import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.TXStateProxy;
+ import com.gemstone.gemfire.internal.cache.Token;
+ import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+ import com.gemstone.gemfire.internal.cache.tier.Command;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
+ import com.gemstone.gemfire.security.GemFireSecurityException;
+ 
+ /**
+  * @author ashahid
+  *
+  */
+ public abstract class BaseCommand implements Command {
+   protected static final Logger logger = LogService.getLogger();
+ 
+   /**
+    * Whether zipped values are being passed to/from the client. Can be modified
+    * using the system property Message.ZIP_VALUES ? This does not appear to
+    * happen anywhere
+    */
+   protected static final boolean zipValues = false;
+ 
+   protected static final boolean APPLY_RETRIES = Boolean
+       .getBoolean("gemfire.gateway.ApplyRetries");
+ 
+   public static final byte[] OK_BYTES = new byte[]{0};  
+ 
+   public static final int maximumChunkSize = Integer.getInteger(
+       "BridgeServer.MAXIMUM_CHUNK_SIZE", 100).intValue();
+ 
+   /** Maximum number of entries in each chunked response chunk */
+ 
+   /** Whether to suppress logging of IOExceptions */
+   private static boolean suppressIOExceptionLogging = Boolean
+       .getBoolean("gemfire.bridge.suppressIOExceptionLogging");
+ 
+   /**
+    * Maximum number of concurrent incoming client message bytes that a bridge
+    * server will allow. Once a server is working on this number additional
+    * incoming client messages will wait until one of them completes or fails.
+    * The bytes are computed based in the size sent in the incoming msg header.
+    */
+   private static final int MAX_INCOMING_DATA = Integer.getInteger(
+       "BridgeServer.MAX_INCOMING_DATA", -1).intValue();
+ 
+   /**
+    * Maximum number of concurrent incoming client messages that a bridge server
+    * will allow. Once a server is working on this number additional incoming
+    * client messages will wait until one of them completes or fails.
+    */
+   private static final int MAX_INCOMING_MSGS = Integer.getInteger(
+       "BridgeServer.MAX_INCOMING_MSGS", -1).intValue();
+ 
+   private static final Semaphore incomingDataLimiter;
+ 
+   private static final Semaphore incomingMsgLimiter;
+   static {
+     Semaphore tmp;
+     if (MAX_INCOMING_DATA > 0) {
+       // backport requires that this is fair since we inc by values > 1
+       tmp = new Semaphore(MAX_INCOMING_DATA, true);
+     }
+     else {
+       tmp = null;
+     }
+     incomingDataLimiter = tmp;
+     if (MAX_INCOMING_MSGS > 0) {
+       tmp = new Semaphore(MAX_INCOMING_MSGS, false); // unfair for best
+       // performance
+     }
+     else {
+       tmp = null;
+     }
+     incomingMsgLimiter = tmp;
+ 
+   }
+ 
+   final public void execute(Message msg, ServerConnection servConn) {
+     // Read the request and update the statistics
+     long start = DistributionStats.getStatTime();
+     //servConn.resetTransientData();
+     if(EntryLogger.isEnabled() && servConn  != null) {
+       EntryLogger.setSource(servConn.getMembershipID(), "c2s");
+     }
+     boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn);
+     try {
+       if (shouldMasquerade) {
+         GemFireCacheImpl  cache = (GemFireCacheImpl)servConn.getCache();
+         InternalDistributedMember member = (InternalDistributedMember)servConn.getProxyID().getDistributedMember();
+         TXManagerImpl txMgr = cache.getTxManager();
+         TXStateProxy tx = null;
+         try {
+           tx = txMgr.masqueradeAs(msg, member, false);
+           cmdExecute(msg, servConn, start);
+         } finally {
+           txMgr.unmasquerade(tx);
+         }
+       } else {
+         cmdExecute(msg, servConn, start);
+       }
+       
+     }   
++    catch (TransactionException
++        | CopyException
++        | SerializationException
++        | CacheWriterException
++        | CacheLoaderException
++        | GemFireSecurityException
++        | PartitionOfflineException
++        | MessageTooLargeException e) {
++      handleExceptionNoDisconnect(msg, servConn, e);
++    }
+     catch (EOFException eof) {
+       BaseCommand.handleEOFException(msg, servConn, eof);
+       // TODO:Asif: Check if there is any need for explicitly returning
+       return;
+     }
+     catch (InterruptedIOException e) { // Solaris only
+       BaseCommand.handleInterruptedIOException(msg, servConn, e);
 -      return;
+     }
+     catch (IOException e) {
+       BaseCommand.handleIOException(msg, servConn, e);
 -      return;
+     }
+     catch (DistributedSystemDisconnectedException e) {
+       BaseCommand.handleShutdownException(msg, servConn, e);
 -      return;
 -    }
 -    catch (PartitionOfflineException e) { // fix for bug #42225
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    }
 -    catch (GemFireSecurityException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
+     }
 -    catch (CacheLoaderException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    }
 -    catch (CacheWriterException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    } catch (SerializationException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    } catch (CopyException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    } catch (TransactionException e) {
 -      handleExceptionNoDisconnect(msg, servConn, e);
 -    }
 -    
+     catch (VirtualMachineError err) {
+       SystemFailure.initiateFailure(err);
+       // If this ever returns, rethrow the error.  We're poisoned
+       // now, so don't let this thread continue.
+       throw err;
+     }
+     catch (Throwable e) {
+       BaseCommand.handleThrowable(msg, servConn, e);
+     } finally {
+       EntryLogger.clearSource();
+     }
 -    /*
 -     * finally { // Keep track of the fact that a message is no longer being //
 -     * processed. servConn.setNotProcessingMessage();
 -     * servConn.clearRequestMsg(); }
 -     */
+   }
+ 
+   /**
+    * checks to see if this thread needs to masquerade as a transactional thread.
+    * clients after GFE_66 should be able to start a transaction.
+    * @param msg
+    * @param servConn
+    * @return true if thread should masquerade as a transactional thread.
+    */
+   protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+     if (servConn.getClientVersion().compareTo(Version.GFE_66) >= 0
+         && msg.getTransactionId() > TXManagerImpl.NOTX) {
+       return true;
+     }
+     return false;
+   }
+   
+   /**
+    * If an operation is retried then some server may have seen it already.
+    * We cannot apply this operation to the cache without knowing whether a
+    * version tag has already been created for it.  Otherwise caches that have
+    * seen the event already will reject it but others will not, but will have
+    * no version tag with which to perform concurrency checks.
+    * <p>The client event should have the event identifier from the client and
+    * the region affected by the operation.
+    * @param clientEvent
+    */
+   public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) {
+     LocalRegion r = clientEvent.getRegion();
+     VersionTag tag =  null;
+     if ((clientEvent.getVersionTag() != null) && (clientEvent.getVersionTag().isGatewayTag())) {
+       tag = r.findVersionTagForGatewayEvent(clientEvent.getEventId());
+     }
+     else {
+       tag = r.findVersionTagForClientEvent(clientEvent.getEventId());
+     }
+     if (tag == null) {
+       if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
+         // TODO this could be optimized for partitioned regions by sending the key
+         // so that the PR could look at an individual bucket for the event
+         tag = FindVersionTagOperation.findVersionTag(r, clientEvent.getEventId(), false);
+       }
+     }
+     if (tag != null) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("recovered version tag {} for replayed operation {}", tag, clientEvent.getEventId());
+       }
+       clientEvent.setVersionTag(tag);
+     }
+     return (tag != null);
+   }
+   
+   /**
+    * If an operation is retried then some server may have seen it already.
+    * We cannot apply this operation to the cache without knowing whether a
+    * version tag has already been created for it.  Otherwise caches that have
+    * seen the event already will reject it but others will not, but will have
+    * no version tag with which to perform concurrency checks.
+    * <p>The client event should have the event identifier from the client and
+    * the region affected by the operation.
+    */
+   protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion r, EventID eventID) {
+     VersionTag tag = r.findVersionTagForClientBulkOp(eventID);
+     if(tag != null) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
+       }
+       return tag;
+     }
+     if (r instanceof DistributedRegion || r instanceof PartitionedRegion) {
+       // TODO this could be optimized for partitioned regions by sending the key
+       // so that the PR could look at an individual bucket for the event
+       tag = FindVersionTagOperation.findVersionTag(r, eventID, true);
+     }
+     if (tag != null) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID);
+       }
+     }
+     return tag;
+   }
+ 
+   abstract public void cmdExecute(Message msg, ServerConnection servConn,
+       long start) throws IOException, ClassNotFoundException, InterruptedException;
+ 
+   protected void writeReply(Message origMsg, ServerConnection servConn)
+       throws IOException {
+     Message replyMsg = servConn.getReplyMessage();
+     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+     replyMsg.setMessageType(MessageType.REPLY);
+     replyMsg.setNumberOfParts(1);
+     replyMsg.setTransactionId(origMsg.getTransactionId());
+     replyMsg.addBytesPart(OK_BYTES);
+     replyMsg.send(servConn);
+     if (logger.isTraceEnabled()) {
+       logger.trace("{}: rpl tx: {}", servConn.getName(), origMsg.getTransactionId());
+     }
+   }
+   protected void writeReplyWithRefreshMetadata(Message origMsg,
+       ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException {
+     Message replyMsg = servConn.getReplyMessage();
+     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+     replyMsg.setMessageType(MessageType.REPLY);
+     replyMsg.setNumberOfParts(1);
+     replyMsg.setTransactionId(origMsg.getTransactionId());
+     replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+     replyMsg.send(servConn);
+     pr.getPrStats().incPRMetaDataSentCount();
+     if (logger.isTraceEnabled()) {
+       logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), origMsg.getTransactionId());
+     }
+   }
+ 
+   private static void handleEOFException(Message msg,
+       ServerConnection servConn, Exception eof) {
+     CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+     CacheServerStats stats = servConn.getCacheServerStats();
+     boolean potentialModification = servConn.getPotentialModification();
+     if (!crHelper.isShutdown()) {
+       if (potentialModification) {
+         stats.incAbandonedWriteRequests();
+       }
+       else {
+         stats.incAbandonedReadRequests();
+       }
+       if (!suppressIOExceptionLogging) {
+         if (potentialModification) {
+           int transId = (msg != null) ? msg.getTransactionId()
+               : Integer.MIN_VALUE;
+           logger.warn(LocalizedMessage.create(
+             LocalizedStrings.BaseCommand_0_EOFEXCEPTION_DURING_A_WRITE_OPERATION_ON_REGION__1_KEY_2_MESSAGEID_3,
+             new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}));
+         }
+         else {
+           logger.debug("EOF exception", eof);
+           logger.info(LocalizedMessage.create(
+             LocalizedStrings.BaseCommand_0_CONNECTION_DISCONNECT_DETECTED_BY_EOF,
+             servConn.getName()));
+         }
+       }
+     }
+     servConn.setFlagProcessMessagesAsFalse();
+   }
+ 
+   private static void handleInterruptedIOException(Message msg,
+       ServerConnection servConn, Exception e) {
+     CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+     if (!crHelper.isShutdown() && servConn.isOpen()) {
+       if (!suppressIOExceptionLogging) {
+         if (logger.isDebugEnabled())
+           logger.debug("Aborted message due to interrupt: {}", e.getMessage(), e);
+       }
+     }
+     servConn.setFlagProcessMessagesAsFalse();
+   }
+ 
+   private static void handleIOException(Message msg, ServerConnection servConn,
+       Exception e) {
+     CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+     boolean potentialModification = servConn.getPotentialModification();
+ 
+     if (!crHelper.isShutdown() && servConn.isOpen()) {
+       if (!suppressIOExceptionLogging) {
+         if (potentialModification) {
+           int transId = (msg != null) ? msg.getTransactionId()
+               : Integer.MIN_VALUE;
+           logger.warn(LocalizedMessage.create(
+             LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION_DURING_OPERATION_FOR_REGION_1_KEY_2_MESSID_3,
+             new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+         }
+         else {
+           logger.warn(LocalizedMessage.create(
+             LocalizedStrings.BaseCommand_0_UNEXPECTED_IOEXCEPTION,
+             servConn.getName()), e);
+         }
+       }
+     }
+     servConn.setFlagProcessMessagesAsFalse();
+   }
+ 
+   private static void handleShutdownException(Message msg,
+       ServerConnection servConn, Exception e) {
+     CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+     boolean potentialModification = servConn.getPotentialModification();
+ 
+     if (!crHelper.isShutdown()) {
+       if (potentialModification) {
+         int transId = (msg != null) ? msg.getTransactionId()
+             : Integer.MIN_VALUE;
+         logger.warn(LocalizedMessage.create(
+           LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+           new Object[] {servConn.getName(), servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+       }
+       else {
+         logger.warn(LocalizedMessage.create(
+           LocalizedStrings.BaseCommand_0_UNEXPECTED_SHUTDOWNEXCEPTION,
+           servConn.getName()),e);
+         }
+     }
+     servConn.setFlagProcessMessagesAsFalse();
+   }
+ 
+   // Handle GemfireSecurityExceptions separately since the connection should not
+   // be terminated (by setting processMessages to false) unlike in
+   // handleThrowable. Fixes bugs #38384 and #39392.
+ //  private static void handleGemfireSecurityException(Message msg,
+ //      ServerConnection servConn, GemFireSecurityException e) {
+ //
+ //    boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+ //    boolean responded = servConn.getTransientFlag(RESPONDED);
+ //    boolean requiresChunkedResponse = servConn
+ //        .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+ //    boolean potentialModification = servConn.getPotentialModification();
+ //
+ //    try {
+ //      try {
+ //        if (requiresResponse && !responded) {
+ //          if (requiresChunkedResponse) {
+ //            writeChunkedException(msg, e, false, servConn);
+ //          }
+ //          else {
+ //            writeException(msg, e, false, servConn);
+ //          }
+ //          servConn.setAsTrue(RESPONDED);
+ //        }
+ //      }
+ //      finally { // inner try-finally to ensure proper ordering of logging
+ //        if (potentialModification) {
+ //          int transId = (msg != null) ? msg.getTransactionId()
+ //              : Integer.MIN_VALUE;
+ //        }
+ //      }
+ //    }
+ //    catch (IOException ioe) {
+ //      if (logger.isDebugEnabled()) {
+ //        logger.fine(servConn.getName()
+ //            + ": Unexpected IOException writing security exception: ", ioe);
+ //      }
+ //    }
+ //  }
+ 
+   private static void handleExceptionNoDisconnect(Message msg,
+       ServerConnection servConn, Exception e) {
+     boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+     boolean responded = servConn.getTransientFlag(RESPONDED);
+     boolean requiresChunkedResponse = servConn
+         .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+     boolean potentialModification = servConn.getPotentialModification();
+     boolean wroteExceptionResponse = false;
+ 
+     try {
+       try {
+         if (requiresResponse && !responded) {
+           if (requiresChunkedResponse) {
+             writeChunkedException(msg, e, false, servConn);
+           }
+           else {
+             writeException(msg, e, false, servConn);
+           }
+           wroteExceptionResponse = true;
+           servConn.setAsTrue(RESPONDED);
+         }
+       }
+       finally { // inner try-finally to ensure proper ordering of logging
+         if (potentialModification) {
+           int transId = (msg != null) ? msg.getTransactionId()
+               : Integer.MIN_VALUE;
+           if (!wroteExceptionResponse) {
+             logger.warn(LocalizedMessage.create(
+                 LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+                 new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), e);
+           } else {
+             if (logger.isDebugEnabled()) {
+               logger.debug("{}: Exception during operation on region: {} key: {} messageId: {}", servConn.getName(),
+                   servConn.getModRegion(), servConn.getModKey(), transId, e);
+             }
+           }
+         }
+         else {
+           if (!wroteExceptionResponse) {
+             logger.warn(LocalizedMessage.create(
+                 LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
+                 servConn.getName()), e);
+           } else {
+             if (logger.isDebugEnabled()) {
+               logger.debug("{}: Exception: {}", servConn.getName(), e.getMessage(), e);
+             }
+           }
+         }
+       }
+     }
+     catch (IOException ioe) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe);
+       }
+     }
+   }
+ 
+   private static void handleThrowable(Message msg, ServerConnection servConn,
+       Throwable th) {
+     boolean requiresResponse = servConn.getTransientFlag(REQUIRES_RESPONSE);
+     boolean responded = servConn.getTransientFlag(RESPONDED);
+     boolean requiresChunkedResponse = servConn
+         .getTransientFlag(REQUIRES_CHUNKED_RESPONSE);
+     boolean potentialModification = servConn.getPotentialModification();
+ 
+     try {
+       try {
+         if (th instanceof Error) {
+           logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_ERROR_ON_SERVER,
+               servConn.getName()), th);
+         }
+         if (requiresResponse && !responded) {
+           if (requiresChunkedResponse) {
+             writeChunkedException(msg, th, false, servConn);
+           }
+           else {
+             writeException(msg, th, false, servConn);
+           }
+           servConn.setAsTrue(RESPONDED);
+         }
+       }
+       finally { // inner try-finally to ensure proper ordering of logging
+         if (th instanceof Error) {
+           // log nothing
+         } else if (th instanceof CancelException) {
+           // log nothing
+         } else {
+           if (potentialModification) {
+             int transId = (msg != null) ? msg.getTransactionId()
+                 : Integer.MIN_VALUE;
+             logger.warn(LocalizedMessage.create(
+               LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION_DURING_OPERATION_ON_REGION_1_KEY_2_MESSAGEID_3,
+               new Object[] {servConn.getName(),servConn.getModRegion(), servConn.getModKey(), Integer.valueOf(transId)}), th);
+           }
+           else {
+             logger.warn(LocalizedMessage.create(
+               LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION,
+               servConn.getName()), th);
+           }
+         }
+       }
+     } catch (IOException ioe) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Unexpected IOException writing exception: {}", servConn.getName(), ioe.getMessage(), ioe);
+       }
+     } finally {
+       servConn.setFlagProcessMessagesAsFalse();
+     }
+   }
+   
+   protected static void writeChunkedException(Message origMsg, Throwable e,
+       boolean isSevere, ServerConnection servConn) throws IOException {
+     writeChunkedException(origMsg, e, isSevere, servConn, servConn.getChunkedResponseMessage());
+   }
+ 
+   protected static void writeChunkedException(Message origMsg, Throwable e,
+       boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse) throws IOException {
+     writeChunkedException(origMsg, e, isSevere, servConn, originalReponse, 2);
+   }
+ 
+   protected static void writeChunkedException(Message origMsg, Throwable e,
+       boolean isSevere, ServerConnection servConn, ChunkedMessage originalReponse, int numOfParts) throws IOException {
+     ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+     chunkedResponseMsg.setServerConnection(servConn);
+     if (originalReponse.headerHasBeenSent()) {
+       //chunkedResponseMsg = originalReponse;
+       // fix for bug 35442
+       chunkedResponseMsg.setNumberOfParts(numOfParts);
+       chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
+       chunkedResponseMsg.addObjPart(e); 
+       if (numOfParts == 2) {
+         chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+       }
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+       }
+     }
+     else {
+       chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
+       chunkedResponseMsg.setNumberOfParts(numOfParts);
+       chunkedResponseMsg.setLastChunkAndNumParts(true, numOfParts);
+       chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+       chunkedResponseMsg.sendHeader();
+       chunkedResponseMsg.addObjPart(e);
+       if (numOfParts == 2) {
+         chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+       }
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+       }
+     }
+     chunkedResponseMsg.sendChunk(servConn);
+   }
+ 
+   // Get the exception stacktrace for native clients
+   public static String getExceptionTrace(Throwable ex) {
+     StringWriter sw = new StringWriter();
+     PrintWriter pw = new PrintWriter(sw);
+     ex.printStackTrace(pw);
+     pw.close();
+     return sw.toString();
+   }
+ 
+   protected static void writeException(Message origMsg, Throwable e,
+       boolean isSevere, ServerConnection servConn) throws IOException {
+     writeException(origMsg, MessageType.EXCEPTION, e, isSevere, servConn);
+   }
+ 
+   protected static void writeException(Message origMsg, int msgType, Throwable e,
+       boolean isSevere, ServerConnection servConn) throws IOException {
+     Message errorMsg = servConn.getErrorResponseMessage();
+     errorMsg.setMessageType(msgType);
+     errorMsg.setNumberOfParts(2);
+     errorMsg.setTransactionId(origMsg.getTransactionId());
+     if (isSevere) {
+       String msg = e.getMessage();
+       if (msg == null) {
+         msg = e.toString();
+       }
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.BaseCommand_SEVERE_CACHE_EXCEPTION_0, msg));
+     }
+     errorMsg.addObjPart(e);
+     errorMsg.addStringPart(getExceptionTrace(e));
+     errorMsg.send(servConn);
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Wrote exception: {}", servConn.getName(), e.getMessage(), e);
+     }
++    if (e instanceof MessageTooLargeException) {
++      throw (IOException)e;
++    }
+   }
+ 
+   protected static void writeErrorResponse(Message origMsg, int messageType,
+       ServerConnection servConn) throws IOException {
+     Message errorMsg = servConn.getErrorResponseMessage();
+     errorMsg.setMessageType(messageType);
+     errorMsg.setNumberOfParts(1);
+     errorMsg.setTransactionId(origMsg.getTransactionId());
+     errorMsg
+         .addStringPart(LocalizedStrings.BaseCommand_INVALID_DATA_RECEIVED_PLEASE_SEE_THE_CACHE_SERVER_LOG_FILE_FOR_ADDITIONAL_DETAILS.toLocalizedString());
+     errorMsg.send(servConn);
+   }
+ 
+   protected static void writeErrorResponse(Message origMsg, int messageType,
+       String msg, ServerConnection servConn) throws IOException {
+     Message errorMsg = servConn.getErrorResponseMessage();
+     errorMsg.setMessageType(messageType);
+     errorMsg.setNumberOfParts(1);
+     errorMsg.setTransactionId(origMsg.getTransactionId());
+     errorMsg.addStringPart(msg);
+     errorMsg.send(servConn);
+   }
+ 
+   protected static void writeRegionDestroyedEx(Message msg, String regionName,
+       String title, ServerConnection servConn) throws IOException {
+     String reason = servConn.getName() + ": Region named " + regionName + title;
+     RegionDestroyedException ex = new RegionDestroyedException(reason,
+         regionName);
+     if (servConn.getTransientFlag(REQUIRES_CHUNKED_RESPONSE)) {
+       writeChunkedException(msg, ex, false, servConn);
+     }
+     else {
+       writeException(msg, ex, false, servConn);
+     }
+   }
+ 
+   protected static void writeResponse(Object data, Object callbackArg,
+       Message origMsg, boolean isObject, ServerConnection servConn)
+       throws IOException {
+     Message responseMsg = servConn.getResponseMessage();
+     responseMsg.setMessageType(MessageType.RESPONSE);
+     responseMsg.setTransactionId(origMsg.getTransactionId());
+ 
+     
+     if (callbackArg == null) {
+       responseMsg.setNumberOfParts(1);
+     }
+     else {
+       responseMsg.setNumberOfParts(2);
+     }
+     if (data instanceof byte[]) {
+       responseMsg.addRawPart((byte[])data, isObject);
+     }
+     else {
+       Assert.assertTrue(isObject,
+           "isObject should be true when value is not a byte[]");
+       responseMsg.addObjPart(data, zipValues);
+     }
+     if (callbackArg != null) {
+       responseMsg.addObjPart(callbackArg);
+     }
+     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+     responseMsg.send(servConn);
+     origMsg.clearParts();
+   }
+   
+   protected static void writeResponseWithRefreshMetadata(Object data,
+       Object callbackArg, Message origMsg, boolean isObject,
+       ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException {
+     Message responseMsg = servConn.getResponseMessage();
+     responseMsg.setMessageType(MessageType.RESPONSE);
+     responseMsg.setTransactionId(origMsg.getTransactionId());
+ 
+     if (callbackArg == null) {
+       responseMsg.setNumberOfParts(2);
+     }
+     else {
+       responseMsg.setNumberOfParts(3);
+     }
+ 
+     if (data instanceof byte[]) {
+       responseMsg.addRawPart((byte[])data, isObject);
+     }
+     else {
+       Assert.assertTrue(isObject,
+           "isObject should be true when value is not a byte[]");
+       responseMsg.addObjPart(data, zipValues);
+     }
+     if (callbackArg != null) {
+       responseMsg.addObjPart(callbackArg);
+     }
+     responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop});
+     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+     responseMsg.send(servConn);
+     origMsg.clearParts();
+   }
+ 
+   protected static void writeResponseWithFunctionAttribute(byte[] data,
+       Message origMsg, ServerConnection servConn) throws IOException {
+     Message responseMsg = servConn.getResponseMessage();
+     responseMsg.setMessageType(MessageType.RESPONSE);
+     responseMsg.setTransactionId(origMsg.getTransactionId());
+     responseMsg.setNumberOfParts(1);
+     responseMsg.addBytesPart(data);
+     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
+     responseMsg.send(servConn);
+     origMsg.clearParts();
+   }
+   
+   static protected void checkForInterrupt(ServerConnection servConn, Exception e) 
+       throws InterruptedException, InterruptedIOException {
+     servConn.getCachedRegionHelper().checkCancelInProgress(e);
+     if (e instanceof InterruptedException) {
+       throw (InterruptedException)e;
+     }
+     if (e instanceof InterruptedIOException) {
+       throw (InterruptedIOException)e;
+     }
+   }
+ 
+   protected static void writeQueryResponseChunk(Object queryResponseChunk,
+       CollectionType collectionType, boolean lastChunk,
+       ServerConnection servConn) throws IOException {
+     ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
+     queryResponseMsg.setNumberOfParts(2);
+     queryResponseMsg.setLastChunk(lastChunk);
+     queryResponseMsg.addObjPart(collectionType, zipValues);
+     queryResponseMsg.addObjPart(queryResponseChunk, zipValues);
+     queryResponseMsg.sendChunk(servConn);
+   }
+ 
+   protected static void writeQueryResponseException(Message origMsg,
+       Throwable e, boolean isSevere, ServerConnection servConn)
+       throws IOException {
+     ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
+     ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+     if (queryResponseMsg.headerHasBeenSent()) {
+       // fix for bug 35442
+       // This client is expecting 2 parts in this message so send 2 parts
+       queryResponseMsg.setServerConnection(servConn);
+       queryResponseMsg.setNumberOfParts(2);
+       queryResponseMsg.setLastChunkAndNumParts(true, 2);
+       queryResponseMsg.addObjPart(e);
+       queryResponseMsg.addStringPart(getExceptionTrace(e));
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+       }
+       queryResponseMsg.sendChunk(servConn);
+     }
+     else {
+       chunkedResponseMsg.setServerConnection(servConn);
+       chunkedResponseMsg.setMessageType(MessageType.EXCEPTION);
+       chunkedResponseMsg.setNumberOfParts(2);
+       chunkedResponseMsg.setLastChunkAndNumParts(true, 2);
+       chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+       chunkedResponseMsg.sendHeader();
+       chunkedResponseMsg.addObjPart(e);
+       chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+       }
+       chunkedResponseMsg.sendChunk(servConn);
+     }
+   }
+ 
+   protected static void writeChunkedErrorResponse(Message origMsg,
+       int messageType, String message, ServerConnection servConn)
+       throws IOException {
+     // Send chunked response header identifying error message
+     ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+     if (logger.isDebugEnabled()) {
+       logger.debug(servConn.getName() + ": Sending error message header type: "
+           + messageType + " transaction: " + origMsg.getTransactionId());
+     }
+     chunkedResponseMsg.setMessageType(messageType);
+     chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+     chunkedResponseMsg.sendHeader();
+ 
+     // Send actual error
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
+     }
+     chunkedResponseMsg.setNumberOfParts(1);
+     chunkedResponseMsg.setLastChunk(true);
+     chunkedResponseMsg.addStringPart(message);
+     chunkedResponseMsg.sendChunk(servConn);
+   }
+   
+   protected static void writeFunctionResponseException(Message origMsg,
+       int messageType, String message, ServerConnection servConn, Throwable e)
+       throws IOException {
+     ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
+     ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+     if (functionResponseMsg.headerHasBeenSent()) {
+       functionResponseMsg.setServerConnection(servConn);
+       functionResponseMsg.setNumberOfParts(2);
+       functionResponseMsg.setLastChunkAndNumParts(true,2);
+       functionResponseMsg.addObjPart(e);
+       functionResponseMsg.addStringPart(getExceptionTrace(e));
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk while reply in progress: {}", servConn.getName(), e.getMessage(), e);
+       }
+       functionResponseMsg.sendChunk(servConn);
+     }
+     else {
+       chunkedResponseMsg.setServerConnection(servConn);
+       chunkedResponseMsg.setMessageType(messageType);
+       chunkedResponseMsg.setNumberOfParts(2);
+       chunkedResponseMsg.setLastChunkAndNumParts(true,2);
+       chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+       chunkedResponseMsg.sendHeader();
+       chunkedResponseMsg.addObjPart(e);
+       chunkedResponseMsg.addStringPart(getExceptionTrace(e));
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending exception chunk: {}", servConn.getName(), e.getMessage(), e);
+       }
+       chunkedResponseMsg.sendChunk(servConn);
+     }
+   }
+   
+   protected static void writeFunctionResponseError(Message origMsg,
+       int messageType, String message, ServerConnection servConn)
+       throws IOException {
+     ChunkedMessage functionResponseMsg = servConn.getFunctionResponseMessage();
+     ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+     if (functionResponseMsg.headerHasBeenSent()) {
+       functionResponseMsg.setNumberOfParts(1);
+       functionResponseMsg.setLastChunk(true);
+       functionResponseMsg.addStringPart(message);
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending Error chunk while reply in progress: {}", servConn.getName(), message);
+       }
+       functionResponseMsg.sendChunk(servConn);
+     }
+     else {
+       chunkedResponseMsg.setMessageType(messageType);
+       chunkedResponseMsg.setNumberOfParts(1);
+       chunkedResponseMsg.setLastChunk(true);
+       chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+       chunkedResponseMsg.sendHeader();
+       chunkedResponseMsg.addStringPart(message);
+       if (logger.isDebugEnabled()) {
+         logger.debug("{}: Sending Error chunk: {}", servConn.getName(), message);
+       }
+       chunkedResponseMsg.sendChunk(servConn);
+     }
+   }
+ 
+   protected static void writeKeySetErrorResponse(Message origMsg,
+       int messageType, String message, ServerConnection servConn)
+       throws IOException {
+     // Send chunked response header identifying error message
+     ChunkedMessage chunkedResponseMsg = servConn.getKeySetResponseMessage();
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Sending error message header type: {} transaction: {}",
+           servConn.getName(), messageType, origMsg.getTransactionId());
+     }
+     chunkedResponseMsg.setMessageType(messageType);
+     chunkedResponseMsg.setTransactionId(origMsg.getTransactionId());
+     chunkedResponseMsg.sendHeader();
+     // Send actual error
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: Sending error message chunk: {}", servConn.getName(), message);
+     }
+     chunkedResponseMsg.setNumberOfParts(1);
+     chunkedResponseMsg.setLastChunk(true);
+     chunkedResponseMsg.addStringPart(message);
+     chunkedResponseMsg.sendChunk(servConn);
+   }
+   
+   static Message readRequest(ServerConnection servConn) {
+     Message requestMsg = null;
+     try {
+       requestMsg = servConn.getRequestMessage();
+       requestMsg.recv(servConn, MAX_INCOMING_DATA, incomingDataLimiter,
+           incomingMsgLimiter);
+       return requestMsg;
+     }
+     catch (EOFException eof) {
+       handleEOFException(null, servConn, eof);
+       // TODO:Asif: Check if there is any need for explicitly returning
+ 
+     }
+     catch (InterruptedIOException e) { // Solaris only
+       handleInterruptedIOException(null, servConn, e);
+ 
+     }
+     catch (IOException e) {
+       handleIOException(null, servConn, e);
+ 
+     }
+     catch (DistributedSystemDisconnectedException e) {
+       handleShutdownException(null, servConn, e);
+ 
+     }
+     catch (VirtualMachineError err) {
+       SystemFailure.initiateFailure(err);
+       // If this ever returns, rethrow the error.  We're poisoned
+       // now, so don't let this thread continue.
+       throw err;
+     }
+     catch (Throwable e) {
+       SystemFailure.checkFailure();
+       handleThrowable(null, servConn, e);
+     }
+     return requestMsg;
+   }
+ 
+   protected static void fillAndSendRegisterInterestResponseChunks(
+       LocalRegion region, Object riKey, int interestType,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     fillAndSendRegisterInterestResponseChunks(region, riKey, interestType,
+         false, policy, servConn);
+   }
+ 
+   /*
+    * serializeValues is unused for clients < GFE_80
+    */
+   protected static void fillAndSendRegisterInterestResponseChunks(
+       LocalRegion region, Object riKey, int interestType, boolean serializeValues,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     // Client is not interested.
+     if (policy.isNone()) {
+       sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true,
+           servConn);
+       return;
+     }
+     if (policy.isKeysValues()
+         && servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) {
+         handleKeysValuesPolicy(region, riKey, interestType, serializeValues, servConn);
+         return;
+     }
+     if (riKey instanceof List) {
+       handleList(region, (List)riKey, policy, servConn);
+       return;
+     }
+     if (!(riKey instanceof String)) {
+       handleSingleton(region, riKey, policy, servConn);
+       return;
+     }
+ 
+     switch (interestType) {
+     case InterestType.OQL_QUERY:
+       // Not supported yet
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+     case InterestType.FILTER_CLASS:
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+       // handleFilter(region, (String)riKey, policy);
+       // break;
+     case InterestType.REGULAR_EXPRESSION: {
+       String regEx = (String)riKey;
+       if (regEx.equals(".*")) {
+         handleAllKeys(region, policy, servConn);
+       }
+       else {
+         handleRegEx(region, regEx, policy, servConn);
+       }
+     }
+       break;
+     case InterestType.KEY:
+       if (riKey.equals("ALL_KEYS")) {
+         handleAllKeys(region, policy, servConn);
+       }
+       else {
+         handleSingleton(region, riKey, policy, servConn);
+       }
+       break;
+     default:
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
+     }
+   }
+ 
+   @SuppressWarnings("rawtypes")
+   private static void handleKeysValuesPolicy(LocalRegion region, Object riKey,
+       int interestType, boolean serializeValues, ServerConnection servConn)
+       throws IOException {
+     if (riKey instanceof List) {
+       handleKVList(region, (List)riKey, serializeValues, servConn);
+       return;
+     }
+     if (!(riKey instanceof String)) {
+       handleKVSingleton(region, riKey, serializeValues, servConn);
+       return;
+     }
+ 
+     switch (interestType) {
+     case InterestType.OQL_QUERY:
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+     case InterestType.FILTER_CLASS:
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_NOT_YET_SUPPORTED.toLocalizedString());
+     case InterestType.REGULAR_EXPRESSION:
+       String regEx = (String)riKey;
+       if (regEx.equals(".*")) {
+         handleKVAllKeys(region, null, serializeValues, servConn);
+       } else {
+         handleKVAllKeys(region, regEx, serializeValues, servConn);
+       }
+       break;
+     case InterestType.KEY:
+       if (riKey.equals("ALL_KEYS")) {
+         handleKVAllKeys(region, null, serializeValues, servConn);
+       } else {
+         handleKVSingleton(region, riKey, serializeValues, servConn);
+       }
+       break;
+     default:
+       throw new InternalGemFireError(LocalizedStrings.BaseCommand_UNKNOWN_INTEREST_TYPE.toLocalizedString());
+     }
+   }
+ 
+   /**
+    * @param list
+    *                is a List of entry keys
+    */
+   protected static void sendRegisterInterestResponseChunk(Region region,
+       Object riKey, ArrayList list, boolean lastChunk, ServerConnection servConn)
+       throws IOException {
+     ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
+     chunkedResponseMsg.setNumberOfParts(1);
+     chunkedResponseMsg.setLastChunk(lastChunk);
+     chunkedResponseMsg.addObjPart(list, zipValues);
+     String regionName = (region == null) ? " null " : region.getFullPath();
+     if (logger.isDebugEnabled()) {
+       String str = servConn.getName() + ": Sending"
+           + (lastChunk ? " last " : " ")
+           + "register interest response chunk for region: " + regionName
+           + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">";
+       logger.debug(str);
+     }
+ 
+     chunkedResponseMsg.sendChunk(servConn);
+   }
+   
+   /**
+    * Determines whether keys for destroyed entries (tombstones) should be sent
+    * to clients in register-interest results.
+    * 
+    * @param servConn
+    * @param policy
+    * @return true if tombstones should be sent to the client
+    */
+   private static boolean sendTombstonesInRIResults(ServerConnection servConn, InterestResultPolicy policy) {
+     return (policy == InterestResultPolicy.KEYS_VALUES)
+          && (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0);
+   }
+ 
+   /**
+    * Process an interest request involving a list of keys
+    *
+    * @param region
+    *                the region
+    * @param keyList
+    *                the list of keys
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleList(LocalRegion region, List keyList,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     if (region instanceof PartitionedRegion) {
+       // too bad java doesn't provide another way to do this...
+       handleListPR((PartitionedRegion)region, keyList, policy, servConn);
+       return;
+     }
+     ArrayList newKeyList = new ArrayList(maximumChunkSize);
+     // Handle list of keys
+     if (region != null) {
+       for (Iterator it = keyList.iterator(); it.hasNext();) {
+         Object entryKey = it.next();
+         if (region.containsKey(entryKey)
+             || (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) {
+           
+           appendInterestResponseKey(region, keyList, entryKey, newKeyList,
+               "list", servConn);
+         }
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, keyList, newKeyList, true,
+         servConn);
+   }
+ 
+   /**
+    * Handles both RR and PR cases
+    */
+   @SuppressWarnings("rawtypes")
+   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_PARAM_DEREF", justification="Null value handled in sendNewRegisterInterestResponseChunk()")
+   private static void handleKVSingleton(LocalRegion region, Object entryKey,
+       boolean serializeValues, ServerConnection servConn)
+       throws IOException {
+     VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+         true, region == null ? true : region.getAttributes()
+             .getConcurrencyChecksEnabled(), serializeValues);
+ 
+     if (region != null) {
+       if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
+         EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+         ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
+         // From Get70.getValueAndIsObject()
+         Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
+         VersionTag vt = versionHolder.getVersionTag();
+ 
+         updateValues(values, entryKey, data, vt);
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendNewRegisterInterestResponseChunk(region, entryKey, values, true, servConn);
+   }
+ 
+   /**
+    * Process an interest request consisting of a single key
+    *
+    * @param region
+    *                the region
+    * @param entryKey
+    *                the key
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleSingleton(LocalRegion region, Object entryKey,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     ArrayList keyList = new ArrayList(1);
+     if (region != null) {
+       if (region.containsKey(entryKey) ||
+           (sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey))) {
+         appendInterestResponseKey(region, entryKey, entryKey, keyList,
+             "individual", servConn);
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, entryKey, keyList, true, servConn);
+   }
+ 
+   /**
+    * Process an interest request of type ALL_KEYS
+    *
+    * @param region
+    *                the region
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleAllKeys(LocalRegion region,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     ArrayList keyList = new ArrayList(maximumChunkSize);
+     if (region != null) {
+       for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) {
+         appendInterestResponseKey(region, "ALL_KEYS", it.next(), keyList,
+             "ALL_KEYS", servConn);
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, "ALL_KEYS", keyList, true,
+         servConn);
+   }
+ 
+   /**
+    * @param region
+    * @param regex
+    * @param serializeValues
+    * @param servConn
+    * @throws IOException
+    */
+   private static void handleKVAllKeys(LocalRegion region, String regex,
+       boolean serializeValues, ServerConnection servConn) throws IOException {
+ 
+     if (region != null && region instanceof PartitionedRegion) {
+       handleKVKeysPR((PartitionedRegion) region, regex, serializeValues, servConn);
+       return;
+     }
+ 
+     VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+         true, region == null ? true : region.getAttributes()
+             .getConcurrencyChecksEnabled(), serializeValues);
+ 
+     if (region != null) {
+ 
+       VersionTag versionTag = null;
+       Object data = null;
+ 
+       Pattern keyPattern = null;
+       if (regex != null) {
+         keyPattern = Pattern.compile(regex);
+       }
+ 
+       for (Object key : region.keySet(true)) {
+         EntryEventImpl versionHolder = EntryEventImpl.createVersionTagHolder();
+         if (keyPattern != null) {
+           if (!(key instanceof String)) {
+             // key is not a String, cannot apply regex to this entry
+             continue;
+           }
+           if (!keyPattern.matcher((String) key).matches()) {
+             // key does not match the regex, this entry should not be
+             // returned.
+             continue;
+           }
+         }
+ 
+         ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
+         data = region.get(key, null, true, true, true, id, versionHolder, true, false);
+         versionTag = versionHolder.getVersionTag();
+         updateValues(values, key, data, versionTag);
+ 
+         if (values.size() == maximumChunkSize) {
+           sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, false, servConn);
+           values.clear();
+         }
+       } // for
+     } // if
+ 
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendNewRegisterInterestResponseChunk(region, regex != null ? regex : "ALL_KEYS", values, true, servConn);
+   }
+ 
+   private static void handleKVKeysPR(PartitionedRegion region, Object keyInfo,
+       boolean serializeValues, ServerConnection servConn) throws IOException {
+     int id = 0;
+     HashMap<Integer, HashSet> bucketKeys = null;
+ 
+     VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+         true, region.getConcurrencyChecksEnabled(), serializeValues);
+ 
+     if (keyInfo != null && keyInfo instanceof List) {
+       bucketKeys = new HashMap<Integer, HashSet>();
+       for (Object key : (List) keyInfo) {
+         id = PartitionedRegionHelper.getHashKey(region, null, key, null, null);
+         if (bucketKeys.containsKey(id)) {
+           bucketKeys.get(id).add(key);
+         } else {
+           HashSet<Object> keys = new HashSet<Object>();
+           keys.add(key);
+           bucketKeys.put(id, keys);
+         }
+       }
+       region.fetchEntries(bucketKeys, values, servConn);
+     } else { // keyInfo is a String
+       region.fetchEntries((String)keyInfo, values, servConn);
+     }
+ 
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendNewRegisterInterestResponseChunk(region, keyInfo != null ? keyInfo : "ALL_KEYS", values, true, servConn);
+   }
+ 
+   /**
+    * Copied from Get70.getValueAndIsObject(), except a minor change. (Make the
+    * method static instead of copying it here?)
+    * 
+    * @param value
+    */
+   private static void updateValues(VersionedObjectList values, Object key, Object value, VersionTag versionTag) {
+     boolean isObject = true;
+ 
+     // If the value in the VM is a CachedDeserializable,
+     // get its value. If it is Token.REMOVED, Token.DESTROYED,
+     // Token.INVALID, or Token.LOCAL_INVALID
+     // set it to null. If it is NOT_AVAILABLE, get the value from
+     // disk. If it is already a byte[], set isObject to false.
+     boolean wasInvalid = false;
+     if (value instanceof CachedDeserializable) {
+       value = ((CachedDeserializable)value).getValue();
+     }
+     else if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2 || value == Token.DESTROYED || value == Token.TOMBSTONE) {
+       value = null;
+     }
+     else if (value == Token.INVALID || value == Token.LOCAL_INVALID) {
+       value = null; // fix for bug 35884
+       wasInvalid = true;
+     }
+     else if (value instanceof byte[]) {
+       isObject = false;
+     }
+     boolean keyNotPresent = !wasInvalid && (value == null || value == Token.TOMBSTONE);
+ 
+     if (keyNotPresent) {
+       values.addObjectPartForAbsentKey(key, value, versionTag);
+     } else {
+       values.addObjectPart(key, value, isObject, versionTag);
+     }
+   }
+ 
+   public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
+       VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
+       throws IOException {
+     Object key = null;
+     EntryEventImpl versionHolder = null;
+     ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
+     for (Iterator it = keySet.iterator(); it.hasNext();) {
+       key = it.next();
+       versionHolder = EntryEventImpl.createVersionTagHolder();
+ 
+       Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
+       
+       updateValues(values, key, value, versionHolder.getVersionTag());
+ 
+       if (values.size() == maximumChunkSize) {
+         // Send the chunk and clear the list
+         // values.setKeys(null); // Now we need to send keys too.
+         sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn);
+         values.clear();
+       }
+     } // for
+   }
+ 
+   /**
+    * 
+    * @param region
+    * @param values {@link VersionedObjectList}
+    * @param riKeys
+    * @param set set of entries
+    * @param servConn
+    * @throws IOException
+    */
+   public static void appendNewRegisterInterestResponseChunk(LocalRegion region,
+       VersionedObjectList values, Object riKeys, Set set, ServerConnection servConn)
+       throws IOException {
+     for (Iterator<Map.Entry> it = set.iterator(); it.hasNext();) {
+       Map.Entry entry = it.next(); // Region.Entry or Map.Entry
+       if (entry instanceof Region.Entry) { // local entries
+         VersionTag vt = null;
+         Object key = null;
+         Object value = null;
+         if (entry instanceof EntrySnapshot) {
+           vt = ((EntrySnapshot) entry).getVersionTag();
+           key = ((EntrySnapshot) entry).getRegionEntry().getKey();
+           value = ((EntrySnapshot) entry).getRegionEntry().getValue(null);
+           updateValues(values, key, value, vt);
+         } else {
+           VersionStamp vs = ((NonTXEntry)entry).getRegionEntry().getVersionStamp();
+           vt = vs == null ? null : vs.asVersionTag();
+           key = entry.getKey();
+           value = ((NonTXEntry)entry).getRegionEntry()._getValueRetain(region, true);
+           try {
+             updateValues(values, key, value, vt);
+           } finally {
+             // TODO OFFHEAP: in the future we might want to delay this release
+             // until the "values" VersionedObjectList is released.
+             // But for now "updateValues" copies the off-heap value to the heap.
+             OffHeapHelper.release(value);
+           }
+         }
+       } else { // Map.Entry (remote entries)
+         ArrayList list = (ArrayList)entry.getValue();
+         Object value = list.get(0);
+         VersionTag tag = (VersionTag)list.get(1);
+         updateValues(values, entry.getKey(), value, tag);
+       }
+       if (values.size() == maximumChunkSize) {
+         // Send the chunk and clear the list
+         // values.setKeys(null); // Now we need to send keys too.
+         sendNewRegisterInterestResponseChunk(region, riKeys != null ? riKeys : "ALL_KEYS", values, false, servConn);
+         values.clear();
+       }
+     } // for
+   }
+ 
+   public static void sendNewRegisterInterestResponseChunk(LocalRegion region,
+       Object riKey, VersionedObjectList list, boolean lastChunk, ServerConnection servConn)
+       throws IOException {
+     ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage();
+     chunkedResponseMsg.setNumberOfParts(1);
+     chunkedResponseMsg.setLastChunk(lastChunk);
+     chunkedResponseMsg.addObjPart(list, zipValues);
+     String regionName = (region == null) ? " null " : region.getFullPath();
+     if (logger.isDebugEnabled()) {
+       String str = servConn.getName() + ": Sending"
+           + (lastChunk ? " last " : " ")
+           + "register interest response chunk for region: " + regionName
+           + " for keys: " + riKey + " chunk=<" + chunkedResponseMsg + ">";
+       logger.debug(str);
+     }
+ 
+     chunkedResponseMsg.sendChunk(servConn);
+   }
+ 
+   /**
+    * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
+    *
+    * @param region
+    *                the region
+    * @param regex
+    *                the regex
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleRegEx(LocalRegion region, String regex,
+       InterestResultPolicy policy, ServerConnection servConn)
+       throws IOException {
+     if (region instanceof PartitionedRegion) {
+       // too bad java doesn't provide another way to do this...
+       handleRegExPR((PartitionedRegion)region, regex, policy, servConn);
+       return;
+     }
+     ArrayList keyList = new ArrayList(maximumChunkSize);
+     // Handle the regex pattern
+     Pattern keyPattern = Pattern.compile(regex);
+     if (region != null) {
+       for (Iterator it = region.keySet(sendTombstonesInRIResults(servConn, policy)).iterator(); it.hasNext();) {
+         Object entryKey = it.next();
+         if (!(entryKey instanceof String)) {
+           // key is not a String, cannot apply regex to this entry
+           continue;
+         }
+         if (!keyPattern.matcher((String)entryKey).matches()) {
+           // key does not match the regex, this entry should not be returned.
+           continue;
+         }
+ 
+         appendInterestResponseKey(region, regex, entryKey, keyList, "regex",
+             servConn);
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
+   }
+ 
+   /**
+    * Process an interest request of type {@link InterestType#REGULAR_EXPRESSION}
+    *
+    * @param region
+    *                the region
+    * @param regex
+    *                the regex
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleRegExPR(final PartitionedRegion region,
+       final String regex, final InterestResultPolicy policy,
+       final ServerConnection servConn) throws IOException {
+     final ArrayList keyList = new ArrayList(maximumChunkSize);
+     region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() {
+       public void receiveSet(Set theSet) throws IOException {
+         appendInterestResponseKeys(region, regex, theSet, keyList, "regex",
+             servConn);
+       }
+     });
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn);
+   }
+ 
+   /**
+    * Process an interest request involving a list of keys
+    *
+    * @param region
+    *                the region
+    * @param keyList
+    *                the list of keys
+    * @param policy
+    *                the policy
+    * @throws IOException
+    */
+   private static void handleListPR(final PartitionedRegion region,
+       final List keyList, final InterestResultPolicy policy,
+       final ServerConnection servConn) throws IOException {
+     final ArrayList newKeyList = new ArrayList(maximumChunkSize);
+     region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), new PartitionedRegion.SetCollector() {
+       public void receiveSet(Set theSet) throws IOException {
+         appendInterestResponseKeys(region, keyList, theSet, newKeyList, "list",
+             servConn);
+       }
+     });
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendRegisterInterestResponseChunk(region, keyList, newKeyList, true,
+         servConn);
+   }
+ 
+   @SuppressWarnings("rawtypes")
+   private static void handleKVList(final LocalRegion region,
+       final List keyList, boolean serializeValues,
+       final ServerConnection servConn) throws IOException {
+ 
+     if (region != null && region instanceof PartitionedRegion) {
+       handleKVKeysPR((PartitionedRegion)region, keyList, serializeValues, servConn);
+       return;
+     }
+     VersionedObjectList values = new VersionedObjectList(maximumChunkSize,
+         true, region == null ? true : region.getAttributes()
+             .getConcurrencyChecksEnabled(), serializeValues);
+ 
+     // Handle list of keys
+     if (region != null) {
+       VersionTag versionTag = null;
+       Object data = null;
+ 
+       for (Iterator it = keyList.iterator(); it.hasNext();) {
+         Object key = it.next();
+         if (region.containsKey(key) || region.containsTombstone(key)) {
+           EntryEventImpl versionHolder = EntryEventImpl
+               .createVersionTagHolder();
+ 
+           ClientProxyMembershipID id = servConn == null ? null : servConn
+               .getProxyID();
+           data = region.get(key, null, true, true, true, id, versionHolder,
+               true, false);
+           versionTag = versionHolder.getVersionTag();
+           updateValues(values, key, data, versionTag);
+ 
+           if (values.size() == maximumChunkSize) {
+             // Send the chunk and clear the list
+             // values.setKeys(null); // Now we need to send keys too.
+             sendNewRegisterInterestResponseChunk(region, keyList, values, false, servConn);
+             values.clear();
+           }
+         }
+       }
+     }
+     // Send the last chunk (the only chunk for individual and list keys)
+     // always send it back, even if the list is of zero size.
+     sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
+   }
+ 
+   /**
+    * Append an interest response
+    *
+    * @param region
+    *                the region (for debugging)
+    * @param riKey
+    *                the registerInterest "key" (what the client is interested
+    *                in)
+    * @param entryKey
+    *                key we're responding to
+    * @param list
+    *                list to append to
+    * @param kind
+    *                for debugging
+    */
+   private static void appendInterestResponseKey(LocalRegion region,
+       Object riKey, Object entryKey, ArrayList list, String kind,
+       ServerConnection servConn) throws IOException {
+     list.add(entryKey);
+     if (logger.isDebugEnabled()) {
+       logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}",
+           servConn.getName(), entryKey, list.size(), region.getFullPath());
+     }
+     if (list.size() == maximumChunkSize) {
+       // Send the chunk and clear the list
+       sendRegisterInterestResponseChunk(region, riKey, list, false, servConn);
+       list.clear();
+     }
+   }
+ 
+   protected static void appendInterestResponseKeys(LocalRegion region,
+       Object riKey, Collection entryKeys, ArrayList collector, String riDescr,
+       ServerConnection servConn) throws IOException {
+     for (Iterator it = entryKeys.iterator(); it.hasNext();) {
+       appendInterestResponseKey(region, riKey, it.next(), collector, riDescr,
+           servConn);
+     }
+   }
+ }


Mime
View raw message