geode-commits mailing list archives

From r..@apache.org
Subject [07/51] [partial] incubator-geode git commit: Init
Date Tue, 28 Apr 2015 21:40:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyRegion.java
new file mode 100755
index 0000000..72bb1d5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyRegion.java
@@ -0,0 +1,689 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import com.gemstone.gemfire.cache.AttributesMutator;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.CacheStatistics;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.StatisticsDisabledException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+import com.gemstone.gemfire.cache.snapshot.RegionSnapshotService;
+import com.gemstone.gemfire.internal.cache.snapshot.RegionSnapshotServiceImpl;
+
+/**
+ * A wrapper class over an actual Region instance. This is used when the
+ * multiuser-authentication attribute is set to true.
+ * 
+ * @see ProxyCache
+ * @since 6.5
+ */
+public class ProxyRegion implements Region {
+  
+  private final ProxyCache proxyCache;
+  private final Region realRegion;
+  
+  public ProxyRegion(ProxyCache proxyCache, Region realRegion) {
+    this.proxyCache = proxyCache;
+    this.realRegion = realRegion;
+  }
+
+  public void becomeLockGrantor() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void clear() {
+    try {
+      preOp();
+      this.realRegion.clear();
+    } finally {
+      postOp();
+    }
+  }
+
+  public void close() {
+    try {
+      preOp();
+      this.realRegion.close();
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean containsKey(Object key) {
+    try {
+      preOp();
+      return this.realRegion.containsKey(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean containsKeyOnServer(Object key) {
+    try {
+      preOp();
+      return this.realRegion.containsKeyOnServer(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean containsValue(Object value) {
+    try {
+      preOp();
+      return this.realRegion.containsValue(value);
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean containsValueForKey(Object key) {
+    try {
+      preOp();
+      return this.realRegion.containsValueForKey(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void create(Object key, Object value) throws TimeoutException,
+      EntryExistsException, CacheWriterException {
+    try {
+      preOp();
+      this.realRegion.create(key, value);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void create(Object key, Object value, Object callbackArgument)
+      throws TimeoutException, EntryExistsException, CacheWriterException {
+    try {
+      preOp();
+      this.realRegion.create(key, value, callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Region createSubregion(String subregionName,
+      RegionAttributes regionAttributes) throws RegionExistsException,
+      TimeoutException {
+    throw new UnsupportedOperationException();
+  }
+
+  public Object destroy(Object key) throws TimeoutException,
+      EntryNotFoundException, CacheWriterException {
+    try {
+      preOp();
+      return this.realRegion.destroy(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Object destroy(Object key, Object callbackArgument)
+      throws TimeoutException, EntryNotFoundException, CacheWriterException {
+    try {
+      preOp();
+      return this.realRegion.destroy(key, callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void destroyRegion() throws CacheWriterException, TimeoutException {
+    try {
+      preOp();
+      this.realRegion.destroyRegion();
+    } finally {
+      postOp();
+    }
+  }
+
+  public void destroyRegion(Object callbackArgument)
+      throws CacheWriterException, TimeoutException {
+    try {
+      preOp();
+      this.realRegion.destroyRegion(callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Set entries(boolean recursive) {
+    try {
+      preOp();
+      return this.realRegion.entries(recursive);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Set entrySet(boolean recursive) {
+    try {
+      preOp();
+      return this.realRegion.entrySet(recursive);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Set entrySet() {
+    try {
+      preOp();
+      return this.realRegion.entrySet();
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean existsValue(String queryPredicate)
+      throws FunctionDomainException, TypeMismatchException,
+      NameResolutionException, QueryInvocationTargetException {
+    try {
+      preOp();
+      return this.realRegion.existsValue(queryPredicate);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void forceRolling() {
+    throw new UnsupportedOperationException();
+  }
+
+  public Object get(Object key) throws CacheLoaderException, TimeoutException {
+    try {
+      preOp();
+      return this.realRegion.get(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Object get(Object key, Object callbackArgument)
+      throws TimeoutException, CacheLoaderException {
+    try {
+      preOp();
+      return this.realRegion.get(key, callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Map getAll(Collection keys) {
+    return getAll(keys, null);
+  }
+
+  public Map getAll(Collection keys, Object callback) {
+    try {
+      preOp();
+      return this.realRegion.getAll(keys, callback);
+    } finally {
+      postOp();
+    }
+  }
+
+  public RegionAttributes getAttributes() {
+    return realRegion.getAttributes();
+  }
+
+  public AttributesMutator getAttributesMutator() {
+    throw new UnsupportedOperationException();
+  }
+
+  public Cache getCache() {
+    throw new UnsupportedOperationException();
+  }
+
+  public RegionService getRegionService() {
+    return this.proxyCache;
+  }
+  
+  public ProxyCache getAuthenticatedCache() {
+    return this.proxyCache;    
+  }
+
+  public Lock getDistributedLock(Object key) throws IllegalStateException {
+    throw new UnsupportedOperationException();
+  }
+
+  public Entry getEntry(Object key) {
+    try {
+      preOp();
+      return this.realRegion.getEntry(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public String getFullPath() {
+    return this.realRegion.getFullPath();
+  }
+
+  public List getInterestList() {
+    throw new UnsupportedOperationException();
+  }
+
+  public List getInterestListRegex() {
+    throw new UnsupportedOperationException();
+  }
+
+  public String getName() {
+    return this.realRegion.getName();
+  }
+
+  public Region getParentRegion() {
+    return this.realRegion.getParentRegion();
+  }
+
+  public Lock getRegionDistributedLock() throws IllegalStateException {
+    throw new UnsupportedOperationException();
+  }
+
+  public CacheStatistics getStatistics() throws StatisticsDisabledException {
+    return this.realRegion.getStatistics();
+  }
+
+  public Region getSubregion(String path) {
+    Region region = this.realRegion.getSubregion(path);
+    return region != null ? new ProxyRegion(this.proxyCache, region) : null;
+  }
+
+  public Object getUserAttribute() {
+    return this.realRegion.getUserAttribute();
+  }
+
+  public void invalidate(Object key) throws TimeoutException,
+      EntryNotFoundException {
+    try {
+      preOp();
+      this.realRegion.invalidate(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void invalidate(Object key, Object callbackArgument)
+      throws TimeoutException, EntryNotFoundException {
+    try {
+      preOp();
+      this.realRegion.invalidate(key, callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void invalidateRegion() throws TimeoutException {
+    try {
+      preOp();
+      this.realRegion.invalidateRegion();
+    } finally {
+      postOp();
+    }
+  }
+
+  public void invalidateRegion(Object callbackArgument) throws TimeoutException {
+    try {
+      preOp();
+      this.realRegion.invalidateRegion(callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public boolean isDestroyed() {
+    return this.realRegion.isDestroyed();
+  }
+
+  public boolean isEmpty() {
+    return this.realRegion.isEmpty();
+  }
+
+  public Set keySet() {
+    return this.realRegion.keySet();
+  }
+
+  public Set keySetOnServer() {
+    try {
+      preOp();
+      return this.realRegion.keySetOnServer();
+    } finally {
+      postOp();
+    }
+  }
+
+  public Set keys() {
+    try {
+      preOp();
+      return this.realRegion.keys();
+    } finally {
+      postOp();
+    }
+  }
+
+  public void loadSnapshot(InputStream inputStream) throws IOException,
+      ClassNotFoundException, CacheWriterException, TimeoutException {
+    throw new UnsupportedOperationException();
+  }
+
+  public void localClear() {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localDestroy(Object key) throws EntryNotFoundException {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localDestroy(Object key, Object callbackArgument)
+      throws EntryNotFoundException {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localDestroyRegion() {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localDestroyRegion(Object callbackArgument) {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localInvalidate(Object key) throws EntryNotFoundException {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localInvalidate(Object key, Object callbackArgument)
+      throws EntryNotFoundException {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localInvalidateRegion() {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public void localInvalidateRegion(Object callbackArgument) {
+    throw new UnsupportedOperationException(
+        "Local operations are not supported when multiuser-authentication is true.");
+  }
+
+  public Object put(Object key, Object value) throws TimeoutException,
+      CacheWriterException {
+    try {
+      preOp();
+      return this.realRegion.put(key, value);
+    } finally {
+      postOp();
+    }
+  }
+
+  public Object put(Object key, Object value, Object callbackArgument)
+      throws TimeoutException, CacheWriterException {
+    try {
+      preOp();
+      return this.realRegion.put(key, value, callbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void putAll(Map map) {
+    putAll(map, null);
+  }
+  
+  @Override
+  public void putAll(Map map, Object callbackArg) {
+    try {
+      preOp();
+      this.realRegion.putAll(map, callbackArg);
+    } finally {
+      postOp();
+    }
+  }
+
+  public SelectResults query(String queryPredicate)
+      throws FunctionDomainException, TypeMismatchException,
+      NameResolutionException, QueryInvocationTargetException {
+    try {
+      preOp();
+      return this.realRegion.query(queryPredicate);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void registerInterest(Object key) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterest(Object key, InterestResultPolicy policy) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterest(Object key, boolean isDurable) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterest(Object key, boolean isDurable,
+      boolean receiveValues) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterest(Object key, InterestResultPolicy policy,
+      boolean isDurable, boolean receiveValues) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterest(Object key, InterestResultPolicy policy,
+      boolean isDurable) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex, InterestResultPolicy policy) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex, boolean isDurable) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex, boolean isDurable,
+      boolean receiveValues) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex, InterestResultPolicy policy,
+      boolean isDurable) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void registerInterestRegex(String regex, InterestResultPolicy policy,
+      boolean isDurable, boolean receiveValues) {
+    throw new UnsupportedOperationException();
+  }
+
+  public Object remove(Object key) {
+    try {
+      preOp();
+      return this.realRegion.remove(key);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void saveSnapshot(OutputStream outputStream) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  public Object selectValue(String queryPredicate)
+      throws FunctionDomainException, TypeMismatchException,
+      NameResolutionException, QueryInvocationTargetException {
+    try {
+      preOp();
+      return this.realRegion.selectValue(queryPredicate);
+    } finally {
+      postOp();
+    }
+  }
+
+  public void setUserAttribute(Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  public int size() {
+    try {
+      preOp();
+      return this.realRegion.size();
+    } finally {
+      postOp();
+    }
+  }
+
+  public Set subregions(boolean recursive) {
+    return this.realRegion.subregions(recursive);
+  }
+
+  public void unregisterInterest(Object key) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void unregisterInterestRegex(String regex) {
+    throw new UnsupportedOperationException();
+  }
+
+  public Collection values() {
+    try {
+      preOp();
+      return this.realRegion.values();
+    } finally {
+      postOp();
+    }
+  }
+
+  public void writeToDisk() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void preOp() {
+    if (this.proxyCache.isClosed()) {
+      throw new CacheClosedException("Cache is closed for this user.");
+    }
+    UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
+  }
+
+  private void postOp() {
+    this.proxyCache.setUserAttributes(UserAttributes.userAttributes.get());
+    UserAttributes.userAttributes.set(null);
+  }
+
+  public Region getRealRegion() {
+    return realRegion;
+  }
+
+  /* (non-Javadoc)
+   * @see java.util.concurrent.ConcurrentMap#putIfAbsent(java.lang.Object, java.lang.Object)
+   */
+  public Object putIfAbsent(Object key, Object value) {
+    try {
+      preOp();
+      return this.realRegion.putIfAbsent(key, value);
+    } finally {
+      postOp();
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see java.util.concurrent.ConcurrentMap#remove(java.lang.Object, java.lang.Object)
+   */
+  public boolean remove(Object key, Object value) {
+    try {
+      preOp();
+      return this.realRegion.remove(key, value);
+    } finally {
+      postOp();
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see java.util.concurrent.ConcurrentMap#replace(java.lang.Object, java.lang.Object)
+   */
+  public Object replace(Object key, Object value) {
+    try {
+      preOp();
+      return this.realRegion.replace(key, value);
+    } finally {
+      postOp();
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see java.util.concurrent.ConcurrentMap#replace(java.lang.Object, java.lang.Object, java.lang.Object)
+   */
+  public boolean replace(Object key, Object oldValue, Object newValue) {
+    try {
+      preOp();
+      return this.realRegion.replace(key, oldValue, newValue);
+    } finally {
+      postOp();
+    }
+  }
+  
+  public RegionSnapshotService<?, ?> getSnapshotService() {
+    return new RegionSnapshotServiceImpl(this);
+  }
+
+  @Override
+  public void removeAll(Collection keys) {
+    removeAll(keys, null);
+  }
+
+  @Override
+  public void removeAll(Collection keys, Object aCallbackArgument) {
+    try {
+      preOp();
+      this.realRegion.removeAll(keys, aCallbackArgument);
+    } finally {
+      postOp();
+    }
+  }
+}
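
ProxyRegion above is not constructed directly by applications: with multiuser-authentication enabled on the client pool, ClientCache.createAuthenticatedView(Properties) returns a RegionService backed by a ProxyCache, and regions looked up through that view are ProxyRegion wrappers whose preOp()/postOp() calls bind the user's credentials to the calling thread around each operation. A minimal client-side sketch, assuming a locator on localhost:10334, a region named "example", and sample security property names (none of which are part of this commit):

import java.util.Properties;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;

public class ProxyRegionSketch {
  public static void main(String[] args) {
    // Multiuser authentication must be enabled on the pool for ProxyCache/ProxyRegion to be used.
    ClientCache cache = new ClientCacheFactory()
        .addPoolLocator("localhost", 10334)        // assumed locator host/port
        .setPoolMultiuserAuthentication(true)
        .create();

    // The underlying client region; the name "example" is an assumption for this sketch.
    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("example");

    Properties credentials = new Properties();
    credentials.setProperty("security-username", "user1");  // property names depend on the
    credentials.setProperty("security-password", "user1");  // configured AuthInitialize (assumed)

    // createAuthenticatedView returns a RegionService backed by a ProxyCache; regions obtained
    // from it are ProxyRegion wrappers, so every call runs preOp()/postOp() around the real region.
    RegionService userView = cache.createAuthenticatedView(credentials);
    Region<String, String> region = userView.getRegion("example");

    region.put("k1", "v1");
    System.out.println(region.get("k1"));

    userView.close();
    cache.close();
  }
}

Local operations (localClear, localDestroy, and so on) throw UnsupportedOperationException on the proxy, as the class body above shows, because a per-user view has no local state of its own.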

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
new file mode 100644
index 0000000..a112bda
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
@@ -0,0 +1,417 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region putAll on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class PutAllOp {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  public static final int FLAG_EMPTY = 0x01;
+  public static final int FLAG_CONCURRENCY_CHECKS = 0x02;
+
+  /**
+   * Does a region putAll on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the putAll on
+   * @param map the Map of keys and values to put
+   * @param eventId the event id for this putAll
+   * @param skipCallbacks true if no callbacks will be invoked
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+                             Region region,
+                             Map map,
+                             EventID eventId,
+                             boolean skipCallbacks,
+                             boolean isRetry, Object callbackArg)
+  {
+    PutAllOpImpl op = new PutAllOpImpl(region, map,
+        eventId, ((PoolImpl)pool).getPRSingleHopEnabled(), skipCallbacks, callbackArg);
+    op.initMessagePart();
+    if(isRetry) {
+      op.getMessage().setIsRetry();
+    }
+    return (VersionedObjectList)pool.execute(op);
+  }
+  
+  /**
+   * Does a region putAll on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the putAll on
+   * @param map the Map of keys and values to put
+   * @param eventId the event id for this putAll
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+                             Region region,
+                             Map map,
+                             EventID eventId, 
+                             boolean skipCallbacks,
+                             int retryAttempts, Object callbackArg)
+  {
+    ClientMetadataService cms = ((LocalRegion)region).getCache()
+        .getClientMetadataService();
+
+    Map<ServerLocation, HashSet> serverToFilterMap = cms.getServerToFilterMap(
+        map.keySet(), region, true);
+
+    if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
+      AbstractOp op = new PutAllOpImpl(region, map,
+          eventId, ((PoolImpl)pool).getPRSingleHopEnabled(), skipCallbacks, callbackArg);
+      op.initMessagePart();
+      return (VersionedObjectList)pool.execute(op);
+    }
+
+    List callableTasks = constructAndGetPutAllTasks(region, map,
+        eventId, skipCallbacks, serverToFilterMap, (PoolImpl)pool, callbackArg);
+
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("PutAllOp#execute : Number of putAll tasks is : {}", callableTasks.size());
+    }
+    HashMap<ServerLocation, RuntimeException> failedServers = new HashMap<ServerLocation,RuntimeException>();
+    PutAllPartialResult result = new PutAllPartialResult(map.size());
+    try {
+      Map<ServerLocation, Object> results = SingleHopClientExecutor
+          .submitBulkOp(callableTasks, cms, (LocalRegion)region, failedServers);
+      for (Map.Entry<ServerLocation, Object> entry: results.entrySet()) {
+        Object value = entry.getValue();
+        if (value instanceof PutAllPartialResultException) {
+          PutAllPartialResultException pap = (PutAllPartialResultException)value;
+          if (isDebugEnabled) {
+            logger.debug("PutAll SingleHop encountered PutAllPartialResultException exception: {}, failedServers are {}", pap, failedServers.keySet());
+          }
+          result.consolidate(pap.getResult());
+        } else {
+          if (value != null) {
+            VersionedObjectList list = (VersionedObjectList)value;
+            result.addKeysAndVersions(list);
+          }
+        }
+      }
+    } catch (RuntimeException ex) {
+      if (isDebugEnabled) {
+        logger.debug("single-hop putAll encountered unexpected exception: ", ex);
+      }
+      throw ex;
+    }
+
+    if (!failedServers.isEmpty()) {
+      if (retryAttempts == 0) {
+        throw failedServers.values().iterator().next();
+      }
+
+      // if the partial result set doesn't already have keys (for tracking version tags)
+      // then we need to gather up the keys that we know have succeeded so far and
+      // add them to the partial result set
+      if (result.getSucceededKeysAndVersions().size() == 0) {
+        // if there are failed servers, we need to save the keys that succeeded in submitPutAll;
+        // if the retry succeeds everything is fine, otherwise the saved "succeeded
+        // keys" are consolidated into the PutAllPartialResultException.
+        // succeedKeySet is only sent back to the client in the PartialResult case,
+        // so it does not have to be a LinkedHashSet.
+        Set succeedKeySet = new LinkedHashSet();
+        Set<ServerLocation> serverSet = serverToFilterMap.keySet();
+        for (ServerLocation server : serverSet) {
+          if (!failedServers.containsKey(server)) {
+            succeedKeySet.addAll(serverToFilterMap.get(server));
+          }
+        }
+
+        // save succeedKeys; if all retries succeed, the PutAllPartialResult is discarded
+        result.addKeys(succeedKeySet);
+      }
+      
+      // send maps for the failed servers one by one instead of merging 
+      // them into one big map. The reason is, we have to keep the same event
+      // ids for each sub map. There is a unit test in PutAllCSDUnitTest for
+      // the otherwise case.
+      boolean oneSubMapRetryFailed = false;
+      Set<ServerLocation> failedServerSet = failedServers.keySet();
+      for (ServerLocation failedServer : failedServerSet) {
+        //        Throwable failedServers.values().iterator().next();
+        RuntimeException savedRTE = failedServers.get(failedServer);
+        if (savedRTE instanceof PutAllPartialResultException) {
+          // will not retry for PutAllPartialResultException
+          // but it means at least one sub map ever failed 
+          oneSubMapRetryFailed = true;
+          continue;
+        }
+        Map newMap = new LinkedHashMap();
+        Set keySet = serverToFilterMap.get(failedServer);
+        for (Object key : keySet) {
+          newMap.put(key, map.get(key));
+        }
+
+        try {
+          VersionedObjectList v = PutAllOp.execute(pool, region, newMap, eventId, skipCallbacks, true, callbackArg);
+          if (v == null) {
+            result.addKeys(keySet);
+          } else {
+            result.addKeysAndVersions(v);
+          }
+        } catch (PutAllPartialResultException pre) {
+          oneSubMapRetryFailed = true;
+          if (logger.isDebugEnabled()) {
+            logger.debug("Retry failed with PutAllPartialResultException: {} Before retry: {}", pre, result.getKeyListString());
+          }
+          result.consolidate(pre.getResult());
+        } catch (Exception rte) {
+          oneSubMapRetryFailed = true;
+          Object firstKey = newMap.keySet().iterator().next();
+          result.saveFailedKey(firstKey, rte);
+        }
+      } // for failedServer
+
+      // If all retries succeeded, the PRE in first tries can be ignored
+      if (oneSubMapRetryFailed && result.hasFailure()) {
+        PutAllPartialResultException pre = new PutAllPartialResultException(result);
+        throw pre;
+      }
+    } // failedServers!=null
+
+    return result.getSucceededKeysAndVersions();
+  }
+  
+  private PutAllOp() {
+    // no instances allowed
+  }
+  
+  
+  static List constructAndGetPutAllTasks(Region region, final Map map,
+      final EventID eventId, 
+      boolean skipCallbacks,
+      final Map<ServerLocation, HashSet> serverToFilterMap,
+      final PoolImpl pool, Object callbackArg) {
+    final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+    ArrayList<ServerLocation> servers = new ArrayList<ServerLocation>(
+        serverToFilterMap.keySet());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Constructing tasks for the servers {}", servers);
+    }
+    for (ServerLocation server : servers) {
+      Set filterSet = serverToFilterMap.get(server);
+      Map newKeysValuesMap = new LinkedHashMap();
+      // single hop: iterate this server's filterSet and build newKeysValuesMap from it
+      for (Object key : filterSet) {
+        newKeysValuesMap.put(key, map.get(key));
+      }
+      AbstractOp putAllOp = new PutAllOpImpl(region,
+          newKeysValuesMap, eventId, true, skipCallbacks, callbackArg);
+
+      SingleHopOperationCallable task = new SingleHopOperationCallable(
+          new ServerLocation(server.getHostName(), server.getPort()), pool,
+          putAllOp,UserAttributes.userAttributes.get());
+      tasks.add(task);
+    }
+    return tasks;
+  }
+
+  private static class PutAllOpImpl extends AbstractOp {
+    
+    private boolean prSingleHopEnabled = false;
+    
+    private LocalRegion region = null;
+    
+    private Map map = null;
+    private final Object callbackArg;
+    private ArrayList keys = null;
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public PutAllOpImpl(Region region, Map map,
+        EventID eventId, boolean prSingleHopEnabled, boolean skipCallbacks, Object callbackArg) {
+      super(callbackArg != null ? MessageType.PUT_ALL_WITH_CALLBACK : MessageType.PUTALL, (callbackArg != null ? 6 : 5) + (map.size() * 2));
+      this.prSingleHopEnabled = prSingleHopEnabled;
+      this.region = (LocalRegion)region;
+      getMessage().addStringPart(region.getFullPath());
+      getMessage().addBytesPart(eventId.calcBytes());
+      getMessage().addIntPart(skipCallbacks ? 1 : 0);
+      this.map = map;
+      this.callbackArg = callbackArg;
+    }
+    
+    @Override
+    protected void initMessagePart() {
+      int size = map.size();
+      int flags = 0;
+      if (region.getDataPolicy() == DataPolicy.EMPTY) {
+        flags |= FLAG_EMPTY;
+      }
+      if (region.getConcurrencyChecksEnabled()) {
+        flags |= FLAG_CONCURRENCY_CHECKS;
+      }
+      getMessage().addIntPart(flags);
+      getMessage().addIntPart(size);
+      if (this.callbackArg != null) {
+        getMessage().addObjPart(this.callbackArg);
+      }
+      this.keys = new ArrayList(size);
+      Iterator iterator = map.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry mapEntry = (Map.Entry)iterator.next();
+        Object key = mapEntry.getKey();
+        this.keys.add(key);
+        getMessage().addStringOrObjPart(key);
+        Object value = mapEntry.getValue();
+        if (value instanceof CachedDeserializable) {
+          {
+            Object cdValue = ((CachedDeserializable)value).getValue();
+            if (cdValue instanceof byte[]) {
+              getMessage().addRawPart((byte[])cdValue, true);
+            } else {
+              getMessage().addObjPart(cdValue);
+            }
+          }
+        } else {
+          getMessage().addObjPart(value);
+        }
+      }      
+    }
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(2, Version.CURRENT);
+    }
+    
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    protected Object processResponse(final Message msg, final Connection con) throws Exception {
+      final VersionedObjectList result = new VersionedObjectList();
+      final Exception[] exceptionRef = new Exception[1];
+      try {
+        processChunkedResponse((ChunkedMessage)msg,
+                             "putAll",
+                             new ChunkHandler() {
+                               public void handle(ChunkedMessage cm) throws Exception {
+                                 int numParts = msg.getNumberOfParts();
+                                 final boolean isDebugEnabled = logger.isDebugEnabled();
+                                 if (isDebugEnabled) {
+                                   logger.debug("putAllOp.processChunkedResponse processing message with {} parts", numParts);
+                                 }
+                                 for (int partNo=0; partNo < numParts; partNo++) {
+                                   Part part = cm.getPart(partNo);
+                                   try {
+                                     Object o = part.getObject();
+                                     if (isDebugEnabled) {
+                                       logger.debug("part({}) contained {}", partNo, o);
+                                     }
+                                     if (o == null) {
+                                       // no response is an okay response
+                                     } else if (o instanceof byte[]) {
+                                       if (prSingleHopEnabled) {
+                                         byte[] bytesReceived = part.getSerializedForm();
+                                         if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
+                                           if (region != null) {
+                                             ClientMetadataService cms;
+                                             try {
+                                               cms = region.getCache().getClientMetadataService();
+                                               cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                                             }
+                                             catch (CacheClosedException e) {
+                                             }
+                                           }
+                                         }
+                                       }
+                                     } else if (o instanceof Throwable) {
+                                       String s = "While performing a remote putAll";
+                                       exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+                                     } else {
+                                       VersionedObjectList chunk = (VersionedObjectList)o;
+                                       chunk.replaceNullIDs(con.getEndpoint().getMemberId());
+                                       result.addAll(chunk);
+                                     }
+                                   } catch(Exception e) {
+                                     exceptionRef[0] = new ServerOperationException("Unable to deserialize value" , e);
+                                   }
+                                 }
+                               }
+                             });
+      } catch (ServerOperationException e) {
+        if (e.getCause() instanceof PutAllPartialResultException) {
+          PutAllPartialResultException cause = (PutAllPartialResultException)e.getCause(); 
+          cause.getSucceededKeysAndVersions().replaceNullIDs(con.getEndpoint().getMemberId());
+          throw cause;
+        } else {
+          throw e;
+        }
+      }
+      if (exceptionRef[0] != null) {
+        throw exceptionRef[0];
+      } else {
+        // v7.0.1: fill in the keys
+        if (result.hasVersions() && result.getKeys().isEmpty()) {
+          if (logger.isTraceEnabled()) {
+            logger.trace("setting keys of response to {}", this.keys);
+          }
+          result.setKeys(this.keys);
+        }
+      }
+      return result;
+    }
+    
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.PUT_DATA_ERROR;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startPutAll();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endPutAllSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endPutAll(start, hasTimedOut(), hasFailed());
+    }
+  }
+  
+}
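
The single-hop branch of PutAllOp.execute splits the caller's map into one sub-map per server (see constructAndGetPutAllTasks) so each server receives only its own keys while every sub-map keeps its own event id. A small self-contained sketch of that splitting step, using plain strings as stand-ins for ServerLocation and for the ClientMetadataService filter map (both hypothetical simplifications, not the GemFire types):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class PutAllSplitSketch {

  /**
   * Splits one big putAll map into one ordered sub-map per server, the way the
   * single-hop path does before creating one PutAllOpImpl task per server.
   * Server names stand in for ServerLocation; serverToKeys stands in for the
   * ClientMetadataService getServerToFilterMap result.
   */
  static Map<String, Map<String, Object>> splitByServer(
      Map<String, Object> data, Map<String, Set<String>> serverToKeys) {
    Map<String, Map<String, Object>> tasks = new LinkedHashMap<>();
    for (Map.Entry<String, Set<String>> e : serverToKeys.entrySet()) {
      Map<String, Object> subMap = new LinkedHashMap<>();
      for (String key : e.getValue()) {
        subMap.put(key, data.get(key)); // same values, per-server key ordering kept
      }
      tasks.put(e.getKey(), subMap);
    }
    return tasks;
  }

  public static void main(String[] args) {
    Map<String, Object> data = Map.of("a", 1, "b", 2, "c", 3);
    Map<String, Set<String>> routing = Map.of(
        "server1:40404", Set.of("a", "c"),
        "server2:40405", Set.of("b"));
    // Each sub-map would be sent as its own PutAllOpImpl, keeping one event id per sub-map.
    System.out.println(splitByServer(data, routing));
  }
}

Sending per-server sub-maps rather than one merged retry map is what lets a retry reuse the original event ids, which is the behavior the comment about PutAllCSDUnitTest refers to.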

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
new file mode 100644
index 0000000..56181e2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
@@ -0,0 +1,540 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.util.BridgeWriterException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Does a region put (or create) on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class PutOp {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Does a region put on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the region to do the put on
+   * @param key the entry key to do the put on
+   * @param value the entry value to put
+   * @param event the event for this put
+   * @param requireOldValue
+   * @param expectedOldValue
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public static Object execute(ExecutablePool pool,
+                             LocalRegion region,
+                             Object key,
+                             Object value,
+                             byte[] deltaBytes,
+                             EntryEventImpl event,
+                             Operation operation,
+                             boolean requireOldValue, Object expectedOldValue,
+                             Object callbackArg,
+                             boolean prSingleHopEnabled)
+  {
+    AbstractOp op = new PutOpImpl(region, key, value, deltaBytes, event,
+        operation, requireOldValue,
+        expectedOldValue, callbackArg,
+        false/*donot send full obj; send delta*/, prSingleHopEnabled);
+
+    if (prSingleHopEnabled) {
+      ClientMetadataService cms = region.getCache().getClientMetadataService();
+      ServerLocation server = cms.getBucketServerLocation(region,
+          Operation.UPDATE, key, value, callbackArg);
+      if (server != null) {
+        try {
+          PoolImpl poolImpl = (PoolImpl)pool;
+          boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
+              .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
+              : false);
+          return pool.executeOn(new ServerLocation(server.getHostName(), server
+              .getPort()), op, true, onlyUseExistingCnx);
+        }
+        catch (AllConnectionsInUseException e) {
+        }
+        catch (ServerConnectivityException e) {
+          if (e instanceof ServerOperationException) {
+            throw e; // fixed 44656
+          }
+          cms.removeBucketServerLocation(server);
+        }
+        catch (BridgeWriterException e) {
+          if (e.getCause() instanceof ServerConnectivityException)
+            cms.removeBucketServerLocation(server);
+        }
+      }
+    }
+    return pool.execute(op);
+  }
+  
+  public static Object execute(ExecutablePool pool, String regionName,
+      Object key, Object value, byte[] deltaBytes, EntryEventImpl event, Operation operation,
+      boolean requireOldValue, Object expectedOldValue,
+      Object callbackArg, boolean prSingleHopEnabled, boolean isMetaRegionPutOp) {
+    AbstractOp op = new PutOpImpl(regionName, key, value, deltaBytes, event,
+        operation, requireOldValue,
+        expectedOldValue, callbackArg,
+        false/*donot send full obj; send delta*/,  prSingleHopEnabled);
+    ((PutOpImpl)op).setMetaRegionPutOp(isMetaRegionPutOp);
+    return pool.execute(op);
+  }
+
+  
+  /**
+   * This is a unit test method.
+   * It does a region put on a server using the given connection from the given pool
+   * to communicate with the server. Do not call this method if the value is
+   * a Delta instance.
+   * @param con the connection to use 
+   * @param pool the pool to use to communicate with the server.
+   * @param regionName the name of the region to do the put on
+   * @param key the entry key to do the put on
+   * @param value the entry value to put
+   * @param event the event for this put
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public static void execute(Connection con,
+                             ExecutablePool pool,
+                             String regionName,
+                             Object key,
+                             Object value,
+                             EntryEventImpl event,
+                             Object callbackArg,
+                             boolean prSingleHopEnabled)
+  {
+    AbstractOp op = new PutOpImpl(regionName, key, value, null,
+        event, Operation.CREATE, false,
+        null, callbackArg, false /*donot send full Obj; send delta*/, prSingleHopEnabled);
+    pool.executeOn(con, op);
+  }
+
+  public static final byte HAS_OLD_VALUE_FLAG = 0x01;
+  public static final byte OLD_VALUE_IS_OBJECT_FLAG = 0x02;
+  public static final byte HAS_VERSION_TAG = 0x04;
+                                                               
+  private PutOp() {
+    // no instances allowed
+  }
+  
+  private static class PutOpImpl extends AbstractOp {
+
+    private Object key;
+
+    
+    private LocalRegion region;
+    
+    /**
+     * the operation will have either a region or a regionName.  Names seem
+     * to be used by unit tests to exercise operations without creating a
+     * real region 
+     */
+    private String regionName;
+
+    private Object value;
+    
+    private boolean deltaSent = false;
+
+    private EntryEventImpl event;
+    
+    private Object callbackArg;
+
+    private boolean isMetaRegionPutOp;
+
+    private boolean prSingleHopEnabled;
+    
+    private boolean requireOldValue;
+
+    private Object expectedOldValue;
+    
+    public PutOpImpl(String regionName , Object key, Object value, byte[] deltaBytes, 
+        EntryEventImpl event,
+        Operation op, boolean requireOldValue,
+        Object expectedOldValue, Object callbackArg,
+        boolean sendFullObj, boolean prSingleHopEnabled) {
+        super(MessageType.PUT, 7 + (callbackArg != null? 1:0) + (expectedOldValue != null? 1:0));
+        final boolean isDebugEnabled = logger.isDebugEnabled();
+        if (isDebugEnabled) {
+          logger.debug("PutOpImpl constructing(1) message for {}; operation={}", event.getEventId(), op);
+        }
+        this.key = key;
+        this.callbackArg = callbackArg;
+        this.event = event;
+        this.value = value;
+        this.regionName = regionName;
+        this.prSingleHopEnabled = prSingleHopEnabled;
+        this.requireOldValue = requireOldValue;
+        this.expectedOldValue = expectedOldValue;
+        getMessage().addStringPart(regionName);
+        getMessage().addObjPart(op);
+        int flags = 0;
+        if (requireOldValue) flags |= 0x01;
+        if (expectedOldValue != null) flags |= 0x02;
+        getMessage().addIntPart(flags);
+        if (expectedOldValue != null) {
+          getMessage().addObjPart(expectedOldValue);
+        }
+        getMessage().addStringOrObjPart(key);
+        // Add message part for sending either delta or full value
+        if (!sendFullObj && deltaBytes != null && op == Operation.UPDATE) {
+          getMessage().addObjPart(Boolean.TRUE);
+          getMessage().addBytesPart(deltaBytes);
+          this.deltaSent = true;
+          if (isDebugEnabled) {
+            logger.debug("PutOp: Sending delta for key {}", this.key);
+          }
+        }
+        else if (value instanceof CachedDeserializable) {
+          {
+            getMessage().addObjPart(Boolean.FALSE);
+            Object cdValue = ((CachedDeserializable)value).getValue();
+            if (cdValue instanceof byte[]) {
+              getMessage().addRawPart((byte[])cdValue, true);
+            }
+            else {
+              getMessage().addObjPart(cdValue);
+            }
+          }
+        }
+        else {
+          getMessage().addObjPart(Boolean.FALSE);
+          getMessage().addObjPart(value);
+        }
+        getMessage().addBytesPart(event.getEventId().calcBytes());
+        if (callbackArg != null) {
+          getMessage().addObjPart(callbackArg);
+        }
+    }
+
+    public PutOpImpl(Region region, Object key, Object value, byte[] deltaBytes,
+        EntryEventImpl event, 
+        Operation op, boolean requireOldValue, 
+        Object expectedOldValue, Object callbackArg,
+        boolean sendFullObj, boolean prSingleHopEnabled) {
+      super(MessageType.PUT, 7 + (callbackArg != null? 1:0) + (expectedOldValue != null? 1:0));
+      this.key = key;
+      this.callbackArg = callbackArg;
+      this.event = event;
+      this.value = value;
+      this.region = (LocalRegion)region;
+      this.regionName = region.getFullPath();
+      this.prSingleHopEnabled = prSingleHopEnabled;
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      if (isDebugEnabled) {
+        logger.debug("PutOpImpl constructing message with operation={}", op);
+      }
+      getMessage().addStringPart(region.getFullPath());
+      getMessage().addObjPart(op);
+      int flags = 0;
+      if (requireOldValue) flags |= 0x01;
+      if (expectedOldValue != null) flags |= 0x02;
+      getMessage().addIntPart(flags);
+      if (expectedOldValue != null) {
+        getMessage().addObjPart(expectedOldValue);
+      }
+      getMessage().addStringOrObjPart(key);
+      // Add message part for sending either delta or full value
+      if (!sendFullObj && deltaBytes != null && op == Operation.UPDATE) {
+        getMessage().addObjPart(Boolean.TRUE);
+        getMessage().addBytesPart(deltaBytes);
+        this.deltaSent = true;
+        if (isDebugEnabled) {
+          logger.debug("PutOp: Sending delta for key {}", this.key);
+        }
+      }
+      else if (value instanceof CachedDeserializable) {
+        {
+          getMessage().addObjPart(Boolean.FALSE);
+          Object cdValue = ((CachedDeserializable)value).getValue();
+          if (cdValue instanceof byte[]) {
+            getMessage().addRawPart((byte[])cdValue, true);
+          }
+          else {
+            getMessage().addObjPart(cdValue);
+          }
+        }
+      }
+      else {
+        getMessage().addObjPart(Boolean.FALSE);
+        getMessage().addObjPart(value);
+      }
+      getMessage().addBytesPart(event.getEventId().calcBytes());
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException("processResponse should not be invoked in PutOp.  Use processResponse(Message, Connection)");
+//      processAck(msg, "put");
+//      if (prSingleHopEnabled) {
+//        byte version = 0;
+//        Part part = msg.getPart(0);
+//        byte[] bytesReceived = part.getSerializedForm();
+//        if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
+//            && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { // nw hop
+//          if (this.region != null) {
+//            ClientMetadataService cms;
+//            try {
+//              cms = region.getCache().getClientMetadataService();
+//              version = cms.getMetaDataVersion(region, Operation.UPDATE,
+//                  key, value, callbackArg);
+//            }
+//            catch (CacheClosedException e) {
+//              return null;
+//            }
+//            if (bytesReceived[0] != version) {
+//              cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+//            }
+//          }
+//        }
+//      }
+//      return null;
+    }
+
+    /*
+     * Process a response that contains an ack.
+     * 
+     * @param msg
+     *                the message containing the response
+     * @param con
+     *                Connection on which this op is executing
+     * @throws Exception
+     *                 if response could not be processed or we received a
+     *                 response with a server exception.
+     * @since 6.1
+     */
+    @Override
+    protected Object processResponse(Message msg, Connection con)
+        throws Exception {
+      processAck(msg, "put", con);
+      byte version = 0 ;
+      if (prSingleHopEnabled) {
+        Part part = msg.getPart(0);
+        byte[] bytesReceived = part.getSerializedForm();
+        if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
+            && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
+          if (this.region != null) {
+            ClientMetadataService cms;
+              cms = region.getCache().getClientMetadataService();
+              version = cms.getMetaDataVersion(region, Operation.UPDATE,
+                  key, value, callbackArg);
+            if (bytesReceived[0] != version) {
+              cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
+            }
+          }
+        }
+      }
+      if (msg.getMessageType() == MessageType.REPLY
+          &&  msg.getNumberOfParts() > 1) {
+        int flags = msg.getPart(1).getInt();
+        int partIdx = 2;
+        Object oldValue = null;
+        if ((flags & HAS_OLD_VALUE_FLAG) != 0) {
+          oldValue = msg.getPart(partIdx++).getObject();
+          if ((flags & OLD_VALUE_IS_OBJECT_FLAG) != 0 && oldValue instanceof byte[]) {
+            ByteArrayInputStream in = new ByteArrayInputStream((byte[])oldValue);
+            DataInputStream din = new DataInputStream(in);
+            oldValue = DataSerializer.readObject(din);
+          }
+//          if (lw.fineEnabled()) {
+//            lw.fine("read old value from server response: " + oldValue);
+//          }
+        }
+        // if the server has versioning we will attach it to the client's event
+        // here so it can be applied to the cache
+        if ((flags & HAS_VERSION_TAG) != 0) {
+          VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+          // we use the client's ID since we apparently don't track the server's ID in connections
+          tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
+          this.event.setVersionTag(tag);
+        }
+        return oldValue;
+      }
+      return null;
+    }
+    
+    /**
+     * Process a response that contains an ack.
+     * 
+     * @param msg
+     *                the message containing the response
+     * @param opName
+     *                text describing this op
+     * @param con
+     *                Connection on which this op is executing
+     * @throws Exception
+     *                 if response could not be processed or we received a
+     *                 response with a server exception.
+     * @since 6.1
+     */
+    private final void processAck(Message msg, String opName, Connection con)
+        throws Exception
+    {
+      final int msgType = msg.getMessageType();
+      // Update delta stats
+      if (this.deltaSent && this.region != null) {
+        this.region.getCachePerfStats().incDeltasSent();
+      }
+      if (msgType == MessageType.REPLY) {
+        return;
+      }
+      else {
+        Part part = msg.getPart(0);
+        if (msgType == MessageType.PUT_DELTA_ERROR) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PutOp: Sending full value as delta failed on server...");
+          }
+          AbstractOp op = new PutOpImpl(this.regionName, this.key, this.value,
+                null, this.event, Operation.CREATE, this.requireOldValue,
+                this.expectedOldValue, this.callbackArg,
+                true /* send full obj */, this.prSingleHopEnabled);
+          
+          op.attempt(con);
+          if (this.region != null) {
+            this.region.getCachePerfStats().incDeltaFullValuesSent();
+          }
+        }
+        else if (msgType == MessageType.EXCEPTION) {
+          String s = ": While performing a remote " + opName;
+          throw new ServerOperationException(s, (Throwable)part.getObject());
+          // Get the exception toString part.
+          // This was added for c++ thin client and not used in java
+          // Part exceptionToStringPart = msg.getPart(1);
+        }
+        else if (isErrorResponse(msgType)) {
+          throw new ServerOperationException(part.getString());
+        }
+        else {
+          throw new InternalGemFireError("Unexpected message type "
+              + MessageType.getString(msgType));
+        }
+      }
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      if (!this.isMetaRegionPutOp) {
+        super.sendMessage(cnx);
+      } else {
+        getMessage().send(false);
+      }
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      if (!this.isMetaRegionPutOp) {
+        super.processSecureBytes(cnx, message);
+      }
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return this.isMetaRegionPutOp ? false : super.needsUserId();
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.PUT_DATA_ERROR;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startPut();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endPutSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endPut(start, hasTimedOut(), hasFailed());
+    }
+    
+    @Override
+    public String toString() {
+      return "PutOp:"+key;
+    }
+    
+    /**
+     * Attempts to read a response to this operation by reading it from the
+     * given connection, and returning it.
+     * 
+     * @param cnx the connection to read the response from
+     * @return the result of the operation, or <code>null</code> if the
+     *         operation has no result
+     * @throws Exception if the execute failed
+     */
+    @Override
+    protected Object attemptReadResponse(Connection cnx) throws Exception
+    {
+      Message msg = createResponseMessage();
+      if (msg != null) {
+        msg.setComms(cnx.getSocket(), cnx.getInputStream(), cnx
+            .getOutputStream(), cnx.getCommBuffer(), cnx.getStats());
+        if (msg instanceof ChunkedMessage) {
+          try {
+            return processResponse(msg, cnx);
+          }
+          finally {
+            msg.unsetComms();
+            processSecureBytes(cnx, msg);
+          }
+        }
+        else {
+          try {
+            msg.recv();
+          }
+          finally {
+            msg.unsetComms();
+            processSecureBytes(cnx, msg);
+          }
+          return processResponse(msg, cnx);
+        }
+      }
+      else {
+        return null;
+      }
+    }
+
+    void setMetaRegionPutOp(boolean bool) {
+      this.isMetaRegionPutOp = bool;
+    }
+  }
+
+}
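
PutOpImpl is normally reached through the public client API rather than constructed
by hand. The following is a minimal sketch, not part of this commit, of a client put
that ultimately flows through PutOp; the server address localhost:40404 and the
region name "orders" are assumptions used only for illustration.

    import com.gemstone.gemfire.cache.Region;
    import com.gemstone.gemfire.cache.client.ClientCache;
    import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    import com.gemstone.gemfire.cache.client.ClientRegionShortcut;

    public class PutOpClientSketch {
      public static void main(String[] args) {
        // Connect to an assumed cache server on localhost:40404.
        ClientCache cache = new ClientCacheFactory()
            .addPoolServer("localhost", 40404)
            .create();
        // A PROXY region holds no local data, so every put is forwarded to the
        // server through the pool, which is where PutOp/PutOpImpl is used.
        Region<String, String> orders = cache
            .<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
            .create("orders");
        orders.put("order-1", "pending"); // becomes a PUT message on the wire
        cache.close();
      }
    }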

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueryOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueryOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueryOp.java
new file mode 100644
index 0000000..a729ba7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueryOp.java
@@ -0,0 +1,194 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Arrays;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ObjectPartList;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.types.CollectionType;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
+import com.gemstone.gemfire.cache.query.internal.QueryUtils;
+import com.gemstone.gemfire.cache.query.internal.StructImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.types.TypeUtils;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+
+/**
+ * Does a region query on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class QueryOp {
+  /**
+   * Does a region query on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param queryPredicate A query language boolean query predicate
+   * @param queryParams the parameters for a parameterized query; may be null or empty
+   * @return A <code>SelectResults</code> containing the values
+   *         that match the <code>queryPredicate</code>.
+   */
+  public static SelectResults execute(ExecutablePool pool, String queryPredicate,
+                                      Object[] queryParams)
+  {
+    AbstractOp op = null;
+    
+    if (queryParams != null && queryParams.length > 0) {
+      op = new QueryOpImpl(queryPredicate, queryParams);
+    } else {
+      op = new QueryOpImpl(queryPredicate);      
+    }
+    return (SelectResults)pool.execute(op);
+  }
+                                                               
+  private QueryOp() {
+    // no instances allowed
+  }
+
+  /**
+   * Note: this class is extended by CreateCQWithIROpImpl.
+   */
+  protected static class QueryOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public QueryOpImpl(String queryPredicate) {
+      super(MessageType.QUERY, 1);
+      getMessage().addStringPart(queryPredicate);
+    }
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public QueryOpImpl(String queryPredicate, Object[] queryParams) {
+      super(MessageType.QUERY_WITH_PARAMETERS, 2 + queryParams.length);
+      getMessage().addStringPart(queryPredicate);
+      getMessage().addIntPart(queryParams.length);
+      for (Object param : queryParams){
+        getMessage().addObjPart(param);
+      }
+    }
+
+    /**
+     * This constructor is used by our subclass CreateCQWithIROpImpl
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    protected QueryOpImpl(int msgType, int numParts) {
+      super(msgType, numParts);
+    }
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(2, Version.CURRENT);
+    }
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {
+      final SelectResults[] resultRef = new SelectResults[1];
+      final Exception[] exceptionRef = new Exception[1];
+      ChunkHandler ch = new ChunkHandler() {
+          public void handle(ChunkedMessage cm) throws Exception {
+            Part collectionTypePart = cm.getPart(0);
+            Object o = collectionTypePart.getObject();
+            if (o instanceof Throwable) {
+              String s = "While performing a remote " + getOpName();
+              exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+              return;
+            }
+            CollectionType collectionType = (CollectionType)o;
+            Part resultPart = cm.getPart(1);
+            Object queryResult = null;
+            try {
+              queryResult = resultPart.getObject();
+            } catch (Exception e) {
+              String s = "While deserializing " + getOpName() + " result";
+              exceptionRef[0] = new SerializationException(s, e);
+              return;
+            }
+            if (queryResult instanceof Throwable) {
+              String s = "While performing a remote " + getOpName();
+              exceptionRef[0] = new ServerOperationException(s, (Throwable)queryResult);
+              return;
+            } else if (queryResult instanceof Integer) {
+              // Create the appropriate SelectResults instance if necessary
+              if (resultRef[0] == null) {
+                resultRef[0] = QueryUtils.
+                               getEmptySelectResults(TypeUtils.OBJECT_TYPE,
+                                                     null);
+              }
+              resultRef[0].add(queryResult);
+            } else { // typical query result
+              // Create the appropriate SelectResults instance if necessary
+              if (resultRef[0] == null) {
+                resultRef[0] = QueryUtils.getEmptySelectResults(collectionType,
+                                                                null);
+              }
+              SelectResults selectResults = resultRef[0];
+              ObjectType objectType = collectionType.getElementType();
+              Object[] resultArray;
+             // for select * queries, the serialized object byte arrays are
+             // returned as part of ObjectPartList
+              boolean isObjectPartList = false;
+              if (queryResult instanceof ObjectPartList) {
+                isObjectPartList = true;
+                resultArray = ((ObjectPartList) queryResult).getObjects().toArray();
+              } else{ 
+                // Add the results to the SelectResults
+                resultArray = (Object[]) queryResult;
+              }
+              if (objectType.isStructType()) {
+                for (int i = 0; i < resultArray.length; i++) {
+                  if (isObjectPartList) {
+                    selectResults
+                        .add(new StructImpl((StructTypeImpl) objectType,
+                            ((ObjectPartList) resultArray[i]).getObjects()
+                                .toArray()));
+                  } else {
+                    selectResults.add(new StructImpl((StructTypeImpl) objectType,
+                        (Object[]) resultArray[i]));
+                  }
+                }
+              } else {
+                selectResults.addAll(Arrays.asList(resultArray));
+              }
+            }
+          }
+        };
+      processChunkedResponse((ChunkedMessage)msg, getOpName(), ch);
+      if (exceptionRef[0] != null) {
+        throw exceptionRef[0];
+      } else {
+        return resultRef[0];
+      }
+    }
+    protected String getOpName() {
+      return "query";
+    }
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.QUERY_DATA_ERROR
+        || msgType == MessageType.CQDATAERROR_MSG_TYPE
+        || msgType == MessageType.CQ_EXCEPTION_TYPE;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startQuery();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endQuerySend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endQuery(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
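
QueryOpImpl likewise backs the public query API on a client region. The following is
a minimal sketch, not part of this commit, of an OQL query whose chunked result is
assembled by the ChunkHandler above; it assumes the hypothetical "orders" proxy
region from the earlier put sketch.

    import com.gemstone.gemfire.cache.Region;
    import com.gemstone.gemfire.cache.query.SelectResults;

    public class QueryOpClientSketch {
      // 'orders' is the assumed client proxy region from the put sketch.
      static void printPending(Region<String, String> orders) throws Exception {
        // Region.query() ships the OQL to the server over the region's pool;
        // on the wire it becomes a QUERY message processed by QueryOpImpl.
        SelectResults results =
            orders.query("SELECT * FROM /orders v WHERE v = 'pending'");
        System.out.println("pending orders: " + results.size());
      }
    }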

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueConnectionImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueConnectionImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueConnectionImpl.java
new file mode 100644
index 0000000..7a4f2ff
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueConnectionImpl.java
@@ -0,0 +1,216 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+
+/**
+ * A wrapper that holds a client to server connection and
+ * a server to client connection.
+ * 
+ *  The clientToServerConnection should not
+ *  be used outside of this class.
+ * @author dsmith
+ *
+ */
+public class QueueConnectionImpl implements Connection {
+  private static final Logger logger = LogService.getLogger();
+
+  private final AtomicReference/*<Connection>*/ clientToServerConn = new AtomicReference();
+  private final Endpoint endpoint;
+  private volatile ClientUpdater updater;
+  private boolean shouldDestroy;
+  private QueueManagerImpl manager;
+  private final AtomicBoolean sentClientReady = new AtomicBoolean();
+  private FailureTracker failureTracker;
+  
+  public QueueConnectionImpl(QueueManagerImpl manager, Connection clientToServer,
+      ClientUpdater updater, FailureTracker failureTracker) {
+    this.manager = manager;
+    this.clientToServerConn.set(clientToServer);
+    this.endpoint = clientToServer.getEndpoint();
+    this.updater = updater;
+    this.failureTracker = failureTracker;
+  }
+
+  public void close(boolean keepAlive) throws Exception {
+    throw new UnsupportedOperationException("Subscription connections should only be closed by subscription manager");
+  }
+  
+  public void emergencyClose() {
+    Connection conn = (Connection) clientToServerConn.getAndSet(null);
+    if(conn != null) {
+      conn.emergencyClose();
+    }
+  }
+  
+  public void internalClose(boolean keepAlive) throws Exception {
+    try {
+      getConnection().close(keepAlive);
+    } finally {
+      updater.close();
+    }
+  }
+
+  public void destroy() {
+    Connection conn = (Connection)this.clientToServerConn.get();
+    if (conn != null) {
+      manager.connectionCrashed(conn);
+    } // else someone else destroyed it
+  }
+  
+  public void internalDestroy() {
+    Connection currentConn = (Connection)this.clientToServerConn.get();
+    if(currentConn != null) {
+      if (!this.clientToServerConn.compareAndSet(currentConn, null)) {
+        // someone else did (or is doing) the internalDestroy so return
+        return;
+      }
+      try {
+        currentConn.destroy();
+      } catch(Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("SubscriptionConnectionImpl - error destroying client to server connection", e);
+        }
+      }
+    } 
+
+    ClientUpdater currentUpdater = updater;
+    if(currentUpdater != null) {
+      try {
+        currentUpdater.close();
+      } catch(Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("SubscriptionConnectionImpl - error destroying client updater", e);
+        }
+      }
+    }
+    updater = null;
+  }
+
+  /**
+   * test hook
+   */
+  public ClientUpdater getUpdater() {
+    return this.updater;
+  }
+  
+  public boolean isDestroyed() {
+    return clientToServerConn.get() == null;
+  }
+  
+  public boolean getShouldDestroy() {
+    return shouldDestroy;
+  }
+
+  public ByteBuffer getCommBuffer() {
+    return getConnection().getCommBuffer();
+  }
+
+  public Endpoint getEndpoint() {
+    return this.endpoint;
+  }
+
+  public ServerQueueStatus getQueueStatus() {
+    return getConnection().getQueueStatus();
+  }
+
+  public ServerLocation getServer() {
+    return getEndpoint().getLocation();
+  }
+
+  public Socket getSocket() {
+    return getConnection().getSocket();
+  }
+  
+  public OutputStream getOutputStream() {
+    return getConnection().getOutputStream();
+  }
+  
+  public InputStream getInputStream() {
+    return getConnection().getInputStream();
+  }
+
+  public ConnectionStats getStats() {
+    return getEndpoint().getStats();
+  }
+
+  public Object execute(Op op) throws Exception {
+    return getConnection().execute(op);
+  }
+  
+  public Connection getConnection() {
+    Connection result = (Connection)this.clientToServerConn.get();
+    if (result == null) {
+      throw new ConnectionDestroyedException();
+    }
+    return result;
+  }
+  
+  public FailureTracker getFailureTracker() {
+    return failureTracker;
+  }
+  
+  /**
+   * Indicate that we have sent, or are about to send, the client-ready
+   * message on this connection.
+   * 
+   * @return true if client ready has not yet been sent on this connection.
+   */
+  public boolean sendClientReady() {
+    return sentClientReady.compareAndSet(false, true);
+  }
+  
+  @Override
+  public String toString() { 
+    Connection result = (Connection)this.clientToServerConn.get();
+    if(result != null) {
+      return result.toString();
+    } else {
+      return "SubscriptionConnectionImpl[" + getServer() + ":closed]";
+    }
+  }
+
+  public static void loadEmergencyClasses() {
+    ConnectionImpl.loadEmergencyClasses();
+  }
+  
+  public short getWanSiteVersion(){
+    throw new UnsupportedOperationException();
+  }
+  
+  public int getDistributedSystemId() {
+    throw new UnsupportedOperationException();
+  }
+  
+  public void setWanSiteVersion(short wanSiteVersion){
+    throw new UnsupportedOperationException();
+  }
+
+  public void setConnectionID(long id) {
+    // go through getConnection() so a destroyed connection surfaces as a
+    // ConnectionDestroyedException rather than a NullPointerException
+    getConnection().setConnectionID(id);
+  }
+
+  public long getConnectionID() {
+    return getConnection().getConnectionID();
+  }
+}
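
Two small concurrency idioms carry most of the weight in this class: the
AtomicReference compareAndSet in internalDestroy() ensures only one caller tears
down the client-to-server connection, and the AtomicBoolean in sendClientReady()
ensures the ready message is sent at most once. Below is a standalone sketch of the
same idioms, with a hypothetical Resource type standing in for the real Connection.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    public class OneShotGuards {
      static class Resource {
        void close() { System.out.println("closed exactly once"); }
      }

      private final AtomicReference<Resource> ref =
          new AtomicReference<Resource>(new Resource());
      private final AtomicBoolean readySent = new AtomicBoolean();

      // Mirrors internalDestroy(): only the thread that wins the compareAndSet
      // closes the resource; every other caller simply returns.
      void destroyOnce() {
        Resource current = ref.get();
        if (current != null && ref.compareAndSet(current, null)) {
          current.close();
        }
      }

      // Mirrors sendClientReady(): returns true exactly once, telling the caller
      // whether it is responsible for sending the ready message.
      boolean markReady() {
        return readySent.compareAndSet(false, true);
      }
    }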

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManager.java
new file mode 100644
index 0000000..062bee6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManager.java
@@ -0,0 +1,48 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+
+/**
+ * Manages the pool's subscription queue connections: a primary connection plus
+ * zero or more backups over which servers push events to this client.
+ * 
+ * @author dsmith
+ * @since 5.7
+ */
+public interface QueueManager {
+  
+  public QueueConnections getAllConnectionsNoWait();
+
+  public QueueConnections getAllConnections();
+
+  void start(ScheduledExecutorService background);
+
+  void close(boolean keepAlive);
+  
+  public static interface QueueConnections {
+    Connection getPrimary();
+    List/*<Connection>*/ getBackups();
+    QueueConnectionImpl getConnection(Endpoint endpoint);
+  }
+
+  public QueueState getState();
+
+  public InternalPool getPool();
+  
+  public InternalLogWriter getSecurityLogger();
+
+  public void readyForEvents(InternalDistributedSystem system);
+
+  public void emergencyClose();
+  
+  public void checkEndpoint(ClientUpdater qc, Endpoint endpoint);
+}
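
The QueueConnections view above exposes the redundancy model directly: one primary
connection and a list of backups. A minimal sketch (a hypothetical helper, assumed
to live in the same com.gemstone.gemfire.cache.client.internal package so it can see
these internal types) of how a caller might prefer the primary and fall back to the
first backup:

    import java.util.List;

    class QueueConnectionChooser {
      // Uses only the methods declared on QueueManager.QueueConnections above.
      static Connection choose(QueueManager.QueueConnections connections) {
        Connection primary = connections.getPrimary();
        if (primary != null) {
          return primary;
        }
        List backups = connections.getBackups(); // raw List of Connection, as declared
        return backups.isEmpty() ? null : (Connection) backups.get(0);
      }
    }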

