geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [04/51] [partial] incubator-geode git commit: Init
Date Tue, 28 Apr 2015 21:40:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
new file mode 100644
index 0000000..a1cf4ba
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionProxy.java
@@ -0,0 +1,894 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.ContainsKeyOp.MODE;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.util.BridgeClient;
+import com.gemstone.gemfire.cache.util.BridgeLoader;
+import com.gemstone.gemfire.cache.util.BridgeWriter;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.AbstractRegion;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.execute.ServerRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList.Iterator;
+import com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub;
+import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Used to send region operations from a client to a server
+ * @author darrel
+ * @since 5.7
+ */
+@SuppressWarnings("deprecation")
+public class ServerRegionProxy extends ServerProxy implements ServerRegionDataAccess {
+  private static final Logger logger = LogService.getLogger();
+  
+  private final LocalRegion region;
+  private final String regionName;
+
+  
+  /**
+   * Creates a server region proxy for the given region.
+   * @param r the region
+   * @throws IllegalStateException if the region does not have a pool
+   */
+  public ServerRegionProxy(Region r) {
+    super(calcPool(r));
+    assert r instanceof LocalRegion;
+    this.region = (LocalRegion)r;
+    this.regionName = r.getFullPath();
+  }
+  /**
+   * Used by tests to create proxies for "fake" regions.
+   * Also, used by ClientStatsManager for admin region.
+   */
+  public ServerRegionProxy(String regionName, PoolImpl pool) {
+    super(pool);
+    this.region = null;
+    this.regionName = regionName;
+  }
+  
+  private static InternalPool calcPool(Region r) {
+    String poolName = r.getAttributes().getPoolName();
+    if (poolName == null || "".equals(poolName)) {
+      final CacheLoader cl = r.getAttributes().getCacheLoader();
+      final CacheWriter cw = r.getAttributes().getCacheWriter();
+      if (AbstractRegion.isBridgeLoader(cl) || AbstractRegion.isBridgeWriter(cw)) {
+        Object loaderPool = null;
+        Object writerPool = null;
+        if (AbstractRegion.isBridgeLoader(cl)) {
+          if (cl instanceof BridgeLoader) {
+            loaderPool = ((BridgeLoader)cl).getConnectionProxy();
+          } else {
+            loaderPool = ((BridgeClient)cl).getConnectionProxy();
+          }
+        }
+        if (AbstractRegion.isBridgeWriter(cw)) {
+          writerPool = ((BridgeWriter)cw).getConnectionProxy();
+        }
+        if (loaderPool != writerPool && loaderPool != null && writerPool != null) {
+          throw new IllegalStateException("The region " + r.getFullPath()
+                                          + " has a BridgeLoader and a BridgeWriter/BridgeClient "
+                                          + " that are configured with different connection pools. "
+                                          + " This is not allowed. Instead create a single BridgeClient and install it as both the loader and the writer."
+                                          + " loaderPool="+loaderPool + " writerPool=" + writerPool);
+        }
+        InternalPool result = (InternalPool)loaderPool;
+        if (result == null) {
+          result = (InternalPool)writerPool;
+        }
+        return result;
+      } else {
+        throw new IllegalStateException("The region " + r.getFullPath()
+                                        + " did not have a client pool configured.");
+      }
+    } else {
+      InternalPool pool = (InternalPool)PoolManager.find(poolName);
+      if (pool == null) {
+        throw new IllegalStateException("The pool " + poolName
+                                        + " does not exist.");
+      }
+      return pool;
+    }
+  }
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#get(java.lang.Object, java.lang.Object)
+   */
+  public Object get(Object key, Object callbackArg, EntryEventImpl clientEvent) {
+    recordTXOperation(ServerRegionOperation.GET, key, callbackArg);
+    return GetOp.execute(this.pool, this.region, key, callbackArg, this.pool.getPRSingleHopEnabled(), clientEvent);
+  }
+
+  
+  
+  public int size() {
+    return SizeOp.execute(this.pool, this.regionName);
+  }
+  
+  /**
+   * Do not call this method if the value is Delta instance. Exclicitly passing
+   * <code>Operation.CREATE</code> to the <code>PutOp.execute()</code>
+   * method as the caller of this method does not put Delta instances as value.
+   * 
+   * @param key
+   * @param value
+   * @param event
+   * @param callbackArg
+   */
+  public Object putForMetaRegion(Object key,
+                  Object value,
+                  byte[] deltaBytes,
+                  EntryEventImpl event,
+                  Object callbackArg,
+                  boolean isMetaRegionPutOp)
+  {
+    if (this.region == null) {
+      return PutOp.execute(this.pool, this.regionName, key, value, deltaBytes, event,
+          Operation.CREATE,
+          false, null,
+          callbackArg, this.pool.getPRSingleHopEnabled(),
+          isMetaRegionPutOp);
+    } else {
+      return PutOp.execute(this.pool, this.region, key, value, deltaBytes,
+          event, Operation.CREATE, false, null, callbackArg, this.pool
+              .getPRSingleHopEnabled());
+    }
+  }
+
+  public Object put(Object key,
+                  Object value,
+                  byte[] deltaBytes,
+                  EntryEventImpl event,
+                  Operation op,
+                  boolean requireOldValue, Object expectedOldValue,
+                  Object callbackArg,
+                  boolean isCreate)
+  {
+    recordTXOperation(ServerRegionOperation.PUT, key, value, deltaBytes,
+        event.getEventId(), op, Boolean.valueOf(requireOldValue), expectedOldValue,
+        callbackArg, Boolean.valueOf(isCreate));
+    Operation operation = op;
+    if (!isCreate && this.region.getDataPolicy() == DataPolicy.EMPTY
+        && op.isCreate() && op != Operation.PUT_IF_ABSENT) {
+      operation = Operation.UPDATE;
+    }
+
+    if (this.region == null) {
+      return PutOp.execute(this.pool, this.regionName, key, value, deltaBytes,
+          event, operation, requireOldValue, expectedOldValue, callbackArg,
+          this.pool.getPRSingleHopEnabled(), false);
+    }
+    else {
+      return PutOp.execute(this.pool, this.region, key, value, deltaBytes,
+          event, operation, requireOldValue, expectedOldValue, callbackArg,
+          this.pool.getPRSingleHopEnabled());
+    }
+  }
+  
+  
+  /**
+   * Does a region put on the server using the given connection.
+   * @param con the connection to use to send to the server
+   * @param key the entry key to do the put on
+   * @param value the entry value to put
+   * @param eventId the event ID for this put
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public void putOnForTestsOnly(Connection con,
+                    Object key,
+                    Object value,
+                    EventID eventId,
+                    Object callbackArg)
+  {
+    EntryEventImpl event = new EntryEventImpl(eventId);
+    PutOp.execute(con, this.pool, this.regionName, key, value, event, callbackArg, this.pool.getPRSingleHopEnabled());
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#destroy(java.lang.Object, java.lang.Object, com.gemstone.gemfire.cache.Operation, com.gemstone.gemfire.internal.cache.EventID, java.lang.Object)
+   */
+  public Object destroy(Object key,
+                      Object expectedOldValue,
+                      Operation operation,
+                      EntryEventImpl event,
+                      Object callbackArg)
+  {
+    if (event.isBulkOpInProgress()) {
+      // this is a removeAll, ignore this!
+      return null;
+    }
+    recordTXOperation(ServerRegionOperation.DESTROY, key, expectedOldValue, operation, event.getEventId(), callbackArg);
+    return DestroyOp.execute(this.pool, this.region, key, expectedOldValue,
+        operation, event, callbackArg, this.pool.getPRSingleHopEnabled());
+    }
+  
+  
+  public void invalidate(EntryEventImpl event) {
+    recordTXOperation(ServerRegionOperation.INVALIDATE, event.getKey(), event);
+    InvalidateOp.execute(this.pool, this.region.getFullPath(),
+        event);
+  }
+  
+  
+  /**
+   * Does a region entry destroy on the server using the given connection.
+   * @param con the connection to use to send to the server
+   * @param key the entry key to do the destroy on
+   * @param expectedOldValue the value that the entry must have to perform the operation, or null
+   * @param operation the operation being performed (Operation.DESTROY, Operation.REMOVE)
+   * @param event the event for this destroy operation
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public void destroyOnForTestsOnly(Connection con,
+                        Object key,
+                        Object expectedOldValue,
+                        Operation operation,
+                        EntryEventImpl event,
+                        Object callbackArg)
+  {
+    DestroyOp.execute(con, this.pool, this.regionName, key, 
+        expectedOldValue, operation, event, callbackArg);
+  }
+  /**
+   * Does a region destroy on the server
+   * @param eventId the event id for this destroy
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public void destroyRegion(EventID eventId,
+                            Object callbackArg)
+  {
+    DestroyRegionOp.execute(this.pool, this.regionName, eventId, callbackArg);
+  }
+  /**
+   * Does a region destroy on the server using the given connection.
+   * @param con the connection to use to send to the server
+   * @param eventId the event id for this destroy
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public void destroyRegionOnForTestsOnly(Connection con,
+                              EventID eventId,
+                              Object callbackArg)
+  {
+    DestroyRegionOp.execute(con, this.pool, this.regionName, eventId, callbackArg);
+  }
+  
+  public TXCommitMessage commit(int txId) {
+    TXCommitMessage tx = CommitOp.execute(this.pool,txId);
+    return tx;
+  }
+
+  public void rollback(int txId) {
+    RollbackOp.execute(this.pool, txId);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#clear(com.gemstone.gemfire.internal.cache.EventID, java.lang.Object)
+   */
+  public void clear(EventID eventId,
+                    Object callbackArg)
+  {
+    ClearOp.execute(this.pool, this.regionName, eventId, callbackArg);
+  }
+  
+  /**
+   * Does a region clear on the server using the given connection.
+   * @param con the connection to use to send to the server
+   * @param eventId the event id for this clear
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public void clearOnForTestsOnly(Connection con,
+                      EventID eventId,
+                      Object callbackArg)
+  {
+    ClearOp.execute(con, this.pool, this.regionName, eventId, callbackArg);
+  }
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
+   */
+  public boolean containsKey(Object key) {
+    recordTXOperation(ServerRegionOperation.CONTAINS_KEY, key);
+    return ContainsKeyOp.execute(this.pool, this.regionName, key,MODE.KEY);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
+   */
+  public boolean containsValueForKey(Object key) {
+    recordTXOperation(ServerRegionOperation.CONTAINS_VALUE_FOR_KEY, key);
+    return ContainsKeyOp.execute(this.pool, this.regionName, key,MODE.VALUE_FOR_KEY);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#containsKey(java.lang.Object)
+   */
+  public boolean containsValue(Object value) {
+    recordTXOperation(ServerRegionOperation.CONTAINS_VALUE, null, value);
+    return ContainsKeyOp.execute(this.pool, this.regionName, value,MODE.VALUE);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess#keySet()
+   */
+  public Set keySet() {
+    recordTXOperation(ServerRegionOperation.KEY_SET, null);
+    return KeySetOp.execute(this.pool, this.regionName);
+  }
+  
+  /**
+   * Does a region registerInterest on a server
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterest(final Object key,
+                               final int interestType,
+                               final InterestResultPolicy policy,
+                               final boolean isDurable,
+                               final byte regionDataPolicy)
+  {
+    return registerInterest(key, interestType, policy, isDurable, false, regionDataPolicy);
+  }
+  
+  /**
+   * Does a region registerInterest on a server
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterest(final Object key,
+                               final int interestType,
+                               final InterestResultPolicy policy,
+                               final boolean isDurable,
+                               final boolean receiveUpdatesAsInvalidates,
+                               final byte regionDataPolicy)
+  {
+    if (interestType == InterestType.KEY
+        && key instanceof List) {
+      return registerInterestList((List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    } else {
+      
+      final RegisterInterestTracker rit = this.pool.getRITracker();
+      List result = null;
+      boolean finished = false;
+      try {
+        // register with the tracker early
+        rit.addSingleInterest(this.region, key, interestType, policy, isDurable, receiveUpdatesAsInvalidates);
+        result = RegisterInterestOp.execute(this.pool, this.regionName, key,
+            interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+        //////// TEST PURPOSE ONLY ///////////
+        if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
+          BridgeObserver bo = BridgeObserverHolder.getInstance();
+          bo.afterInterestRegistration();
+        }
+        /////////////////////////////////////////
+        finished = true;
+        return result;
+      }
+      finally {
+        if (!finished) {
+          rit.removeSingleInterest(this.region, key, interestType,
+              isDurable, receiveUpdatesAsInvalidates);
+        }
+      }
+    }
+  }
+  /**
+   * Support for server-side interest registration
+   */
+  public void addSingleInterest(Object key, int interestType,
+      InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates) {
+    RegisterInterestTracker rit = this.pool.getRITracker();
+    boolean finished = false;
+    try {
+      rit.addSingleInterest(this.region, key, interestType, pol,
+          isDurable, receiveUpdatesAsInvalidates);
+      finished = true;
+    }
+    finally {
+      if (!finished) {
+        rit.removeSingleInterest(this.region, key, interestType,
+            isDurable, receiveUpdatesAsInvalidates);
+      }
+    }
+  }
+  
+  public void addListInterest(List keys, InterestResultPolicy pol,
+      boolean isDurable, boolean receiveUpdatesAsInvalidates) {
+    RegisterInterestTracker rit = this.pool.getRITracker();
+    boolean finished = false;
+    try {
+      rit.addInterestList(this.region, keys, pol, isDurable,
+          receiveUpdatesAsInvalidates);
+      finished = true;
+    }
+    finally {
+      if (!finished) {
+        rit.removeInterestList(this.region, keys, isDurable,
+            receiveUpdatesAsInvalidates);
+      }
+    }
+  }
+
+  /**
+   * Support for server-side interest registration
+   */
+  public void removeSingleInterest(Object key, int interestType,
+      boolean isDurable, boolean receiveUpdatesAsInvalidates) {
+    this.pool.getRITracker()
+      .removeSingleInterest(this.region, key, interestType,
+          isDurable, receiveUpdatesAsInvalidates);
+  }
+  
+  public void removeListInterest(List keys, boolean isDurable,
+      boolean receiveUpdatesAsInvalidates) {
+    this.pool.getRITracker().removeInterestList(this.region, keys, isDurable,
+        receiveUpdatesAsInvalidates);
+  }
+  
+  /**
+   * Does a region registerInterest on a server described by the given server location
+   * <p>Note that this call by-passes the RegisterInterestTracker.
+   * @param sl the server to do the register interest on.
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterestOn(ServerLocation sl,
+                                 final Object key,
+                                 final int interestType,
+                                 final InterestResultPolicy policy,
+                                 final boolean isDurable,
+                                 final byte regionDataPolicy)
+  {
+    return registerInterestOn(sl, key, interestType, policy, isDurable, false, regionDataPolicy);
+  }
+  /**
+   * Does a region registerInterest on a server described by the given server location
+   * <p>Note that this call by-passes the RegisterInterestTracker.
+   * @param sl the server to do the register interest on.
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterestOn(ServerLocation sl,
+                                 final Object key,
+                                 final int interestType,
+                                 final InterestResultPolicy policy,
+                                 final boolean isDurable,
+                                 final boolean receiveUpdatesAsInvalidates,
+                                 final byte regionDataPolicy)
+  {
+    if (interestType == InterestType.KEY
+        && key instanceof List) {
+      return RegisterInterestListOp.executeOn(sl, this.pool, this.regionName,
+          (List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    } else {
+      return RegisterInterestOp.executeOn(sl, this.pool, this.regionName, key,
+          interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    }
+  }
+  
+  /**
+   * Does a region registerInterest on a server described by the given connection
+   * <p>Note that this call by-passes the RegisterInterestTracker.
+   * @param conn the connection to do the register interest on.
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterestOn(Connection conn,
+                                 final Object key,
+                                 final int interestType,
+                                 final InterestResultPolicy policy,
+                                 final boolean isDurable,
+                                 final byte regionDataPolicy)
+  {
+    return registerInterestOn(conn, key, interestType, policy, isDurable, false, regionDataPolicy); 
+  }
+  /**
+   * Does a region registerInterest on a server described by the given connection
+   * <p>Note that this call by-passes the RegisterInterestTracker.
+   * @param conn the connection to do the register interest on.
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates whether to act like notify-by-subscription is false.
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterestOn(Connection conn,
+                                 final Object key,
+                                 final int interestType,
+                                 final InterestResultPolicy policy,
+                                 final boolean isDurable,
+                                 final boolean receiveUpdatesAsInvalidates,
+                                 final byte regionDataPolicy)
+  {
+    if (interestType == InterestType.KEY
+        && key instanceof List) {
+      return RegisterInterestListOp.executeOn(conn, this.pool, this.regionName,
+          (List)key, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    } else {
+      return RegisterInterestOp.executeOn(conn, this.pool, this.regionName,
+          key, interestType, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    }
+  }
+    
+  
+  
+  /**
+   * Does a region registerInterestList on a server
+   * @param keys list of keys we are interested in
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public List registerInterestList(List keys,
+                                   InterestResultPolicy policy,
+                                   boolean isDurable,
+                                   boolean receiveUpdatesAsInvalidates,
+                                   final byte regionDataPolicy)
+  {
+    final RegisterInterestTracker rit = this.pool.getRITracker();
+    List result = null;
+    boolean finished = false;
+    try {
+      // register with the tracker early
+      rit.addInterestList(this.region, keys, policy, isDurable, receiveUpdatesAsInvalidates);
+      result = RegisterInterestListOp.execute(this.pool, this.regionName, keys, policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+      finished = true;
+      //////// TEST PURPOSE ONLY ///////////
+      if (PoolImpl.AFTER_REGISTER_CALLBACK_FLAG) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.afterInterestRegistration();
+      }
+      /////////////////////////////////////////
+      return result;
+    }
+    finally {
+      if (!finished) {
+        rit.removeInterestList(this.region, keys, isDurable, receiveUpdatesAsInvalidates);
+      }
+    }
+  }
+  /**
+   * Does a region unregisterInterest on a server
+   * @param key describes what we are no longer interested in
+   * @param interestType the {@link InterestType} for this unregister
+   * @param isClosing true if this unregister is done by a close
+   * @param keepAlive true if this unregister should not undo a durable registration
+   */
+  public void unregisterInterest(Object key,
+                                 int interestType,
+                                 boolean isClosing,
+                                 boolean keepAlive)
+  {
+    if (interestType == InterestType.KEY
+        && key instanceof List) {
+      unregisterInterestList((List)key, isClosing, keepAlive);
+    } else {
+      RegisterInterestTracker rit = this.pool.getRITracker();
+      boolean removed =
+        rit.removeSingleInterest(this.region, key, interestType, false, false) ||
+        rit.removeSingleInterest(this.region, key, interestType, true, false) ||
+        rit.removeSingleInterest(this.region, key, interestType, false, true) ||
+        rit.removeSingleInterest(this.region, key, interestType, true, true);
+      if (removed) {
+        UnregisterInterestOp.execute(this.pool, this.regionName, key, interestType, isClosing, keepAlive);
+      }
+    }
+  }
+  /**
+   * Does a region unregisterInterestList on a server
+   * @param keys list of keys we are interested in
+   * @param isClosing true if this unregister is done by a close
+   * @param keepAlive true if this unregister should not undo a durable registration
+   */
+  public void unregisterInterestList(List keys,
+                                     boolean isClosing,
+                                     boolean keepAlive)
+  {
+    RegisterInterestTracker rit = this.pool.getRITracker();
+    boolean removed =
+      rit.removeInterestList(this.region, keys, false, true) ||
+      rit.removeInterestList(this.region, keys, false, false) ||
+      rit.removeInterestList(this.region, keys, true, true) ||
+      rit.removeInterestList(this.region, keys, true, false);
+    if (removed) {
+      UnregisterInterestListOp.execute(this.pool, this.regionName, keys, isClosing, keepAlive);
+    }
+  }
+  public List getInterestList(int interestType) {
+    return this.pool.getRITracker().getInterestList(this.regionName,
+                                                    interestType);
+  }
+
+  @Override
+  public VersionedObjectList putAll(Map map, EventID eventId, boolean skipCallbacks, Object callbackArg) {
+    recordTXOperation(ServerRegionOperation.PUT_ALL, null, map, eventId);
+    int txID = TXManagerImpl.getCurrentTXUniqueId();
+    if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
+      return PutAllOp.execute(this.pool, this.region, map, eventId, skipCallbacks, this.pool.getRetryAttempts(), callbackArg);
+    }
+    else {
+      return PutAllOp.execute(this.pool, this.region, map, eventId, skipCallbacks, false, callbackArg);
+    }
+  }
+  
+  @Override
+  public VersionedObjectList removeAll(Collection<Object> keys, EventID eventId, Object callbackArg) {
+    recordTXOperation(ServerRegionOperation.REMOVE_ALL, null, keys, eventId);
+    int txID = TXManagerImpl.getCurrentTXUniqueId();
+    if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
+      return RemoveAllOp.execute(this.pool, this.region, keys, eventId, this.pool.getRetryAttempts(), callbackArg);
+    }
+    else {
+      return RemoveAllOp.execute(this.pool, this.region, keys, eventId, false, callbackArg);
+    }
+  }
+  
+  
+  @Override
+  public VersionedObjectList getAll(List keys, Object callback) {
+    recordTXOperation(ServerRegionOperation.GET_ALL, null, keys);
+    int txID = TXManagerImpl.getCurrentTXUniqueId();
+    VersionedObjectList result;
+    if (this.pool.getPRSingleHopEnabled() && (txID == TXManagerImpl.NOTX)) {
+      result = GetAllOp.execute(this.pool, this.region, keys,this.pool.getRetryAttempts(), callback);
+    }
+    else {
+      result = GetAllOp.execute(this.pool, this.regionName, keys, callback);
+    }
+    if (result != null) {
+      for (Iterator it=result.iterator(); it.hasNext(); ) {
+        VersionedObjectList.Entry entry = it.next();
+        Object key = entry.getKey();
+        Object value = entry.getValue();
+        boolean isOnServer = entry.isKeyNotOnServer();
+        if (!isOnServer) {
+          if (value instanceof Throwable) {
+            logger.warn(LocalizedMessage.create(
+                    LocalizedStrings.GetAll_0_CAUGHT_THE_FOLLOWING_EXCEPTION_ATTEMPTING_TO_GET_VALUE_FOR_KEY_1,
+                    new Object[]{value, key}), (Throwable)value);
+          } 
+        }
+      }
+    }
+    return result;
+  }
+    
+  /**
+   * Release use of this pool
+   */
+  public void detach(boolean keepalive) {
+    this.pool.getRITracker().unregisterRegion(this, keepalive);
+    super.detach();
+  }
+  public String getRegionName() {
+    return this.regionName;
+  }
+  
+  public Region getRegion() {
+    return this.region;
+  }
+    
+  public void executeFunction(String rgnName, Function function,
+      ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, boolean replaying) {
+    
+    recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(1),
+          function, serverRegionExecutor, resultCollector,
+          Byte.valueOf(hasResult));
+
+    int retryAttempts = pool.getRetryAttempts();
+
+    if (this.pool.getPRSingleHopEnabled()) {
+      ClientMetadataService cms = region.getCache()
+      .getClientMetadataService();
+      if (serverRegionExecutor.getFilter().isEmpty()) {
+        HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = cms
+        .groupByServerToAllBuckets(this.region, function.optimizeForWrite());
+        if (serverToBuckets == null || serverToBuckets.isEmpty()) {
+          ExecuteRegionFunctionOp.execute(this.pool, rgnName, function,
+              serverRegionExecutor, resultCollector, hasResult, retryAttempts);
+          cms.scheduleGetPRMetaData(region, false);
+        }
+        else {
+          ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
+              function, serverRegionExecutor, resultCollector, hasResult,
+              serverToBuckets, retryAttempts, true);
+        }
+      }
+      else {
+        boolean isBucketFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
+        Map<ServerLocation, HashSet> serverToFilterMap = cms
+        .getServerToFilterMap(serverRegionExecutor.getFilter(), region,
+            function.optimizeForWrite(), isBucketFilter);
+        if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {         
+          ExecuteRegionFunctionOp.execute(this.pool, rgnName, function,
+              serverRegionExecutor, resultCollector, hasResult, retryAttempts);
+            cms.scheduleGetPRMetaData(region, false);          
+        }
+        else {
+          //Asif: In case of withBucketFilter , the serverToFilterMap is nothing but
+          // serverToBucketsMap, so allBuckets flag should be true in that case 
+          // so allBuckets flag is governed by isBucketFilter flag
+          ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
+              function, serverRegionExecutor, resultCollector, hasResult,
+              serverToFilterMap, retryAttempts, isBucketFilter);
+          
+         
+        }
+      }
+    }
+    else {
+      ExecuteRegionFunctionOp.execute(this.pool, rgnName, function,
+          serverRegionExecutor, resultCollector, hasResult, retryAttempts);
+    }
+  }
+
+
+  public void executeFunction(String rgnName, String functionId,
+      ServerRegionFunctionExecutor serverRegionExecutor,
+      ResultCollector resultCollector, byte hasResult, boolean isHA, boolean optimizeForWrite,
+      boolean replaying) {
+
+    recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(2), 
+          functionId, serverRegionExecutor, resultCollector, Byte.valueOf(hasResult),
+          Boolean.valueOf(isHA), Boolean.valueOf(optimizeForWrite));
+    
+    int retryAttempts = pool.getRetryAttempts();
+
+    if (this.pool.getPRSingleHopEnabled()) {
+      ClientMetadataService cms = this.region.getCache()
+      .getClientMetadataService();
+      if (serverRegionExecutor.getFilter().isEmpty()) {
+        HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = cms
+        .groupByServerToAllBuckets(this.region, optimizeForWrite);
+        if (serverToBuckets == null || serverToBuckets.isEmpty()) {
+          ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
+              serverRegionExecutor, resultCollector, hasResult, retryAttempts, isHA, optimizeForWrite);
+          cms.scheduleGetPRMetaData(this.region, false);
+        }
+        else {
+          ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
+              functionId, serverRegionExecutor, resultCollector, hasResult,
+              serverToBuckets, retryAttempts, true, isHA, optimizeForWrite);
+        }
+      }
+      else {
+        boolean isBucketsAsFilter = serverRegionExecutor.getExecuteOnBucketSetFlag();
+        Map<ServerLocation, HashSet> serverToFilterMap = cms
+        .getServerToFilterMap(serverRegionExecutor.getFilter(), region,
+            optimizeForWrite, isBucketsAsFilter);
+        if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {         
+          ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
+              serverRegionExecutor, resultCollector, hasResult, retryAttempts, isHA, optimizeForWrite);
+            cms.scheduleGetPRMetaData(region, false);          
+        }
+        else {
+          ExecuteRegionFunctionSingleHopOp.execute(this.pool, this.region,
+              functionId, serverRegionExecutor, resultCollector, hasResult,
+              serverToFilterMap, retryAttempts, false, isHA, optimizeForWrite);
+        }
+      }
+    }
+    else {
+      ExecuteRegionFunctionOp.execute(this.pool, rgnName, functionId,
+          serverRegionExecutor, resultCollector, hasResult, retryAttempts, isHA, optimizeForWrite);
+    }
+  }
+
+  
+  public void executeFunctionNoAck(String rgnName, Function function,
+      ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult, boolean replaying) {
+    recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(3),
+          function, serverRegionExecutor, Byte.valueOf(hasResult));
+    ExecuteRegionFunctionNoAckOp.execute(this.pool, rgnName, function,
+        serverRegionExecutor, hasResult);
+  }    
+  
+  public void executeFunctionNoAck(String rgnName, String functionId,
+      ServerRegionFunctionExecutor serverRegionExecutor, byte hasResult,
+      boolean isHA, boolean optimizeForWrite, boolean replaying) {
+    recordTXOperation(ServerRegionOperation.EXECUTE_FUNCTION, null, Integer.valueOf(4),
+          functionId, serverRegionExecutor, Byte.valueOf(hasResult));
+    ExecuteRegionFunctionNoAckOp.execute(this.pool, rgnName, functionId,
+        serverRegionExecutor, hasResult, isHA,  optimizeForWrite);
+  } 
+
+  public Entry getEntry(Object key) {
+    recordTXOperation(ServerRegionOperation.GET_ENTRY, key);
+    return (Entry) GetEntryOp.execute(pool, region, key);
+  }
+
+  
+  /**
+   * Transaction synchronization notification to the servers
+   * @see com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub#beforeCompletion()
+   */
+  public void beforeCompletion(int txId) {
+    TXSynchronizationOp.execute(pool, 0, txId, TXSynchronizationOp.CompletionType.BEFORE_COMPLETION);
+  }
+  
+  /**
+   * Transaction synchronization notification to the servers
+   * @param status
+   * @return the server's TXCommitMessage
+   * @see com.gemstone.gemfire.internal.cache.tx.ClientTXStateStub#afterCompletion(int)
+   */
+  public TXCommitMessage afterCompletion(int status, int txId) {
+    return TXSynchronizationOp.execute(pool, status, txId, TXSynchronizationOp.CompletionType.AFTER_COMPLETION);
+  }
+
+  public byte[] getFunctionAttributes(String functionId){
+    return (byte[])GetFunctionAttributeOp.execute(this.pool, functionId);
+  }
+  
+  /** test hook */
+  private void recordTXOperation(ServerRegionOperation op, Object key, Object... arguments) {
+    if (ClientTXStateStub.transactionRecordingEnabled()) {
+      TXStateProxy tx = TXManagerImpl.getCurrentTXState();
+      if (tx == null) {
+        return;
+      }
+      tx.recordTXOperation(this, op, key, arguments);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
new file mode 100644
index 0000000..ada3ab2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
@@ -0,0 +1,387 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.internal.GetAllOp.GetAllOpImpl;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class SingleHopClientExecutor {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  static final ExecutorService execService = Executors
+      .newCachedThreadPool(new ThreadFactory() {
+        AtomicInteger threadNum = new AtomicInteger();
+
+        public Thread newThread(final Runnable r) {
+          Thread result = new Thread(r, "Function Execution Thread-"
+              + threadNum.incrementAndGet());
+          result.setDaemon(true);
+          return result;
+        }
+      });
+
+  static void submitAll(List callableTasks) {
+    if (callableTasks != null && !callableTasks.isEmpty()) {
+      List futures = null;
+      try {
+        futures = execService.invokeAll(callableTasks);
+      }
+      catch (RejectedExecutionException rejectedExecutionEx) {
+        throw rejectedExecutionEx;
+      }
+      catch (InterruptedException e) {
+        throw new InternalGemFireException(e.getMessage());
+      }
+      if (futures != null) {
+        Iterator itr = futures.iterator();
+        while (itr.hasNext() && !execService.isShutdown()
+            && !execService.isTerminated()) {
+          Future fut = (Future)itr.next();
+          try {
+            fut.get();
+          }
+          catch (InterruptedException e) {
+            throw new InternalGemFireException(e.getMessage());
+          }
+          catch (ExecutionException ee) {
+            if (ee.getCause() instanceof FunctionException) {
+              throw (FunctionException)ee.getCause();
+            }
+            else if (ee.getCause() instanceof ServerOperationException) {
+              throw (ServerOperationException)ee.getCause();
+            }
+            else if (ee.getCause() instanceof ServerConnectivityException) {
+              throw (ServerConnectivityException)ee.getCause();
+            }
+            else {
+              throw executionThrowable(ee.getCause());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  static boolean submitAllHA(List callableTasks, LocalRegion region,
+      ResultCollector rc, Set<String> failedNodes) {
+
+    ClientMetadataService cms = region.getCache()
+        .getClientMetadataService();
+    boolean reexecute = false;
+
+    if (callableTasks != null && !callableTasks.isEmpty()) {
+      List futures = null;
+      try {
+        futures = execService.invokeAll(callableTasks);
+      }
+      catch (RejectedExecutionException rejectedExecutionEx) {
+        throw rejectedExecutionEx;
+      }
+      catch (InterruptedException e) {
+        throw new InternalGemFireException(e.getMessage());
+      }
+      if (futures != null) {
+        Iterator futureItr = futures.iterator();
+        Iterator taskItr = callableTasks.iterator();
+        final boolean isDebugEnabled = logger.isDebugEnabled();
+        while (futureItr.hasNext() && !execService.isShutdown()
+            && !execService.isTerminated()) {
+          Future fut = (Future)futureItr.next();
+          SingleHopOperationCallable task = (SingleHopOperationCallable)taskItr.next();
+          ServerLocation server = task.getServer();
+          try {
+            fut.get();
+            if (isDebugEnabled) {
+              logger.debug("ExecuteRegionFunctionSingleHopOp#got result from {}", server);
+            }
+          }
+          catch (InterruptedException e) {
+            throw new InternalGemFireException(e.getMessage());
+          }
+          catch (ExecutionException ee) {
+            if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
+              if (isDebugEnabled) {
+                logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.InternalFunctionInvocationTargetException : Caused by :{}", ee.getCause());
+              }
+              try {
+                cms = region.getCache().getClientMetadataService();
+              }
+              catch (CacheClosedException e) {
+                return false;
+              }
+              cms.scheduleGetPRMetaData(region, false);
+              cms.removeBucketServerLocation(server);
+              reexecute = true;
+              failedNodes.addAll(((InternalFunctionInvocationTargetException)ee
+                  .getCause()).getFailedNodeSet());
+              rc.clearResults();
+            }
+            else if (ee.getCause() instanceof FunctionException) {
+              if (isDebugEnabled) {
+                logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.FunctionException : Caused by :{}", ee.getCause());
+              }
+              throw (FunctionException)ee.getCause();
+            }
+            else if (ee.getCause() instanceof ServerOperationException) {
+              if (isDebugEnabled) {
+                logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerOperationException : Caused by :{}", ee.getCause());
+              }
+              throw (ServerOperationException)ee.getCause();
+            }
+            else if (ee.getCause() instanceof ServerConnectivityException) {
+              if (isDebugEnabled) {
+                logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerConnectivityException : Caused by :{} The failed server is: {}", ee.getCause(), server);
+              }
+              try {
+                cms = region.getCache().getClientMetadataService();
+              }
+              catch (CacheClosedException e) {
+                return false;
+              }
+              cms.removeBucketServerLocation(server);
+              cms.scheduleGetPRMetaData(region, false);
+              reexecute = true;
+              rc.clearResults();
+            }
+            else {
+              throw executionThrowable(ee.getCause());
+            }
+          }
+        }
+      }
+    }
+    return reexecute;
+  }
+  
+  /**
+   * execute bulk op (putAll or removeAll) on multiple PR servers, returning a map of the results.
+   * Results are either a VersionedObjectList or a BulkOpPartialResultsException
+   * @param callableTasks
+   * @param cms
+   * @param region
+   * @param failedServers
+   * @return the per-server results
+   */
+  static Map<ServerLocation, Object> submitBulkOp(List callableTasks, ClientMetadataService cms, 
+      LocalRegion region, Map<ServerLocation,RuntimeException> failedServers) {
+    if (callableTasks != null && !callableTasks.isEmpty()) {
+      Map<ServerLocation, Object> resultMap = new HashMap<ServerLocation, Object>();
+      boolean anyPartialResults = false;
+      List futures = null;
+      try {
+        futures = execService.invokeAll(callableTasks);
+      }
+      catch (RejectedExecutionException rejectedExecutionEx) {
+        throw rejectedExecutionEx;
+      }
+      catch (InterruptedException e) {
+        throw new InternalGemFireException(e.getMessage());
+      }
+      if (futures != null) {
+        Iterator futureItr = futures.iterator();
+        Iterator taskItr = callableTasks.iterator();
+        RuntimeException rte = null;
+        final boolean isDebugEnabled = logger.isDebugEnabled();
+        while (futureItr.hasNext() && !execService.isShutdown()
+            && !execService.isTerminated()) {
+          Future fut = (Future)futureItr.next();
+          SingleHopOperationCallable task = (SingleHopOperationCallable)taskItr
+              .next();
+          ServerLocation server = task.getServer();
+          try {
+            VersionedObjectList versions = (VersionedObjectList)fut.get();
+            if (logger.isDebugEnabled()) {
+              logger.debug("submitBulkOp#got result from {}:{}",
+                  server, versions);
+            }
+            resultMap.put(server, versions);
+          }
+          catch (InterruptedException e) {
+            InternalGemFireException ige = new InternalGemFireException(e);
+            // only to make this server as failed server, not to throw right now
+            failedServers.put(server,  ige);
+            if (rte == null) {
+              rte = ige;
+            }
+          }
+          catch (ExecutionException ee) {
+            if (ee.getCause() instanceof ServerOperationException) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("submitBulkOp#ExecutionException from server {}", server, ee);
+              }
+              ServerOperationException soe = (ServerOperationException)ee.getCause();
+              // only to make this server as failed server, not to throw right now
+              failedServers.put(server, soe);
+              if (rte == null) {
+                rte = soe;
+              }
+            }
+            else if (ee.getCause() instanceof ServerConnectivityException) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("submitBulkOp#ExecutionException for server {}", server, ee);
+              }
+              cms = region.getCache().getClientMetadataService();
+              cms.removeBucketServerLocation(server);
+              cms.scheduleGetPRMetaData(region, false);
+              failedServers.put(server, (ServerConnectivityException)ee.getCause());
+            }
+            else {
+              Throwable t = ee.getCause();
+              if (t instanceof PutAllPartialResultException) {
+                resultMap.put(server, t);
+                anyPartialResults = true;
+                failedServers.put(server, (PutAllPartialResultException)t);
+              } else {
+                RuntimeException other_rte = executionThrowable(ee.getCause());
+                failedServers.put(server, other_rte);
+                if (rte == null) {
+                  rte = other_rte;
+                }
+              }
+            }
+          } // catch
+        } // while
+        // if there are any partial results we suppress throwing an exception
+        // so the partial results can be processed
+        if (rte != null && !anyPartialResults) {
+          throw rte;
+        }
+      }
+      return resultMap;
+    }
+    return null;
+  }
+  
+  static Map<ServerLocation, Object> submitGetAll(
+      Map<ServerLocation, HashSet> serverToFilterMap, List callableTasks,
+      ClientMetadataService cms, LocalRegion region) {
+
+    if (callableTasks != null && !callableTasks.isEmpty()) {
+      Map<ServerLocation, Object> resultMap = new HashMap<ServerLocation, Object>();
+      List futures = null;
+      try {
+        futures = execService.invokeAll(callableTasks);
+      }
+      catch (RejectedExecutionException rejectedExecutionEx) {
+        throw rejectedExecutionEx;
+      }
+      catch (InterruptedException e) {
+        throw new InternalGemFireException(e.getMessage());
+      }
+      if (futures != null) {
+        Iterator futureItr = futures.iterator();
+        Iterator taskItr = callableTasks.iterator();
+        while (futureItr.hasNext() && !execService.isShutdown()
+            && !execService.isTerminated()) {
+          Future fut = (Future)futureItr.next();
+          SingleHopOperationCallable task = (SingleHopOperationCallable)taskItr
+              .next();
+          List keys = ((GetAllOpImpl)task.getOperation()).getKeyList();
+          ServerLocation server = task.getServer();
+          try {
+
+            VersionedObjectList valuesFromServer = (VersionedObjectList)fut.get();
+            valuesFromServer.setKeys(keys);
+
+            for (VersionedObjectList.Iterator it=valuesFromServer.iterator(); it.hasNext(); ) {
+              VersionedObjectList.Entry entry = it.next();
+              Object key = entry.getKey();
+              Object value = entry.getValue();
+              if (!entry.isKeyNotOnServer()) {
+                if (value instanceof Throwable) {
+                  logger.warn(LocalizedMessage.create(
+                    LocalizedStrings.GetAll_0_CAUGHT_THE_FOLLOWING_EXCEPTION_ATTEMPTING_TO_GET_VALUE_FOR_KEY_1,
+                    new Object[]{value, key}), (Throwable)value);
+                } 
+              }
+            }
+            if (logger.isDebugEnabled()) {
+              logger.debug("GetAllOp#got result from {}: {}", server, valuesFromServer);
+            }
+            resultMap.put(server, valuesFromServer);
+          }
+          catch (InterruptedException e) {
+            throw new InternalGemFireException(e.getMessage());
+          }
+          catch (ExecutionException ee) {
+            if (ee.getCause() instanceof ServerOperationException) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("GetAllOp#ExecutionException.ServerOperationException : Caused by :{}", ee.getCause());
+              }
+              throw (ServerOperationException)ee.getCause();
+            }
+            else if (ee.getCause() instanceof ServerConnectivityException) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("GetAllOp#ExecutionException.ServerConnectivityException : Caused by :{} The failed server is: {}", ee.getCause(), server);
+              }
+              try {
+                cms = region.getCache()
+                    .getClientMetadataService();
+              }
+              catch (CacheClosedException e) {
+                return null;
+              }
+              cms.removeBucketServerLocation(server);
+              cms.scheduleGetPRMetaData((LocalRegion)region, false);
+              resultMap.put(server, ee.getCause());
+            }
+            else {
+              throw executionThrowable(ee.getCause());
+            }
+          }
+        }
+        return resultMap;
+      }
+    }
+    return null;
+  }
+  
+  static void submitTask(Runnable task) {
+    execService.submit(task);
+  }
+
+  // Find out what exception to throw?
+  private static RuntimeException executionThrowable(Throwable t) {
+    if (t instanceof RuntimeException)
+      return (RuntimeException)t;
+    else if (t instanceof Error)
+      throw (Error)t;
+    else
+      throw new IllegalStateException("Don't know", t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
new file mode 100644
index 0000000..18de8b7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
@@ -0,0 +1,74 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.Callable;
+
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+/**
+ * 
+ * @author ymahajan
+ *
+ */
+public class SingleHopOperationCallable implements Callable {
+
+  final private ServerLocation server;
+
+  final private PoolImpl pool;
+
+  final private AbstractOp op;
+
+  final private UserAttributes securityAttributes;
+
+  public SingleHopOperationCallable(ServerLocation server, PoolImpl pool,
+      AbstractOp op, UserAttributes securityAttributes) {
+    this.server = server;
+    this.pool = pool;
+    this.op = op;
+    this.securityAttributes = securityAttributes;
+  }
+
+  public Object call() throws Exception {
+    op.initMessagePart();
+    Object result = null;
+    boolean onlyUseExistingCnx = ((pool.getMaxConnections() != -1 && pool
+        .getConnectionCount() >= pool.getMaxConnections()) ? true : false);
+    try {
+      UserAttributes.userAttributes.set(securityAttributes);
+      result = this.pool.executeOn(server, op, true, onlyUseExistingCnx);
+    }
+    catch (AllConnectionsInUseException ex) {
+      // if we reached connection limit and don't have available connection to
+      // that server,then execute function on one of the connections available
+      // from other servers instead of creating new connection to the original
+      // server
+      if (op instanceof ExecuteRegionFunctionSingleHopOpImpl){
+        ExecuteRegionFunctionSingleHopOpImpl newop = (ExecuteRegionFunctionSingleHopOpImpl)op;
+        result = this.pool.execute(new ExecuteRegionFunctionOpImpl(newop));
+      }else {
+        result = this.pool.execute(this.op);
+      }
+    }
+    finally {
+      UserAttributes.userAttributes.set(null);
+    }
+    return result;
+  }
+  
+  public ServerLocation getServer() {
+    return this.server;
+  }
+  
+  public AbstractOp getOperation() {
+    return this.op;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
new file mode 100644
index 0000000..2c446d8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SizeOp.java
@@ -0,0 +1,83 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a region size on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class SizeOp {
+  /**
+   * Does a region size on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the entry keySet on
+   */
+  public static Integer execute(InternalPool pool,
+                            String region)
+  {
+    AbstractOp op = new SizeOpImpl(region);
+    return (Integer)pool.execute(op);
+  }
+                                                               
+  private SizeOp() {
+    // no instances allowed
+  }
+  
+  private static class SizeOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public SizeOpImpl(String region) {
+      super(MessageType.SIZE, 1);
+      getMessage().addStringPart(region);
+    }
+
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {
+      
+      return processObjResponse(msg, "size");
+    }
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.SIZE_ERROR;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startSize();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endSizeSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endSize(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
new file mode 100644
index 0000000..417d32c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXFailoverOp.java
@@ -0,0 +1,84 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Indicates to the server that a transaction is
+ * failing over to this server. The server then
+ * performs the necessary bootstrapping for the tx.
+ * @author sbawaska
+ * @since 6.6
+ */
+public class TXFailoverOp {
+
+  public static void execute(ExecutablePool pool, int txId) {
+    pool.execute(new TXFailoverOpImpl(txId));
+  }
+  
+  private TXFailoverOp() {
+    // no instance
+  }
+  
+  private static class TXFailoverOpImpl extends AbstractOp {
+    int txId;
+
+    protected TXFailoverOpImpl(int txId) {
+      super(MessageType.TX_FAILOVER, 1);
+      getMessage().setTransactionId(txId);
+      this.txId = txId;
+    }
+    
+    @Override
+    public String toString() {
+      return "TXFailoverOp(txId="+this.txId+")";
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "txFailover");
+      return null;
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.EXCEPTION;
+    }
+
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startTxFailover();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endTxFailoverSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endTxFailover(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
new file mode 100644
index 0000000..092e6ce
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/TXSynchronizationOp.java
@@ -0,0 +1,154 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+/**
+ * TXSynchronizationOp sends JTA beforeCompletion and afterCompletion
+ * messages to the server pool.
+ * 
+ * @author bruce
+ *
+ */
+public class TXSynchronizationOp {
+
+  public static enum CompletionType {
+    BEFORE_COMPLETION, AFTER_COMPLETION
+  }
+
+  /**
+   * @param pool
+   * @param status - the status of an afterCompletion notification
+   * @param txId - the transaction identifier
+   * @param type - BEFORE_COMPLETION or AFTER_COMPLETION
+   * @return the server's commit message
+   */
+  public static TXCommitMessage execute(InternalPool pool, int status, int txId, CompletionType type) {
+    Impl impl = new Impl(status, txId, type);
+    pool.execute(impl);
+    return impl.tXCommitMessageResponse;
+  }
+  
+  static class Impl extends AbstractOp {
+
+    private int status;
+    private CompletionType type;
+    TXCommitMessage tXCommitMessageResponse;
+
+    /**
+     * @param status
+     * @param type
+     */
+    public Impl(int status, int txId, CompletionType type) {
+      super(MessageType.TX_SYNCHRONIZATION, (type==CompletionType.AFTER_COMPLETION)? 3 : 2);
+      this.status = status;
+      this.type = type;
+      getMessage().addIntPart(type.ordinal());
+      getMessage().addIntPart(txId);
+      if (type == CompletionType.AFTER_COMPLETION) {
+        getMessage().addIntPart(status);
+      }
+    }
+    
+    @Override
+    public String toString() {
+      return "TXSynchronization(threadTxId=" + TXManagerImpl.getCurrentTXUniqueId()
+      +"; "+this.type + "; status=" + this.status + ")";
+    }
+
+    @Override
+  protected void processAck(Message msg, String opName)
+    throws Exception
+  {
+    final int msgType = msg.getMessageType();
+    if (msgType == MessageType.REPLY) {
+      return;
+    } else {
+      Part part = msg.getPart(0);
+      if (msgType == MessageType.EXCEPTION) {
+        Throwable t = (Throwable) part.getObject();
+        if (t instanceof CommitConflictException ||
+            t instanceof SynchronizationCommitConflictException) {
+          throw (GemFireException)t;
+        }
+      }
+      super.processAck(msg, opName);
+    }
+  }
+
+    
+    /* (non-Javadoc)
+     * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#processResponse(com.gemstone.gemfire.internal.cache.tier.sockets.Message)
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      if (this.type == CompletionType.BEFORE_COMPLETION) {
+        try {
+          processAck(msg, type.toString());
+        } catch (ServerOperationException e) {
+          if (e.getCause() instanceof SynchronizationCommitConflictException) {
+            throw (SynchronizationCommitConflictException)e.getCause();
+          }
+        }
+        return null;
+      } else {
+        TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, this.type.toString());
+        this.tXCommitMessageResponse = rcs;
+        return rcs;
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see com.gemstone.gemfire.cache.client.internal.AbstractOp#isErrorResponse(int)
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.REQUESTDATAERROR;
+    }
+
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startTxSynchronization();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endTxSynchronizationSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endTxSynchronization(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestListOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestListOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestListOp.java
new file mode 100644
index 0000000..4c30f6c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestListOp.java
@@ -0,0 +1,91 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Does a region unregisterInterestList on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class UnregisterInterestListOp {
+  /**
+   * Does a region unregisterInterestList on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the unregisterInterestList on
+   * @param keys list of keys we are interested in
+   * @param isClosing true if this unregister is done by a close
+   * @param keepAlive true if this unregister should not undo a durable registration
+   */
+  public static void execute(ExecutablePool pool,
+                             String region,
+                             List keys,
+                             boolean isClosing,
+                             boolean keepAlive)
+  {
+    AbstractOp op = new UnregisterInterestListOpImpl(region, keys, isClosing, keepAlive);
+    pool.executeOnAllQueueServers(op);
+  }
+                                                               
+  private UnregisterInterestListOp() {
+    // no instances allowed
+  }
+  
+  private static class UnregisterInterestListOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public UnregisterInterestListOpImpl(String region,
+                                        List keys,
+                                        boolean isClosing,
+                                        boolean keepAlive) {
+      super(MessageType.UNREGISTER_INTEREST_LIST, 4+keys.size());
+      getMessage().addStringPart(region);
+      {
+        byte closingByte = (byte)(isClosing ? 0x01 : 0x00);
+        getMessage().addBytesPart(new byte[] {closingByte});
+      }
+      {
+        byte keepAliveByte = (byte)(keepAlive ? 0x01 : 0x00);
+        getMessage().addBytesPart(new byte[] {keepAliveByte});
+      }
+      getMessage().addIntPart(keys.size());
+      for (Iterator i = keys.iterator(); i.hasNext();) {
+        getMessage().addStringOrObjPart(i.next());
+      }
+    }
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "unregisterInterestList");
+      return null;
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.UNREGISTER_INTEREST_DATA_ERROR;
+    }
+    // using UnregisterInterest stats
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startUnregisterInterest();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endUnregisterInterestSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endUnregisterInterest(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestOp.java
new file mode 100644
index 0000000..1e89b13
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UnregisterInterestOp.java
@@ -0,0 +1,89 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+
+/**
+ * Does a region unregisterInterest on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class UnregisterInterestOp {
+  /**
+   * Does a region unregisterInterest on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the unregisterInterest on
+   * @param key describes what we are no longer interested in
+   * @param interestType the {@link InterestType} for this unregister
+   * @param isClosing true if this unregister is done by a close
+   * @param keepAlive true if this unregister should not undo a durable registration
+   */
+  public static void execute(ExecutablePool pool,
+                             String region,
+                             Object key,
+                             int interestType,
+                             boolean isClosing,
+                             boolean keepAlive)
+  {
+    AbstractOp op = new UnregisterInterestOpImpl(region, key, interestType, isClosing, keepAlive);
+    pool.executeOnAllQueueServers(op);
+  }
+                                                               
+  private UnregisterInterestOp() {
+    // no instances allowed
+  }
+  
+  private static class UnregisterInterestOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public UnregisterInterestOpImpl(String region,
+                                    Object key,
+                                    int interestType,
+                                    boolean isClosing,
+                                    boolean keepAlive) {
+      super(MessageType.UNREGISTER_INTEREST, 5);
+      getMessage().addStringPart(region);
+      getMessage().addIntPart(interestType);
+      getMessage().addStringOrObjPart(key);
+      {
+        byte closingByte = (byte)(isClosing ? 0x01 : 0x00);
+        getMessage().addBytesPart(new byte[] {closingByte});
+      }
+      {
+        byte keepAliveByte = (byte)(keepAlive ? 0x01 : 0x00);
+        getMessage().addBytesPart(new byte[] {keepAliveByte});
+      }
+    }
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "unregisterInterest");
+      return null;
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.UNREGISTER_INTEREST_DATA_ERROR;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startUnregisterInterest();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endUnregisterInterestSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endUnregisterInterest(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UserAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UserAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UserAttributes.java
new file mode 100755
index 0000000..5c959e9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/UserAttributes.java
@@ -0,0 +1,50 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+public class UserAttributes {
+
+  private Properties credentials;
+  // Update this whenever we lose/add a server.
+  private ConcurrentHashMap<ServerLocation, Long> serverToId = new ConcurrentHashMap<ServerLocation, Long>();
+
+  private Pool pool;
+
+  public static final ThreadLocal<UserAttributes> userAttributes = new ThreadLocal<UserAttributes>();
+
+  public UserAttributes(Properties credentials, Pool pool) {
+    this.credentials = credentials;
+    this.pool = pool;
+  }
+
+  public void setCredentials(Properties credentials) {
+    this.credentials = credentials;
+  }
+
+  public Properties getCredentials() {
+    return credentials;
+  }
+
+  public void setServerToId(ServerLocation server, Long uniqueId) {
+    serverToId.put(server, uniqueId);
+  }
+
+  public ConcurrentHashMap<ServerLocation, Long> getServerToId() {
+    return serverToId;
+  }
+
+  public Pool getPool() {
+    return pool;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.dia
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.dia b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.dia
new file mode 100644
index 0000000..39d8f3d
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.dia differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.png
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.png b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.png
new file mode 100644
index 0000000..773fabe
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/ConnectionManagerImpl.png differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/PoolImpl.dia
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/PoolImpl.dia b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/PoolImpl.dia
new file mode 100644
index 0000000..731aeb0
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/PoolImpl.dia differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.dia
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.dia b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.dia
new file mode 100644
index 0000000..8cc7d0e
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.dia differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.png
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.png b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.png
new file mode 100644
index 0000000..d48bbf6
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/QueueManagerImpl.png differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/client_static_diagram.png
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/client_static_diagram.png b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/client_static_diagram.png
new file mode 100644
index 0000000..afbeaa4
Binary files /dev/null and b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/doc-files/client_static_diagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/ClientConnectionRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/ClientConnectionRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/ClientConnectionRequest.java
new file mode 100644
index 0000000..6c6c04b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/ClientConnectionRequest.java
@@ -0,0 +1,59 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.locator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+
+/**
+ * A request from a client to the locator asking for a
+ * server to connect to for client to server traffic.
+ * @author dsmith
+ *
+ */
+public class ClientConnectionRequest extends ServerLocationRequest {
+  Set/*<ServerLocation>*/ excludedServers;
+  
+  public ClientConnectionRequest() {
+    
+  }
+
+  public ClientConnectionRequest(Set/*<ServerLocation>*/ excludedServers, String serverGroup) {
+    super(serverGroup);
+    this.excludedServers = excludedServers;
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.excludedServers = SerializationHelper.readServerLocationSet(in);
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    SerializationHelper.writeServerLocationSet(this.excludedServers, out);
+  }
+
+  public Set getExcludedServers() {
+    return excludedServers;
+  }
+  
+  @Override
+  public String toString() {
+    return "ClientConnectionRequest{group=" + getServerGroup() + ", excluded=" + getExcludedServers() + "}";
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.CLIENT_CONNECTION_REQUEST;
+  }
+}


Mime
View raw message