geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [10/51] [partial] incubator-geode git commit: Init
Date Tue, 28 Apr 2015 21:40:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExplicitConnectionSourceImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExplicitConnectionSourceImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExplicitConnectionSourceImpl.java
new file mode 100644
index 0000000..a1bdc16
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExplicitConnectionSourceImpl.java
@@ -0,0 +1,341 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException; 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.util.EndpointDoesNotExistException;
+import com.gemstone.gemfire.cache.util.EndpointExistsException;
+import com.gemstone.gemfire.cache.util.EndpointInUseException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * A connection source where the list of endpoints is specified explicitly. 
+ * @author dsmith
+ * @since 5.7
+ * 
+ * TODO - the UnusedServerMonitor basically will force the pool to
+ * have at least one connection to each server. Maybe we need to have it
+ * create connections that are outside the pool?
+ *
+ */
+public class ExplicitConnectionSourceImpl implements ConnectionSource {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private List serverList;
+  private int nextServerIndex = 0;
+  private int nextQueueIndex = 0;
+  private InternalPool pool;
+  
+  /**
+   * A debug flag, which can be toggled by tests to disable/enable shuffling of
+   * the endpoints list
+   */
+  private boolean DISABLE_SHUFFLING = Boolean
+      .getBoolean("gemfire.bridge.disableShufflingOfEndpoints");
+
+  /**
+   * Builds a connection source from an explicit list of server addresses.
+   * Every address is turned into a {@link ServerLocation}; the resulting list
+   * is handed to {@link #shuffle(List)} and then stored as an unmodifiable
+   * list.
+   *
+   * @param contacts the server addresses; each element is cast to
+   *        {@link InetSocketAddress}
+   */
+  public ExplicitConnectionSourceImpl(List/*<InetSocketAddress>*/contacts) {
+    ArrayList locations = new ArrayList(contacts.size());
+    for(Object contact : contacts) {
+      InetSocketAddress address = (InetSocketAddress)contact;
+      String hostName = address.getHostName();
+      locations.add(new ServerLocation(hostName, address.getPort()));
+    }
+    shuffle(locations);
+    this.serverList = Collections.unmodifiableList(locations);
+  }
+
+  /**
+   * Remembers the pool this source belongs to and records the number of
+   * configured servers as the pool's initial contact count.
+   *
+   * @param pool the pool that uses this connection source
+   */
+  public synchronized void start(InternalPool pool) {
+    this.pool = pool;
+    final int contactCount = this.serverList.size();
+    pool.getStats().setInitialContacts(contactCount);
+  }
+  
+  /**
+   * Stops this connection source. This implementation does nothing.
+   */
+  public void stop() {
+    //do nothing
+  }
+
+  /**
+   * Picks a server to replace <code>currentServer</code>.
+   *
+   * @param currentServer the server to move away from; it is never returned
+   * @param excludedServers further servers that must not be returned
+   * @return a server that is neither excluded nor the current one, or
+   *         <code>null</code> if {@link #findServer(Set)} finds none
+   */
+  public ServerLocation findReplacementServer(ServerLocation currentServer, Set/*<ServerLocation>*/ excludedServers) {
+    // We always look for a server other than currentServer and return it if
+    // one exists; otherwise the result is null.
+    // @todo grid: balancing information could be added to the explicit source
+    // so that clients try to keep the same number of connections to each
+    // server, but that would be a bit of work and it would also have to work
+    // for hardware load balancers.
+    Set serversToSkip = new HashSet(excludedServers);
+    serversToSkip.add(currentServer);
+    return findServer(serversToSkip);
+  }
+  
+  /**
+   * Returns the next server in round-robin order that is not excluded.
+   * The scan starts at the position remembered from the previous call and
+   * visits each configured server at most once.
+   *
+   * @param excludedServers servers that must not be returned
+   * @return the next eligible server, or <code>null</code> if the test flag
+   *         <code>PoolImpl.TEST_DURABLE_IS_NET_DOWN</code> is set, no servers
+   *         are configured, or every server is excluded
+   */
+  public synchronized ServerLocation findServer(Set excludedServers) {
+    if(PoolImpl.TEST_DURABLE_IS_NET_DOWN) {
+      return null;
+    }
+    final int serverCount = serverList.size();
+    if(serverCount == 0) {
+      // nothing to choose from; avoids IndexOutOfBoundsException on get(0)
+      return null;
+    }
+    if(nextServerIndex >= serverCount) {
+      // the list may have shrunk (removeEndpoint) since the cursor was stored
+      nextServerIndex = 0;
+    }
+    final int startIndex = nextServerIndex;
+    do {
+      ServerLocation candidate = (ServerLocation) serverList.get(nextServerIndex);
+      if(++nextServerIndex >= serverCount) {
+        nextServerIndex = 0;
+      }
+      if(!excludedServers.contains(candidate)) {
+        return candidate;
+      }
+    } while(nextServerIndex != startIndex);
+
+    return null;
+  }
+  
+  /**
+   * Finds servers that can host this client's subscription queue.
+   * <p>
+   * TODO - this algorithm could be cleaned up. Right now we have to
+   * connect to every server in the system to find where our durable
+   * queue lives.
+   *
+   * @param excludedServers servers that must not be returned
+   * @param numServers the number of servers wanted; <code>-1</code> is treated
+   *        as {@link Integer#MAX_VALUE}
+   * @param proxyId the client identity, consulted for durability
+   * @param findDurableQueue whether to look for an existing durable queue
+   * @return the selected servers; an empty list when the test flag
+   *         <code>PoolImpl.TEST_DURABLE_IS_NET_DOWN</code> is set
+   */
+  public synchronized List findServersForQueue(Set excludedServers,
+      int numServers, ClientProxyMembershipID proxyId, boolean findDurableQueue) {
+    if(PoolImpl.TEST_DURABLE_IS_NET_DOWN) {
+      return new ArrayList();
+    }
+    final int wanted = (numServers == -1) ? Integer.MAX_VALUE : numServers;
+    final boolean searchDurable = findDurableQueue && proxyId.isDurable();
+    if(!searchDurable) {
+      return pickQueueServers(excludedServers, wanted);
+    }
+    return findDurableQueues(excludedServers, wanted);
+  }
+  
+  /**
+   * Remove an endpoint from this connection source.
+   * <p>
+   * The removal is performed on a private copy of the server list, and the
+   * copy is only published when the call succeeds, so a failed call leaves
+   * the existing (unmodifiable) list untouched. After a successful removal
+   * the round-robin cursors are reset if they now point past the end of the
+   * shortened list.
+   *
+   * @param host the host name of the endpoint; it is normalized with
+   *        {@link #lookupHostName(String)} before comparing
+   * @param port the port of the endpoint
+   * @throws EndpointInUseException if the pool's endpoint manager still has
+   * an entry for the matching endpoint.
+   * @throws EndpointDoesNotExistException if the <code>Endpoint</code> to be
+   * removed doesn't exist.
+   * @throws IllegalArgumentException if <code>host</code> cannot be resolved
+   */
+  public synchronized void removeEndpoint(String host,int port) throws EndpointInUseException,EndpointDoesNotExistException {
+    host = lookupHostName(host);
+    List remaining = new ArrayList(serverList);
+    boolean found = false;
+    for(Iterator it = remaining.iterator(); it.hasNext(); ) {
+      ServerLocation loc = (ServerLocation)it.next();
+      if(loc.getPort() != port || !loc.getHostName().equalsIgnoreCase(host)) {
+        continue;
+      }
+      EndpointManager em = pool.getEndpointManager();
+      if(em.getEndpointMap().containsKey(loc)) {
+        throw new EndpointInUseException("Endpoint in use cannot be removed:"+loc);
+      }
+      it.remove();
+      found = true;
+    }
+    if(!found) {
+      throw new EndpointDoesNotExistException("endpointlist:"+serverList,host,port);
+    }
+    serverList = Collections.unmodifiableList(remaining);
+    // keep the cursors inside the (now shorter) list
+    if(nextServerIndex >= serverList.size()) {
+      nextServerIndex = 0;
+    }
+    if(nextQueueIndex >= serverList.size()) {
+      nextQueueIndex = 0;
+    }
+  }
+  
+  /**
+   * Add an endpoint to this connection source.
+   * 
+   * @param host
+   * @param port
+   * @throws EndpointExistsException if the <code>Endpoint</code> to be
+   * added already exists.
+   */
+  public synchronized void addEndpoint(String host,int port) throws EndpointExistsException {
+    Iterator it = serverList.iterator();
+    host = lookupHostName(host);
+    while(it.hasNext()) {
+      ServerLocation loc = (ServerLocation)it.next();
+      if(loc.getHostName().equalsIgnoreCase(host)) {
+        if(loc.getPort()==port) {
+          throw new EndpointExistsException("Endpoint already exists host="+host+" port="+port);
+        }
+      }
+    }
+    serverList = new ArrayList(serverList);
+    serverList.add(new ServerLocation(host,port));
+    serverList = Collections.unmodifiableList(serverList);
+  }
+ 
+  /**
+   * When we create an ExplicitConnectionSource, we convert a the hostname of an
+   * endpoint from a string to an InetAddress and back. This method duplicates
+   * that process for endpoints that are added or removed after the fact.
+   */
+  private String lookupHostName(String host) {
+    try {
+      InetAddress hostAddr = InetAddress.getByName(host);
+      host = hostAddr.getHostName();
+    } catch (UnknownHostException cause) {
+      IllegalArgumentException ex = new IllegalArgumentException("Unknown host " + host);
+      ex.initCause(cause);
+      throw ex;
+    }
+    return host;
+  } 
+
+  /**
+   * @return always <code>false</code> for this connection source
+   */
+  public boolean isBalanced() {
+    return false;
+  }
+  
+  private List pickQueueServers(Set excludedServers,
+      int numServers) {
+    
+    ArrayList result = new ArrayList();
+    ServerLocation nextQueue;
+    int startIndex = nextQueueIndex;
+    do {
+      nextQueue= (ServerLocation) serverList.get(nextQueueIndex);
+      if(++nextQueueIndex >= serverList.size()) {
+        nextQueueIndex = 0;
+      }
+      if(!excludedServers.contains(nextQueue)) {
+        result.add(nextQueue);
+      }
+    } while(nextQueueIndex != startIndex && result.size() < numServers);
+    
+    return result;
+  }
+
+  /**
+   * A "fake" operation that sends nothing of its own: it only reads the queue
+   * status already stored on the connection and reports
+   * {@link Boolean#TRUE} unless that status is non-redundant.
+   */
+  private static class HasQueueOp implements Op {
+    public static final HasQueueOp SINGLETON = new HasQueueOp();
+
+    public Object attempt(Connection cnx) throws Exception {
+      final boolean nonRedundant = cnx.getQueueStatus().isNonRedundant();
+      return Boolean.valueOf(!nonRedundant);
+    }
+
+    @Override
+    public boolean useThreadLocalConnection() {
+      return false;
+    }
+  }
+  
+  /**
+   * Asks every non-excluded server whether it already hosts a queue for this
+   * client and returns those servers first, topped up with servers that do
+   * not host one.
+   * <p>
+   * Servers that cannot be queried are skipped, except that a
+   * {@link GemFireSecurityException} (thrown directly or found as the cause
+   * of another exception) is rethrown to the caller.
+   *
+   * @param excludedServers servers that must not be contacted or returned
+   * @param numServers the number of servers wanted; servers without a queue
+   *        are only added while the result is smaller than this
+   * @return the servers that reported a queue, followed by as many other
+   *         servers as are needed to reach <code>numServers</code>
+   */
+  private List findDurableQueues(Set excludedServers,
+      int numServers) {
+    ArrayList durableServers = new ArrayList();
+    ArrayList otherServers = new ArrayList();
+
+    logger.debug("ExplicitConnectionSource - looking for durable queue");
+
+    for(Iterator itr = serverList.iterator(); itr.hasNext(); ) {
+      ServerLocation server = (ServerLocation) itr.next();
+      if(excludedServers.contains(server)) {
+        continue;
+      }
+
+      //the pool will automatically create a connection to this server
+      //and store it for future use.
+      Boolean hasQueue;
+      try {
+        hasQueue = (Boolean) pool.executeOn(server, HasQueueOp.SINGLETON);
+      } catch(GemFireSecurityException e) {
+        throw e;
+      } catch(Exception e) {
+        if(e.getCause() instanceof GemFireSecurityException) {
+          throw (GemFireSecurityException)e.getCause();
+        }
+        // best effort: a server we cannot reach is simply not a candidate
+        if(logger.isDebugEnabled()) {
+          logger.debug("Unable to check for durable queue on server {}: {}", server, e);
+        }
+        continue;
+      }
+      if(hasQueue != null) {
+        if(hasQueue.booleanValue()) {
+          if(logger.isDebugEnabled()) {
+            logger.debug("Durable queue found on {}", server);
+          }
+          durableServers.add(server);
+        } else {
+          if(logger.isDebugEnabled()) {
+            logger.debug("Durable queue was not found on {}", server);
+          }
+          otherServers.add(server);
+        }
+      }
+    }
+
+    // how many servers without a queue are still needed (capped by what we have)
+    int remainingServers = numServers - durableServers.size();
+    if(remainingServers > otherServers.size()) {
+      remainingServers = otherServers.size();
+    }
+    //note, we're always preferring the servers in the beginning of the list
+    //but that's ok because we already shuffled the list in our constructor.
+    if(remainingServers > 0) {
+      durableServers.addAll(otherServers.subList(0, remainingServers));
+      nextQueueIndex = remainingServers % serverList.size();
+    }
+
+    if(logger.isDebugEnabled()) {
+      logger.debug("found {} servers out of {}", durableServers.size(), numServers);
+    }
+
+    return durableServers;
+  }
+  
+  /**
+   * Randomizes the order of the given list in place, unless it has fewer than
+   * two elements or shuffling has been disabled through
+   * <code>DISABLE_SHUFFLING</code>.
+   *
+   * @param endpoints the list to shuffle
+   */
+  private void shuffle(List endpoints)
+  {
+    // This check was copied from ConnectionProxyImpl. The note there says that
+    // shuffling an ArrayList of size 1 is not safe and reports:
+    //   java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
+    //     at java.util.ArrayList.RangeCheck(Unknown Source)
+    //     at java.util.ArrayList.get(Unknown Source)
+    //     at java.util.Collections.swap(Unknown Source)
+    //     at java.util.Collections.shuffle(Unknown Source)
+    if (endpoints.size() >= 2 && !DISABLE_SHUFFLING) {
+      Collections.shuffle(endpoints);
+    }
+  }
+  
+  /**
+   * Renders the configured servers as
+   * <code>EndPoints[host:port,host:port,...]</code>. The server list is read
+   * while holding this object's monitor.
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("EndPoints[");
+    synchronized(this) {
+      String separator = "";
+      for(Object entry : serverList) {
+        ServerLocation loc = (ServerLocation)entry;
+        text.append(separator)
+            .append(loc.getHostName())
+            .append(':')
+            .append(loc.getPort());
+        separator = ",";
+      }
+    }
+    return text.append("]").toString();
+  }
+  
+  /**
+   * @return a new, independent list holding the currently configured servers
+   */
+  ArrayList<ServerLocation> getAllServers() {
+    ArrayList<ServerLocation> snapshot = new ArrayList<ServerLocation>();
+    snapshot.addAll(serverList);
+    return snapshot;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetAllOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetAllOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetAllOp.java
new file mode 100644
index 0000000..14c55e5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetAllOp.java
@@ -0,0 +1,233 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region getAll on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class GetAllOp {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Does a region getAll on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the getAll on
+   * @param keys list of keys to get
+   * @param callback callback argument sent with the request; may be
+   *        <code>null</code>
+   * @return the values found by the getAll, with <code>keys</code> set on the
+   *         returned list
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+                                               String region,
+                                               List keys,
+                                               Object callback)
+  {
+    final AbstractOp getAll = new GetAllOpImpl(region, keys, callback);
+    getAll.initMessagePart();
+    final VersionedObjectList values = (VersionedObjectList)pool.execute(getAll);
+    return values.setKeys(keys);
+  }
+  
+  /**
+   * Does a region getAll, sending the keys directly to the servers named by
+   * the client metadata service when such a mapping is available.
+   * <p>
+   * Without a server-to-keys mapping the request is executed as one
+   * operation on the pool. Otherwise one task per server is submitted and the
+   * per-server results are merged. Keys whose server reported a
+   * {@link ServerConnectivityException} are retried once through
+   * {@link #execute(ExecutablePool, String, List, Object)}, unless
+   * <code>retryAttempts</code> is zero, in which case the last such exception
+   * is thrown.
+   *
+   * @param pool the pool to use to communicate with the servers
+   * @param region the region to do the getAll on
+   * @param keys list of keys to get
+   * @param retryAttempts zero disables the retry of failed keys
+   * @param callback callback argument sent with the request; may be
+   *        <code>null</code>
+   * @return the merged values; <code>null</code> if no server produced a
+   *         result and nothing was retried
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+      Region region, List keys, int retryAttempts, Object callback) {
+    final String regionPath = region.getFullPath();
+    AbstractOp op = new GetAllOpImpl(regionPath, keys, callback);
+    ClientMetadataService cms = ((LocalRegion)region).getCache()
+        .getClientMetadataService();
+    Map<ServerLocation, HashSet> serverToFilterMap = cms.getServerToFilterMap(
+        keys, region, true);
+
+    // no single-hop information: fall back to one request through the pool
+    if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
+      op.initMessagePart();
+      return ((VersionedObjectList)pool.execute(op)).setKeys(keys);
+    }
+
+    List callableTasks = constructGetAllTasks(regionPath,
+        serverToFilterMap, (PoolImpl)pool, callback);
+    Map<ServerLocation, Object> results = SingleHopClientExecutor.submitGetAll(
+        serverToFilterMap, callableTasks, cms, (LocalRegion)region);
+
+    VersionedObjectList merged = null;
+    ServerConnectivityException lastFailure = null;
+    List keysToRetry = new ArrayList();
+    for (Map.Entry<ServerLocation, Object> entry : results.entrySet()) {
+      Object outcome = entry.getValue();
+      if (outcome instanceof ServerConnectivityException) {
+        lastFailure = (ServerConnectivityException)outcome;
+        keysToRetry.addAll(serverToFilterMap.get(entry.getKey()));
+        continue;
+      }
+      VersionedObjectList partial = (VersionedObjectList)outcome;
+      if (merged == null) {
+        merged = partial;
+      } else {
+        merged.addAll(partial);
+      }
+    }
+
+    if (lastFailure == null) {
+      return merged;
+    }
+    if (retryAttempts == 0) {
+      throw lastFailure;
+    }
+    VersionedObjectList retried = GetAllOp.execute(pool,
+        regionPath, keysToRetry, callback);
+    if (merged == null) {
+      return retried;
+    }
+    merged.addAll(retried);
+    return merged;
+  }
+  
+  /**
+   * Private constructor: this class only offers static methods.
+   */
+  private GetAllOp() {
+    // no instances allowed
+  }
+  
+  /**
+   * Creates one getAll task per server in the given mapping. Each task
+   * carries its own {@link GetAllOpImpl} for the keys mapped to that server.
+   *
+   * @param region the full path of the region to do the getAll on
+   * @param serverToFilterMap the keys to request, grouped by server
+   * @param pool the pool the tasks will run against
+   * @param callback callback argument sent with each request; may be
+   *        <code>null</code>
+   * @return the list of {@link SingleHopOperationCallable} tasks
+   */
+  static List constructGetAllTasks(String region,
+      final Map<ServerLocation, HashSet> serverToFilterMap, final PoolImpl pool, final Object callback) {
+    final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+    ArrayList<ServerLocation> targets = new ArrayList<ServerLocation>(
+        serverToFilterMap.keySet());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Constructing tasks for the servers {}", targets);
+    }
+    for (ServerLocation target : targets) {
+      Set keysForTarget = serverToFilterMap.get(target);
+      AbstractOp getAllOp = new GetAllOpImpl(region, new ArrayList(keysForTarget), callback);
+      ServerLocation destination = new ServerLocation(target.getHostName(), target.getPort());
+      tasks.add(new SingleHopOperationCallable(
+          destination, pool, getAllOp, UserAttributes.userAttributes.get()));
+    }
+    return tasks;
+  }
+  
+  /**
+   * The getAll request message. Its parts are, in order: the region name
+   * (added by the constructor), the array of keys, and then either the
+   * callback object or the int <code>0</code> (both added by
+   * {@link #initMessagePart()}).
+   */
+  static class GetAllOpImpl extends AbstractOp {
+    
+    // keys to request from the server
+    private List keyList;
+    // optional callback argument; null selects the GET_ALL_70 message type
+    private final Object callback;
+    
+    /**
+     * Creates the request and adds the region name as its first part. The
+     * message type is GET_ALL_WITH_CALLBACK when a callback is given and
+     * GET_ALL_70 otherwise.
+     *
+     * @param region the name of the region to do the getAll on
+     * @param keys list of keys to get
+     * @param callback callback argument; may be <code>null</code>
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetAllOpImpl(String region,
+                        List keys,
+                        Object callback)
+    {
+      super(callback != null ? MessageType.GET_ALL_WITH_CALLBACK : MessageType.GET_ALL_70, 3);
+      this.keyList = keys;
+      this.callback = callback;
+      getMessage().addStringPart(region);
+    }
+        
+    /**
+     * Adds the remaining message parts: the keys as an object array, followed
+     * by the callback object or, when there is no callback, the int 0.
+     */
+    @Override
+    protected void initMessagePart() {
+      Object[] keysArray = new Object[this.keyList.size()];
+      this.keyList.toArray(keysArray);
+      getMessage().addObjPart(keysArray);
+      if (this.callback != null) {
+        getMessage().addObjPart(this.callback);
+      } else {
+        // using the old GET_ALL_70 command that expects an int saying we are not register interest
+        getMessage().addIntPart(0);
+      }
+    }
+    
+    /**
+     * @return the list of keys this operation requests
+     */
+    public List getKeyList() {
+      return this.keyList;
+    }
+
+
+    /**
+     * @return a new {@link ChunkedMessage} to receive the response
+     */
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT);
+    }
+    
+    /**
+     * Not supported; this operation processes its response through
+     * {@link #processResponse(Message, Connection)}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads the chunked response and merges every chunk into one
+     * {@link VersionedObjectList}.
+     * <p>
+     * A chunk that carries a <code>Throwable</code>, or that fails while being
+     * read, is recorded as a {@link ServerOperationException}; the most
+     * recently recorded one is thrown after all chunks were handled.
+     *
+     * @param msg the response message, cast to {@link ChunkedMessage}
+     * @param con the connection the response arrived on; its endpoint's
+     *        member id is passed to <code>replaceNullIDs</code> on each chunk
+     * @return the merged result
+     */
+    @Override  
+    protected Object processResponse(Message msg, final Connection con) throws Exception {
+      final VersionedObjectList result = new VersionedObjectList(false);
+      // single-element array so the anonymous handler can record a failure
+      final Exception[] exceptionRef = new Exception[1];
+      processChunkedResponse((ChunkedMessage)msg,
+                             "getAll",
+                             new ChunkHandler() {
+                               public void handle(ChunkedMessage cm) throws Exception {
+                                 Part part = cm.getPart(0);
+                                 try {
+                                   Object o = part.getObject();
+                                   if (o instanceof Throwable) {
+                                     String s = "While performing a remote getAll";
+                                     exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+                                   } else {
+                                     VersionedObjectList chunk = (VersionedObjectList)o;
+                                     chunk.replaceNullIDs(con.getEndpoint().getMemberId());
+                                     result.addAll(chunk);
+                                   }
+                                 } catch(Exception e) {
+                                   exceptionRef[0] = new ServerOperationException("Unable to deserialize value" , e);
+                                 }
+                               }
+                             });
+      if (exceptionRef[0] != null) {
+        throw exceptionRef[0];
+      } else {
+        return result;
+      }
+    }
+
+    /**
+     * @return true only for the GET_ALL_DATA_ERROR message type
+     */
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.GET_ALL_DATA_ERROR;
+    }
+
+    // The three methods below forward to the getAll entries of ConnectionStats.
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGetAll();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGetAllSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGetAll(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
new file mode 100755
index 0000000..b334ea0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
@@ -0,0 +1,162 @@
+/*
+ * ========================================================================= 
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Retrieves {@link ClientPartitionAdvisor} for the specified PartitionedRegion from
+ * one of the servers
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 6.5
+ */
+public class GetClientPRMetaDataOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Private constructor: this class only offers static methods.
+   */
+  private GetClientPRMetaDataOp() {
+    // no instances allowed
+  }
+
+  /**
+   * Sends a GetClientPRMetaDataOp request for the given region through the
+   * pool. The response is applied to the partition advisors obtained from
+   * <code>cms</code> while it is processed; nothing is returned.
+   *
+   * @param pool the pool to use to communicate with the server
+   * @param regionFullPath the full path of the partitioned region
+   * @param cms the metadata service that receives the bucket locations
+   */
+  public static void execute(ExecutablePool pool, String regionFullPath,
+      ClientMetadataService cms) {
+    final AbstractOp metadataOp = new GetClientPRMetaDataOpImpl(regionFullPath, cms);
+    final boolean debugEnabled = logger.isDebugEnabled();
+    if (debugEnabled) {
+      logger.debug("GetClientPRMetaDataOp#execute : Sending GetClientPRMetaDataOp Message: {} to server using pool: {}", metadataOp.getMessage(), pool);
+    }
+    pool.execute(metadataOp);
+  }
+
+  /**
+   * The GET_CLIENT_PR_METADATA request message. Its only part is the full
+   * path of the region. The response is applied to the partition advisors
+   * obtained from the {@link ClientMetadataService} given to the constructor.
+   */
+  static class GetClientPRMetaDataOpImpl extends AbstractOp {
+
+    // full path of the region whose metadata is requested
+    String regionFullPath = null;
+
+    // metadata service that supplies the advisors updated by the response
+    ClientMetadataService cms = null;
+
+    /**
+     * @param regionFullPath the full path of the partitioned region
+     * @param cms the metadata service to update from the response
+     */
+    public GetClientPRMetaDataOpImpl(String regionFullPath, ClientMetadataService cms) {
+      super(MessageType.GET_CLIENT_PR_METADATA, 1);
+      this.regionFullPath = regionFullPath;
+      this.cms = cms;
+      getMessage().addStringPart(regionFullPath);
+    }
+
+    /**
+     * Intentionally empty: this operation does nothing with secure bytes.
+     */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Sends the message after masking its early-ack byte.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // NOTE(review): the early-ack byte is AND-ed with MESSAGE_HAS_SECURE_PART,
+      // which keeps only that bit - confirm this is the intended mask.
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    /**
+     * Handles the server's reply.
+     * <ul>
+     * <li>GET_CLIENT_PR_METADATA_ERROR: throws a
+     * {@link ServerOperationException} carrying the server's message.</li>
+     * <li>RESPONSE_CLIENT_PR_METADATA: every part is read as a list of
+     * {@link BucketServerLocation66}; for each non-empty list the bucket id
+     * of its first element is used to update the region's advisor and the
+     * advisors of its colocated regions.</li>
+     * <li>EXCEPTION: throws a {@link ServerOperationException} wrapping the
+     * received <code>Throwable</code>.</li>
+     * <li>anything else: throws {@link InternalGemFireError}.</li>
+     * </ul>
+     *
+     * @param msg the response message
+     * @return always <code>null</code> when the metadata was applied
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      switch (msg.getMessageType()) {
+        case MessageType.GET_CLIENT_PR_METADATA_ERROR:
+          String errorMsg = msg.getPart(0).getString();
+          if (logger.isDebugEnabled()) {
+            logger.debug(errorMsg);
+          }
+          throw new ServerOperationException(errorMsg);
+        case MessageType.RESPONSE_CLIENT_PR_METADATA:
+          final boolean isDebugEnabled = logger.isDebugEnabled();
+          if (isDebugEnabled) {
+            // the message type is passed as the argument for the {} placeholder
+            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type : {}", MessageType.getString(msg.getMessageType()));
+          }
+          int numParts = msg.getNumberOfParts();
+          ClientPartitionAdvisor advisor = cms
+              .getClientPartitionAdvisor(regionFullPath);
+          for (int i = 0; i < numParts; i++) {
+            Object result = msg.getPart(i).getObject();
+            List<BucketServerLocation66> locations = (List<BucketServerLocation66>)result;
+            if (!locations.isEmpty()) {
+              int bucketId = locations.get(0).getBucketId();
+              if (isDebugEnabled) {
+                logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations);
+              }
+              advisor.updateBucketServerLocations(bucketId, locations, cms);
+
+              Set<ClientPartitionAdvisor> cpas = cms
+                  .getColocatedClientPartitionAdvisor(regionFullPath);
+              if (cpas != null && !cpas.isEmpty()) {
+                for (ClientPartitionAdvisor colCPA : cpas) {
+                  colCPA.updateBucketServerLocations(bucketId, locations, cms);
+                }
+              }
+            }
+          }
+          if (isDebugEnabled) {
+            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received ClientPRMetadata from server successfully.");
+          }
+          return null;
+        case MessageType.EXCEPTION:
+          if (logger.isDebugEnabled()) {
+            logger.debug("GetClientPRMetaDataOpImpl#processResponse: received message of type EXCEPTION");
+          }
+          Part part = msg.getPart(0);
+          Object obj = part.getObject();
+          String s = "While performing  GetClientPRMetaDataOp "
+              + ((Throwable)obj).getMessage();
+          throw new ServerOperationException(s, (Throwable)obj);
+        default:
+          throw new InternalGemFireError(
+              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+                  .toLocalizedString(Integer.valueOf(msg.getMessageType())));
+      }
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGetClientPRMetadata();
+    }
+
+    /**
+     * @return the name of this operation
+     */
+    protected String getOpName() {
+      return "GetClientPRMetaDataOp";
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGetClientPRMetadataSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGetClientPRMetadata(start, hasTimedOut(), hasFailed());
+    }
+
+    /**
+     * @return always false; error replies are handled in
+     *         {@link #processResponse(Message)}
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
new file mode 100755
index 0000000..7e96f4d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPartitionAttributesOp.java
@@ -0,0 +1,169 @@
+/*
+ * ========================================================================= 
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Client-side operation that asks a server for the partitioning details of a
+ * PartitionedRegion and wraps the answer in a {@link ClientPartitionAdvisor}.
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 6.5
+ */
+public class GetClientPartitionAttributesOp {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private GetClientPartitionAttributesOp() {
+    // static entry point only; never instantiated
+  }
+
+  /**
+   * Runs the operation on the given pool.
+   * 
+   * @param pool the pool used to execute the op; it is cast to {@link PoolImpl}
+   *        when a non-null advisor comes back
+   * @param regionFullPath full path of the region whose attributes are requested
+   * @return the advisor obtained from the pool, with its server group set from
+   *         the pool, or <code>null</code> when the pool returns no advisor
+   */
+  @SuppressWarnings("unchecked")
+  public static ClientPartitionAdvisor execute(ExecutablePool pool, String regionFullPath) {
+    final AbstractOp request = new GetClientPartitionAttributesOpImpl(regionFullPath);
+    if (logger.isDebugEnabled()) {
+      logger.debug("GetClientPartitionAttributesOp#execute : Sending GetClientPartitionAttributesOp Message: {} for region: {} to server using pool: {}", request.getMessage(), regionFullPath, pool);
+    }
+
+    final ClientPartitionAdvisor result = (ClientPartitionAdvisor)pool.execute(request);
+    if (result == null) {
+      return null;
+    }
+    result.setServerGroup(((PoolImpl)pool).getServerGroup());
+    return result;
+  }
+
+  /**
+   * The op implementation: a single-part request carrying the region path.
+   */
+  static class GetClientPartitionAttributesOpImpl extends AbstractOp {
+
+    String regionFullPath = null;
+
+    /**
+     * Builds the request message with the region path as its only part.
+     * 
+     * @param regionFullPath full path of the region to ask about
+     */
+    public GetClientPartitionAttributesOpImpl(String regionFullPath) {
+      super(MessageType.GET_CLIENT_PARTITION_ATTRIBUTES, 1);
+      this.regionFullPath = regionFullPath;
+      getMessage().addStringPart(regionFullPath);
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally does nothing for this op
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Masks the early-ack byte with <code>Message.MESSAGE_HAS_SECURE_PART</code>
+     * and then sends the message with <code>send(false)</code>.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message outgoing = getMessage();
+      final byte masked = (byte)(outgoing.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      outgoing.setEarlyAck(masked);
+      getMessage().send(false);
+    }
+
+    /**
+     * Dispatches on the reply's message type.
+     * 
+     * @param msg the reply received from the server
+     * @return a {@link ClientPartitionAdvisor}, or <code>null</code> (see
+     *         {@link #readAdvisor(Message)})
+     * @throws ServerOperationException for an error reply or an EXCEPTION reply
+     * @throws InternalGemFireError for any other message type
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final int type = msg.getMessageType();
+      switch (type) {
+        case MessageType.GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: {
+          final String serverError = msg.getPart(0).getString();
+          if (logger.isDebugEnabled()) {
+            logger.debug(serverError);
+          }
+          throw new ServerOperationException(serverError);
+        }
+
+        case MessageType.RESPONSE_CLIENT_PARTITION_ATTRIBUTES:
+          return readAdvisor(msg);
+
+        case MessageType.EXCEPTION: {
+          if (logger.isDebugEnabled()) {
+            logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type EXCEPTION");
+          }
+          final Throwable cause = (Throwable)msg.getPart(0).getObject();
+          throw new ServerOperationException(
+              "While performing  GetClientPartitionAttributesOp " + cause.getMessage(), cause);
+        }
+
+        default:
+          throw new InternalGemFireError(
+              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+                  .toLocalizedString(Integer.valueOf(type)));
+      }
+    }
+
+    /**
+     * Decodes a RESPONSE_CLIENT_PARTITION_ATTRIBUTES reply. Part 0 is the
+     * bucket count and part 1 the colocated-with value. With four parts,
+     * part 2 is the partition resolver name and part 3 the fixed partition
+     * attributes; with three parts, part 2 is one or the other depending on
+     * whether it is a String.
+     * 
+     * @return the advisor, or <code>null</code> when the reply has neither
+     *         three nor four parts and the bucket count is -1
+     */
+    @SuppressWarnings("unchecked")
+    private ClientPartitionAdvisor readAdvisor(Message msg) throws Exception {
+      final boolean debug = logger.isDebugEnabled();
+      if (debug) {
+        logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received message of type : {}", MessageType.getString(msg.getMessageType()));
+      }
+
+      final int bucketCount = (Integer)msg.getPart(0).getObject();
+      final String colocatedWith = (String)msg.getPart(1).getObject();
+      String resolverName = null;
+      Set<FixedPartitionAttributes> fixedAttributes = null;
+
+      final int partCount = msg.getNumberOfParts();
+      if (partCount == 4) {
+        resolverName = (String)msg.getPart(2).getObject();
+        fixedAttributes = (Set<FixedPartitionAttributes>)msg.getPart(3).getObject();
+      }
+      else if (partCount == 3) {
+        final Object third = msg.getPart(2).getObject();
+        if (third instanceof String) {
+          resolverName = (String)third;
+        }
+        else {
+          fixedAttributes = (Set<FixedPartitionAttributes>)third;
+        }
+      }
+      else if (bucketCount == -1) {
+        return null;
+      }
+
+      if (debug) {
+        logger.debug("GetClientPartitionAttributesOpImpl#processResponse: received all the results from server successfully.");
+      }
+      return new ClientPartitionAdvisor(bucketCount, colocatedWith,
+          resolverName, fixedAttributes);
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      final long begin = stats.startGetClientPartitionAttributes();
+      return begin;
+    }
+
+    protected String getOpName() {
+      return "GetClientPartitionAttributesOp";
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetClientPartitionAttributesSend(start, failed);
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      final boolean timedOut = hasTimedOut();
+      final boolean failed = hasFailed();
+      stats.endGetClientPartitionAttributes(start, timedOut, failed);
+    }
+
+    /** No message type is reported as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEntryOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEntryOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEntryOp.java
new file mode 100644
index 0000000..9b6cd09
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEntryOp.java
@@ -0,0 +1,73 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Performs a <code>getEntry</code> against a server.
+ * @author sbawaska
+ */
+public class GetEntryOp {
+
+  /**
+   * Does a region.getEntry on the server using the given pool
+   * @param pool the pool that executes the op
+   * @param region the region whose full path is sent to the server; it is also
+   *        attached to the snapshot that comes back
+   * @param key the key to look up
+   * @return the result of executing the op on the pool; the op itself yields
+   *         an {@link EntrySnapshot} for the given key, or <code>null</code>
+   */
+  public static Object execute(ExecutablePool pool, LocalRegion region,
+      Object key) {
+    return pool.execute(new GetEntryOpImpl(region, key));
+  }
+
+  /**
+   * Two-part request: region full path followed by the key.
+   */
+  static class GetEntryOpImpl extends AbstractOp {
+
+    private LocalRegion region;
+    private Object key;
+
+    public GetEntryOpImpl(LocalRegion region, Object key) {
+      super(MessageType.GET_ENTRY, 2);
+      this.region = region;
+      this.key = key;
+      final Message request = getMessage();
+      request.addStringPart(region.getFullPath());
+      getMessage().addStringOrObjPart(key);
+    }
+
+    /**
+     * Reads the object response as an {@link EntrySnapshot} and, when one is
+     * present, points its <code>region</code> field at this op's region.
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final EntrySnapshot snapshot = (EntrySnapshot) processObjResponse(msg, "getEntry");
+      if (snapshot == null) {
+        return null;
+      }
+      snapshot.region = region;
+      return snapshot;
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.REQUESTDATAERROR == msgType;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      final long begin = stats.startGetEntry();
+      return begin;
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetEntrySend(start, failed);
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      final boolean timedOut = hasTimedOut();
+      final boolean failed = hasFailed();
+      stats.endGetEntry(start, timedOut, failed);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
new file mode 100755
index 0000000..e1ebe57
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetEventValueOp.java
@@ -0,0 +1,113 @@
+/*
+ * ========================================================================= 
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+/**
+ * Gets (full) value (unlike GetOp, which may get either a full value or a delta
+ * depending upon delta flag) of a given event from the ha container on server.
+ * 
+ * @since 6.1
+ */
+public class GetEventValueOp {
+  /**
+   * Does a get on the primary server using connections from the given pool
+   * @param pool the pool to use to communicate with the server.
+   * @param event the eventid to do the get on
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   * @return the result of executing the op on the primary; the op itself
+   *         yields the first {@link Part} of a RESPONSE message, or
+   *         <code>null</code> when the server reports the value was not found
+   */
+  public static Object executeOnPrimary(ExecutablePool pool, EventID event,
+      Object callbackArg) {
+    AbstractOp op = new GetEventValueOpImpl(event, callbackArg);
+    return pool.executeOnPrimary(op);
+  }
+
+
+  private GetEventValueOp() {
+    // no instances allowed
+  }
+
+  /**
+   * Request carrying the event id and, when non-null, the callback argument.
+   */
+  static class GetEventValueOpImpl extends AbstractOp {
+    /**
+     * @param event the event id, sent as the first object part
+     * @param callbackArg sent as a second object part only when non-null
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetEventValueOpImpl(EventID event, Object callbackArg) {
+      super(MessageType.REQUEST_EVENT_VALUE, callbackArg != null ? 2 : 1);
+      getMessage().addObjPart(event);
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally does nothing for this op
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Masks the early-ack byte with <code>Message.MESSAGE_HAS_SECURE_PART</code>
+     * and then sends the message with <code>send(false)</code>.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    /**
+     * Interprets the reply by message type.
+     * 
+     * @param msg the reply received from the server
+     * @return part 0 itself for a RESPONSE, or <code>null</code> for
+     *         REQUEST_EVENT_VALUE_ERROR
+     * @throws ServerOperationException for an EXCEPTION reply (wrapping the
+     *         throwable carried in part 0) or an error response (using part 0
+     *         as the message)
+     * @throws InternalGemFireError for any other message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      Part part = msg.getPart(0);
+      final int msgType = msg.getMessageType();
+      if (msgType == MessageType.RESPONSE) {
+        return part;
+      }
+      if (msgType == MessageType.REQUEST_EVENT_VALUE_ERROR) {
+        // Value not found in haContainer.
+        return null;
+      }
+      if (msgType == MessageType.EXCEPTION) {
+        // Part 1 holds the exception's toString; it was added for the c++ thin
+        // client and is not used in java, so it is not read here.
+        String s = "While performing a remote " + "getFullValue";
+        throw new ServerOperationException(s, (Throwable) part.getObject());
+      }
+      if (isErrorResponse(msgType)) {
+        throw new ServerOperationException(part.getString());
+      }
+      throw new InternalGemFireError("Unexpected message type "
+                                     + MessageType.getString(msgType));
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.REQUESTDATAERROR;
+    }
+
+    // This op records against the generic "get" statistics.
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGet();
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGetSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGet(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
new file mode 100644
index 0000000..b727f4e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetFunctionAttributeOp.java
@@ -0,0 +1,75 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Client-side operation that sends a function id to a server in a
+ * GET_FUNCTION_ATTRIBUTES message and returns the object response.
+ */
+public class GetFunctionAttributeOp {
+
+  /**
+   * @param pool the pool that executes the op
+   * @param functionId the function id sent as the single string part
+   * @return the result of executing the op on the pool
+   */
+  public static Object execute(ExecutablePool pool, String functionId) {
+    return pool.execute(new GetFunctionAttributeOpImpl(functionId));
+  }
+
+  private GetFunctionAttributeOp() {
+    // static entry point only; never instantiated
+  }
+
+  static class GetFunctionAttributeOpImpl extends AbstractOp {
+
+    private String functionId = null;
+
+    public GetFunctionAttributeOpImpl(String functionId) {
+      super(MessageType.GET_FUNCTION_ATTRIBUTES, 1);
+      this.functionId = functionId;
+      final Message request = getMessage();
+      request.addStringPart(this.functionId);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Object attributes = processObjResponse(msg, "getFunctionAttribute");
+      return attributes;
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.REQUESTDATAERROR == msgType;
+    }
+
+    // This op records against the generic "get" statistics.
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      final long begin = stats.startGet();
+      return begin;
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetSend(start, failed);
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      final boolean timedOut = hasTimedOut();
+      final boolean failed = hasFailed();
+      stats.endGet(start, timedOut, failed);
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally does nothing for this op
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Masks the early-ack byte with <code>Message.MESSAGE_HAS_SECURE_PART</code>
+     * and then sends the message with <code>send(false)</code>.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message outgoing = getMessage();
+      final byte masked = (byte)(outgoing.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      outgoing.setEarlyAck(masked);
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
new file mode 100644
index 0000000..019da2e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
@@ -0,0 +1,241 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.util.BridgeWriterException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region get on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class GetOp {
+  private static final Logger logger = LogService.getLogger();
+  
+  /** Reply flag bit: a callback argument part follows the flags part. */
+  public static final int HAS_CALLBACK_ARG = 0x01;
+  /** Reply flag bit: a version tag part is present in the reply. */
+  public static final int HAS_VERSION_TAG = 0x02;
+  /** Reply flag bit: together with a version tag, the result is {@link Token#TOMBSTONE}. */
+  public static final int KEY_NOT_PRESENT = 0x04;
+  /** Reply flag bit: a null value is reported as {@link Token#INVALID}. */
+  public static final int VALUE_IS_INVALID = 0x08; // Token.INVALID
+
+  /**
+   * Does a region get on a server using connections from the given pool
+   * to communicate with the server.
+   * <p>
+   * When single hop is enabled and the metadata service knows a server for
+   * the key, the op is first tried directly on that server. If that attempt
+   * ends in one of the handled exceptions (other than a
+   * {@link ServerOperationException}, which is rethrown), the op is executed
+   * again through the pool without a target server.
+   * 
+   * @param pool the pool to use to communicate with the server.
+   * @param region the region to do the get on
+   * @param key the entry key to do the get on
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   * @param prSingleHopEnabled whether to try the server the client metadata
+   *        service reports for the key before falling back to the pool
+   * @param clientEvent holder for returning version information
+   * @return the entry value found by the get if any
+   */
+  public static Object execute(ExecutablePool pool, LocalRegion region,
+      Object key, Object callbackArg, boolean prSingleHopEnabled, EntryEventImpl clientEvent) {
+    ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
+        .getClientMetadataService();
+    AbstractOp op = new GetOpImpl(region, key, callbackArg,
+        prSingleHopEnabled, clientEvent);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("GetOp invoked for key {}", key);
+    }
+    if (prSingleHopEnabled) {
+      ServerLocation server = cms.getBucketServerLocation(region,
+          Operation.GET, key, null, callbackArg);
+      if (server != null) {
+        try {
+          PoolImpl poolImpl = (PoolImpl)pool;
+          // Only reuse existing connections once a bounded pool is full
+          // (a max of -1 presumably means "unbounded" -- confirm in PoolImpl).
+          boolean onlyUseExistingCnx = poolImpl.getMaxConnections() != -1
+              && poolImpl.getConnectionCount() >= poolImpl.getMaxConnections();
+          return pool.executeOn(new ServerLocation(server.getHostName(),
+              server.getPort()), op, true, onlyUseExistingCnx);
+        }
+        catch (AllConnectionsInUseException ignored) {
+          // deliberate: fall through to the untargeted pool.execute(op) below
+        }
+        catch (ServerConnectivityException e) {
+          if (e instanceof ServerOperationException) {
+            throw e; // fixed 44656
+          }
+          cms.removeBucketServerLocation(server);
+        }
+        catch (BridgeWriterException e) {
+          if (e.getCause() instanceof ServerConnectivityException) {
+            cms.removeBucketServerLocation(server);
+          }
+        }
+        catch (CacheLoaderException e) {
+          if (e.getCause() instanceof ServerConnectivityException) {
+            cms.removeBucketServerLocation(server);
+          }
+        }
+      }
+    }
+    return pool.execute(op);
+  }
+
+                                                               
+  private GetOp() {
+    // no instances allowed
+  }
+  
+  /**
+   * The request: region full path, key and, when non-null, the callback
+   * argument.
+   */
+  static class GetOpImpl extends AbstractOp {
+    
+    private LocalRegion region=null ;
+    
+    private boolean prSingleHopEnabled = false;
+
+    private Object key;
+
+    private Object callbackArg;
+
+    private EntryEventImpl clientEvent;
+    
+    @Override
+    public String toString() {
+      return "GetOpImpl(key="+key+")";
+    }
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetOpImpl(LocalRegion region,
+                     Object key,
+                     Object callbackArg,
+                     boolean prSingleHopEnabled, 
+                     EntryEventImpl clientEvent) {
+      super(MessageType.REQUEST, callbackArg != null ? 3 : 2);
+      if (logger.isDebugEnabled()) {
+        logger.debug("constructing a GetOp for key {}", key/*, new Exception("stack trace")*/);
+      }
+      this.region = region ;
+      this.prSingleHopEnabled = prSingleHopEnabled;
+      this.key = key ;
+      this.callbackArg = callbackArg;
+      this.clientEvent = clientEvent;
+      getMessage().addStringPart(region.getFullPath());
+      getMessage().addStringOrObjPart(key);
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+    
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException(); // version tag processing requires the connection
+    }
+    
+    /**
+     * Decodes the reply. Part 0 is read through <code>processObjResponse</code>.
+     * When more parts exist, part 1 is a flags int built from the
+     * <code>HAS_*</code>, <code>KEY_NOT_PRESENT</code> and
+     * <code>VALUE_IS_INVALID</code> bits, optionally followed by a callback
+     * argument and a version tag. With single hop enabled, remaining parts
+     * may carry a byte array whose first byte is compared with the local
+     * metadata version; on a mismatch a metadata refresh is scheduled.
+     * 
+     * @param msg the reply received from the server
+     * @param con the connection the reply arrived on; its endpoint's member
+     *        id is used to fill in null ids of the version tag
+     * @return the value, {@link Token#INVALID}, {@link Token#TOMBSTONE}, or
+     *         <code>null</code>
+     */
+    @Override
+    protected Object processResponse(Message msg, Connection con) throws Exception {
+      Object object = processObjResponse(msg, "get");
+      if (msg.getNumberOfParts() > 1) {
+        int partIdx = 1;
+        int flags = msg.getPart(partIdx++).getInt();
+        if ((flags & HAS_CALLBACK_ARG) != 0) {
+          msg.getPart(partIdx++).getObject(); // callbackArg
+        }
+        if ((object == null)  &&  ((flags & VALUE_IS_INVALID) != 0)) {
+          object = Token.INVALID;
+        }
+        // if there's a version tag
+        if ((flags & HAS_VERSION_TAG) != 0) {
+          VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+          assert con != null; // for debugging
+          assert con.getEndpoint() != null; //for debugging
+          assert tag != null; // for debugging
+          tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
+          if (this.clientEvent != null) {
+            this.clientEvent.setVersionTag(tag);
+          }
+          if ((flags & KEY_NOT_PRESENT) != 0) {
+            object = Token.TOMBSTONE;
+          }
+        }
+        if (prSingleHopEnabled && msg.getNumberOfParts() > partIdx) {
+          byte version = 0;
+          int noOfMsgParts = msg.getNumberOfParts();
+          if (noOfMsgParts == partIdx+1) {
+            Part part = msg.getPart(partIdx++);
+            if (part.isBytes()) {
+              byte[] bytesReceived = part.getSerializedForm();
+              // Check the length before indexing so an empty array is skipped
+              // instead of raising ArrayIndexOutOfBoundsException.
+              if (bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED
+                  && bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) {
+                ClientMetadataService cms;
+                try {
+                  cms = region.getCache().getClientMetadataService();
+                  version = cms.getMetaDataVersion(region, Operation.UPDATE, key,
+                      null, callbackArg);
+                }
+                catch (CacheClosedException e) {
+                  // NOTE(review): this drops the value already read above
+                  return null;
+                }
+                if (bytesReceived[0] != version) {
+                  cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                }
+              }
+            }
+          }
+          else if (noOfMsgParts == partIdx+2) {
+            msg.getPart(partIdx++).getObject(); // callbackArg
+            Part part = msg.getPart(partIdx++);
+            if (part.isBytes()) {
+              byte[] bytesReceived = part.getSerializedForm();
+              if (this.region != null
+                  && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
+                ClientMetadataService cms;
+                try {
+                  cms = region.getCache().getClientMetadataService();
+                  version = cms.getMetaDataVersion(region, Operation.UPDATE, key,
+                      null, callbackArg);
+                }
+                catch (CacheClosedException e) {
+                  // NOTE(review): this drops the value already read above
+                  return null;
+                }
+                if (bytesReceived[0] != version) {
+                  cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                }
+              }
+            }
+          }
+        }
+      }
+      return object;
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.REQUESTDATAERROR;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGet();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGetSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGet(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
new file mode 100644
index 0000000..c2a8f34
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumByIdOp.java
@@ -0,0 +1,85 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014, Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve the PDX {@link EnumInfo}, given an integer enum id, from a server.
+ * @author darrel
+ * @since 6.6.2
+ */
+public class GetPDXEnumByIdOp {
+  /**
+   * Get a enum from the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @param enumId the id sent to the server as the single int part
+   * @return the result of executing the op on the pool, cast to {@link EnumInfo}
+   */
+  public static EnumInfo execute(ExecutablePool pool,
+                             int enumId)
+  {
+    final Object answer = pool.execute(new GetPDXEnumByIdOpImpl(enumId));
+    return (EnumInfo) answer;
+  }
+
+  private GetPDXEnumByIdOp() {
+    // static entry point only; never instantiated
+  }
+
+  private static class GetPDXEnumByIdOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetPDXEnumByIdOpImpl(int enumId) {
+      super(MessageType.GET_PDX_ENUM_BY_ID, 1);
+      final Message request = getMessage();
+      request.addIntPart(enumId);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Object enumInfo = processObjResponse(msg, "getPDXEnumById");
+      return enumInfo;
+    }
+
+    /** No message type is reported as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    // The three stats hooks below reuse the PDXType stats instead of adding new enum ones.
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      final long begin = stats.startGetPDXTypeById();
+      return begin;
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetPDXTypeByIdSend(start, failed);
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      final boolean timedOut = hasTimedOut();
+      final boolean failed = hasFailed();
+      stats.endGetPDXTypeById(start, timedOut, failed);
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally does nothing for this op
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    //Don't send the transaction id for this message type.
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    //Masks the early-ack byte with Message.MESSAGE_HAS_SECURE_PART and sends
+    //the message with send(false).
+    //TODO - the reason for the masking is not documented here, but the same
+    //code is on most of the other messages like this.
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message outgoing = getMessage();
+      final byte masked = (byte)(outgoing.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      outgoing.setEarlyAck(masked);
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
new file mode 100644
index 0000000..3326733
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXEnumsOp.java
@@ -0,0 +1,103 @@
+/*=========================================================================
+ * Copyright (c) 2012 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve all known PDX enums from a server.
+ * The reply is returned as a map of {@code Integer} keys (presumably the enum
+ * ids) to {@link EnumInfo} values.
+ * 
+ * @author bakera
+ * @since 7.0
+ */
+public class GetPDXEnumsOp {
+
+  private GetPDXEnumsOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Runs the get-PDX-enums op on the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @return the result of the op, cast to a map of id to {@link EnumInfo}
+   */
+  public static Map<Integer, EnumInfo> execute(ExecutablePool pool) {
+    final AbstractOp request = new GetPDXEnumsOpImpl();
+    final Object reply = pool.execute(request);
+    return (Map<Integer, EnumInfo>) reply;
+  }
+
+  /**
+   * The op implementation: a {@code GET_PDX_ENUMS} message carrying a single
+   * placeholder int part.
+   */
+  private static class GetPDXEnumsOpImpl extends AbstractOp {
+    public GetPDXEnumsOpImpl() {
+      super(MessageType.GET_PDX_ENUMS, 1);
+      // the message must have at least one part, so add a dummy int
+      getMessage().addIntPart(0);
+    }
+
+    /**
+     * Decodes the reply from its first part.
+     * @return the first part's object, cast to a map, for a {@code RESPONSE}
+     * @throws ServerOperationException for an {@code EXCEPTION} reply (wrapping
+     *         the throwable carried in the first part) or for an error reply
+     * @throws InternalGemFireError for any other message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Part firstPart = msg.getPart(0);
+      final int replyType = msg.getMessageType();
+
+      if (replyType == MessageType.RESPONSE) {
+        return (Map<Integer, EnumInfo>) firstPart.getObject();
+      }
+      if (replyType == MessageType.EXCEPTION) {
+        final String context = "While performing a remote " + "getPdxEnums";
+        throw new ServerOperationException(context, (Throwable) firstPart.getObject());
+      }
+      if (isErrorResponse(replyType)) {
+        throw new ServerOperationException(firstPart.getString());
+      }
+      throw new InternalGemFireError("Unexpected message type "
+          + MessageType.getString(replyType));
+    }
+
+    /** No message type is treated as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /** No statistic is started for this op; the start value is always 0. */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return 0;
+    }
+
+    /** No statistic is updated when the send phase ends. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+    }
+
+    /** No statistic is updated when the attempt ends. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+    }
+
+    /** No-op for this op. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This op reports that it needs no user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** This op reports that it does not participate in a transaction. */
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    /**
+     * Keeps only the {@code Message.MESSAGE_HAS_SECURE_PART} bits of the
+     * early-ack byte, then sends the message with {@code send(false)}.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message request = getMessage();
+      final byte maskedEarlyAck =
+          (byte) (request.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      request.setEarlyAck(maskedEarlyAck);
+      request.send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
new file mode 100644
index 0000000..4379823
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForEnumOp.java
@@ -0,0 +1,106 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014, Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.EnumInfo;
+
+/**
+ * Retrieve the integer PDX id for a given {@link EnumInfo} from a server.
+ * @author darrel
+ * @since 6.6.2
+ */
+public class GetPDXIdForEnumOp {
+
+  private GetPDXIdForEnumOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Get the id for the given enum info
+   * using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param ei the enum info sent to the server as the single object part
+   * @return the int value of the result produced by the op
+   */
+  public static int execute(ExecutablePool pool,
+                             EnumInfo ei)
+  {
+    final AbstractOp request = new GetPDXIdForEnumOpImpl(ei);
+    final Integer id = (Integer) pool.execute(request);
+    return id.intValue();
+  }
+
+  /**
+   * The op implementation: a {@code GET_PDX_ID_FOR_ENUM} message whose single
+   * part is the enum info.
+   */
+  private static class GetPDXIdForEnumOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetPDXIdForEnumOpImpl(EnumInfo ei) {
+      super(MessageType.GET_PDX_ID_FOR_ENUM, 1);
+      getMessage().addObjPart(ei);
+    }
+
+    /**
+     * Decodes the reply from its first part.
+     * @return the first part's int, boxed, for a {@code RESPONSE}
+     * @throws ServerOperationException for an {@code EXCEPTION} reply (wrapping
+     *         the throwable carried in the first part) or for an error reply
+     * @throws InternalGemFireError for any other message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Part firstPart = msg.getPart(0);
+      final int replyType = msg.getMessageType();
+
+      if (replyType == MessageType.RESPONSE) {
+        return Integer.valueOf(firstPart.getInt());
+      }
+      if (replyType == MessageType.EXCEPTION) {
+        // Only the first part is read here; an additional exception
+        // toString part was added for the c++ thin client and is not used in java.
+        final String context = "While performing a remote " + "getPdxIdForEnum";
+        throw new ServerOperationException(context, (Throwable) firstPart.getObject());
+      }
+      if (isErrorResponse(replyType)) {
+        throw new ServerOperationException(firstPart.getString());
+      }
+      throw new InternalGemFireError("Unexpected message type "
+                                     + MessageType.getString(replyType));
+    }
+
+    /** No message type is treated as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /** Starts the sample on the get-PDX-type-by-id statistic (type stats are reused for enums). */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGetPDXTypeById();
+    }
+
+    /** Ends the send phase on the reused get-PDX-type-by-id statistic. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetPDXTypeByIdSend(start, failed); /* reusing type stats instead of adding enum ones */
+    }
+
+    /** Ends the attempt on the reused get-PDX-type-by-id statistic. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+    }
+
+    /** No-op for this op. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This op reports that it needs no user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** Don't send the transaction id for this message type. */
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    /**
+     * Keeps only the {@code Message.MESSAGE_HAS_SECURE_PART} bits of the
+     * early-ack byte, then sends the message with {@code send(false)}.
+     * NOTE(review): the reason for the masking is not documented here; the
+     * same sequence appears on most of the other ops of this kind.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message request = getMessage();
+      final byte maskedEarlyAck =
+          (byte) (request.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      request.setEarlyAck(maskedEarlyAck);
+      request.send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
new file mode 100644
index 0000000..f4ea18e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXIdForTypeOp.java
@@ -0,0 +1,106 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve the integer PDX id for a given {@link PdxType} from a server.
+ * @author dsmith
+ * @since 6.6
+ */
+public class GetPDXIdForTypeOp {
+
+  private GetPDXIdForTypeOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Get the id for the given PDX type
+   * using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param type the PDX type sent to the server as the single object part
+   * @return the int value of the result produced by the op
+   */
+  public static int execute(ExecutablePool pool,
+                             PdxType type)
+  {
+    final AbstractOp request = new GetPDXIdForTypeOpImpl(type);
+    final Integer id = (Integer) pool.execute(request);
+    return id.intValue();
+  }
+
+  /**
+   * The op implementation: a {@code GET_PDX_ID_FOR_TYPE} message whose single
+   * part is the PDX type.
+   */
+  private static class GetPDXIdForTypeOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetPDXIdForTypeOpImpl(PdxType type) {
+      super(MessageType.GET_PDX_ID_FOR_TYPE, 1);
+      getMessage().addObjPart(type);
+    }
+
+    /**
+     * Decodes the reply from its first part.
+     * @return the first part's int, boxed, for a {@code RESPONSE}
+     * @throws ServerOperationException for an {@code EXCEPTION} reply (wrapping
+     *         the throwable carried in the first part) or for an error reply
+     * @throws InternalGemFireError for any other message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Part firstPart = msg.getPart(0);
+      final int replyType = msg.getMessageType();
+
+      if (replyType == MessageType.RESPONSE) {
+        return Integer.valueOf(firstPart.getInt());
+      }
+      if (replyType == MessageType.EXCEPTION) {
+        // Only the first part is read here; an additional exception
+        // toString part was added for the c++ thin client and is not used in java.
+        final String context = "While performing a remote " + "getPdxIdForType";
+        throw new ServerOperationException(context, (Throwable) firstPart.getObject());
+      }
+      if (isErrorResponse(replyType)) {
+        throw new ServerOperationException(firstPart.getString());
+      }
+      throw new InternalGemFireError("Unexpected message type "
+                                     + MessageType.getString(replyType));
+    }
+
+    /** No message type is treated as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /** Starts the sample on the get-PDX-type-by-id statistic. */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGetPDXTypeById();
+    }
+
+    /** Ends the send phase on the get-PDX-type-by-id statistic. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetPDXTypeByIdSend(start, failed);
+    }
+
+    /** Ends the attempt on the get-PDX-type-by-id statistic. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+    }
+
+    /** No-op for this op. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This op reports that it needs no user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** Don't send the transaction id for this message type. */
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    /**
+     * Keeps only the {@code Message.MESSAGE_HAS_SECURE_PART} bits of the
+     * early-ack byte, then sends the message with {@code send(false)}.
+     * NOTE(review): the reason for the masking is not documented here; the
+     * same sequence appears on most of the other ops of this kind.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message request = getMessage();
+      final byte maskedEarlyAck =
+          (byte) (request.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      request.setEarlyAck(maskedEarlyAck);
+      request.send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
new file mode 100644
index 0000000..6ea5e3d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypeByIdOp.java
@@ -0,0 +1,85 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve the PDXType, given an integer PDX id, from a server.
+ * @author dsmith
+ * @since 6.6
+ */
+public class GetPDXTypeByIdOp {
+
+  private GetPDXTypeByIdOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Get a PdxType from the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @param pdxId the id sent to the server as the single int part
+   * @return the result produced by the op, cast to {@link PdxType}
+   */
+  public static PdxType execute(ExecutablePool pool,
+                             int pdxId)
+  {
+    final AbstractOp request = new GetPDXTypeByIdOpImpl(pdxId);
+    final Object reply = pool.execute(request);
+    return (PdxType) reply;
+  }
+
+  /**
+   * The op implementation: a {@code GET_PDX_TYPE_BY_ID} message whose single
+   * part is the PDX id.
+   */
+  private static class GetPDXTypeByIdOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GetPDXTypeByIdOpImpl(int pdxId) {
+      super(MessageType.GET_PDX_TYPE_BY_ID, 1);
+      getMessage().addIntPart(pdxId);
+    }
+
+    /** Delegates reply decoding to {@code processObjResponse} with the op name "getPDXTypeById". */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Object decoded = processObjResponse(msg, "getPDXTypeById");
+      return decoded;
+    }
+
+    /** No message type is treated as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /** Starts the sample on the get-PDX-type-by-id statistic. */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGetPDXTypeById();
+    }
+
+    /** Ends the send phase on the get-PDX-type-by-id statistic. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endGetPDXTypeByIdSend(start, failed);
+    }
+
+    /** Ends the attempt on the get-PDX-type-by-id statistic. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGetPDXTypeById(start, hasTimedOut(), hasFailed());
+    }
+
+    /** No-op for this op. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This op reports that it needs no user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** Don't send the transaction id for this message type. */
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    /**
+     * Keeps only the {@code Message.MESSAGE_HAS_SECURE_PART} bits of the
+     * early-ack byte, then sends the message with {@code send(false)}.
+     * NOTE(review): the reason for the masking is not documented here; the
+     * same sequence appears on most of the other ops of this kind.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message request = getMessage();
+      final byte maskedEarlyAck =
+          (byte) (request.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      request.setEarlyAck(maskedEarlyAck);
+      request.send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
new file mode 100644
index 0000000..162cd35
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetPDXTypesOp.java
@@ -0,0 +1,103 @@
+/*=========================================================================
+ * Copyright (c) 2012 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+/**
+ * Retrieve all known PDX types.
+ * The reply is returned as a map of {@code Integer} keys (presumably the type
+ * ids) to {@link PdxType} values.
+ * 
+ * @author bakera
+ * @since 7.0
+ */
+public class GetPDXTypesOp {
+
+  private GetPDXTypesOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Runs the get-PDX-types op on the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @return the result of the op, cast to a map of id to {@link PdxType}
+   */
+  public static Map<Integer, PdxType> execute(ExecutablePool pool) {
+    final AbstractOp request = new GetPDXTypesOpImpl();
+    final Object reply = pool.execute(request);
+    return (Map<Integer, PdxType>) reply;
+  }
+
+  /**
+   * The op implementation: a {@code GET_PDX_TYPES} message carrying a single
+   * placeholder int part.
+   */
+  private static class GetPDXTypesOpImpl extends AbstractOp {
+    public GetPDXTypesOpImpl() {
+      super(MessageType.GET_PDX_TYPES, 1);
+      // the message must have at least one part, so add a dummy int
+      getMessage().addIntPart(0);
+    }
+
+    /**
+     * Decodes the reply from its first part.
+     * @return the first part's object, cast to a map, for a {@code RESPONSE}
+     * @throws ServerOperationException for an {@code EXCEPTION} reply (wrapping
+     *         the throwable carried in the first part) or for an error reply
+     * @throws InternalGemFireError for any other message type
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final Part firstPart = msg.getPart(0);
+      final int replyType = msg.getMessageType();
+
+      if (replyType == MessageType.RESPONSE) {
+        return (Map<Integer, PdxType>) firstPart.getObject();
+      }
+      if (replyType == MessageType.EXCEPTION) {
+        final String context = "While performing a remote " + "getPdxTypes";
+        throw new ServerOperationException(context, (Throwable) firstPart.getObject());
+      }
+      if (isErrorResponse(replyType)) {
+        throw new ServerOperationException(firstPart.getString());
+      }
+      throw new InternalGemFireError("Unexpected message type "
+          + MessageType.getString(replyType));
+    }
+
+    /** No message type is treated as an error response by this op. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /** No statistic is started for this op; the start value is always 0. */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return 0;
+    }
+
+    /** No statistic is updated when the send phase ends. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+    }
+
+    /** No statistic is updated when the attempt ends. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+    }
+
+    /** No-op for this op. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This op reports that it needs no user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** This op reports that it does not participate in a transaction. */
+    @Override
+    protected boolean participateInTransaction() {
+      return false;
+    }
+
+    /**
+     * Keeps only the {@code Message.MESSAGE_HAS_SECURE_PART} bits of the
+     * early-ack byte, then sends the message with {@code send(false)}.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final Message request = getMessage();
+      final byte maskedEarlyAck =
+          (byte) (request.getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      request.setEarlyAck(maskedEarlyAck);
+      request.send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InstantiatorRecoveryListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InstantiatorRecoveryListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InstantiatorRecoveryListener.java
new file mode 100644
index 0000000..eeb8e1b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InstantiatorRecoveryListener.java
@@ -0,0 +1,150 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * A listener which will try to resend the instantiators to all servers if the
+ * entire server distributed system was lost and came back online. This
+ * listener also takes care of sending the initial list of instantiators to the servers <br>
+ * <br> 
+ * TODO - There is a window in which all of the servers could crash and come
+ * back up and we would connect to a new server before realizing that all the
+ * servers crashed. To fix this, we would need to get some kind of birthdate of
+ * the server ds we connect and use that to decide if we need to recover
+ * instantiators. As it is, the window is not very large.
+ * 
+ * 
+ * @author dsmith
+ * 
+ */
+public class InstantiatorRecoveryListener extends EndpointManager.EndpointListenerAdapter {
+  private static final Logger logger = LogService.getLogger();
+  
+  /** Count of endpoints in use, adjusted by the endpoint callbacks below. */
+  private final AtomicInteger endpointCount = new AtomicInteger();
+  protected final InternalPool pool;
+  protected final ScheduledExecutorService background;
+  /** Delay, in milliseconds, before a recovery retry; taken from the pool's ping interval. */
+  protected final long pingInterval;
+  /** Guards every read and write of {@link #recoveryScheduled}. */
+  protected final Object recoveryScheduledLock = new Object();
+  /** True while a {@link RecoveryTask} has been handed to {@link #background} and has not started its work. */
+  protected boolean recoveryScheduled;
+  
+  /**
+   * @param background executor on which recovery tasks are run
+   * @param pool the pool whose servers receive the instantiators; its ping
+   *        interval is used as the retry delay
+   */
+  public InstantiatorRecoveryListener(ScheduledExecutorService background, InternalPool pool) {
+    this.pool = pool;
+    this.pingInterval = pool.getPingInterval();
+    this.background = background;
+  }
+  
+  /** Decrements the endpoint count and logs the new value at debug level. */
+  @Override
+  public void endpointCrashed(Endpoint endpoint) {
+    int count = endpointCount.decrementAndGet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("InstantiatorRecoveryTask - EndpointCrashed. Now have {} endpoints", count);
+    }
+  }
+
+  /** Decrements the endpoint count and logs the new value at debug level. */
+  @Override
+  public void endpointNoLongerInUse(Endpoint endpoint) {
+    int count = endpointCount.decrementAndGet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("InstantiatorRecoveryTask - EndpointNoLongerInUse. Now have {} endpoints", count);
+    }
+  }
+
+  /**
+   * Increments the endpoint count. When the count becomes 1 and no recovery
+   * is already scheduled, submits a {@link RecoveryTask} to the background
+   * executor.
+   */
+  @Override
+  public void endpointNowInUse(Endpoint endpoint) {
+    int count  = endpointCount.incrementAndGet();
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("InstantiatorRecoveryTask - EndpointNowInUse. Now have {} endpoints", count);
+    }
+    if(count == 1) {
+      synchronized(recoveryScheduledLock) {
+        if(!recoveryScheduled) {
+          try {
+            recoveryScheduled = true;
+            background.execute(new RecoveryTask());
+            if (isDebugEnabled) {
+              logger.debug("InstantiatorRecoveryTask - Scheduled Recovery Task");
+            }
+          } catch(RejectedExecutionException e) {
+            //ignore, the timer has been cancelled, which means we're shutting down.
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Sends the currently registered instantiators to the servers through
+   * {@link RegisterInstantiatorsOp}, or reschedules itself after
+   * {@link #pingInterval} milliseconds when no event id can be generated yet.
+   */
+  protected class RecoveryTask extends PoolTask {
+
+    @Override
+    public void run2() {
+      if (pool.getCancelCriterion().cancelInProgress() != null) {
+        return;
+      }
+      synchronized(recoveryScheduledLock) {
+        recoveryScheduled = false;
+      }
+      Object[] objects = InternalInstantiator
+          .getInstantiatorsForSerialization();
+      if (objects.length == 0) {
+        return;
+      }
+      EventID eventId = InternalInstantiator.generateEventId();
+      //Fix for bug:40930
+      if (eventId == null) {
+        // No event id available yet: retry later. The flag is updated under
+        // recoveryScheduledLock, like every other access to it, so the write
+        // is visible to endpointNowInUse and cannot interleave with the
+        // rescheduled task clearing the flag.
+        try {
+          synchronized(recoveryScheduledLock) {
+            background.schedule(new RecoveryTask(), pingInterval,
+                TimeUnit.MILLISECONDS);
+            recoveryScheduled = true;
+          }
+        }
+        catch (RejectedExecutionException e) {
+          // This is probably because we've started to shut down.
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          throw e;
+        }
+      }
+      else {
+        try {
+          RegisterInstantiatorsOp.execute(pool, objects, eventId);
+        } 
+        catch (CancelException e) {
+          throw e;
+        }
+        catch (RejectedExecutionException e) {
+          // This is probably because we've started to shut down.
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          throw e; // weird
+        }
+        catch(Exception e) {
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          
+          // If an exception occurred on the server, don't retry
+          Throwable cause = e.getCause();
+          if (cause instanceof ClassNotFoundException) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.InstantiatorRecoveryListener_INSTANTIATORRECOVERYTASK_ERROR_CLASSNOTFOUNDEXCEPTION,
+                cause.getMessage()));
+          } else {
+            logger.warn(LocalizedMessage.create(
+              LocalizedStrings.InstantiatorRecoveryListener_INSTANTIATORRECOVERYTASK_ERROR_RECOVERING_INSTANTIATORS),
+              e);
+          }
+        } finally {
+          pool.releaseThreadLocalConnection();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InternalPool.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InternalPool.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InternalPool.java
new file mode 100644
index 0000000..ce6b599
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InternalPool.java
@@ -0,0 +1,34 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import java.util.concurrent.ScheduledExecutorService;
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+
+/**
+ * The contract between a connection source and a connection pool.
+ * Provides methods for the connection source to access the cache
+ * and update the list of endpoints on the connection pool.
+ * <p>
+ * Extends both the public {@link Pool} API and {@link ExecutablePool}, and
+ * adds the internal accessors declared below.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface InternalPool extends Pool, ExecutablePool {
+  /** @return the {@link PoolStats} of this pool */
+  PoolStats getStats();
+  // NOTE(review): raw Map -- key and value types are not visible here;
+  // presumably keyed by server location with Endpoint values, confirm against implementations.
+  Map getEndpointMap();
+  /** @return the {@link EndpointManager} of this pool */
+  EndpointManager getEndpointManager();
+  /** @return the scheduled executor this pool exposes for background work */
+  ScheduledExecutorService getBackgroundProcessor();
+  /** @return the {@link CancelCriterion} associated with this pool */
+  CancelCriterion getCancelCriterion();
+  // presumably true when the client was configured as durable -- TODO confirm
+  boolean isDurableClient();
+  void detach();
+  // NOTE(review): by its name this returns a cancel-in-progress description for
+  // the pool or the cache; whether it is null when nothing is cancelling is not visible here.
+  String getPoolOrCacheCancelInProgress();
+}


Mime
View raw message