geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echobr...@apache.org
Subject [08/27] geode-native git commit: GEODE-2729: Remove global variables
Date Thu, 10 Aug 2017 15:20:16 GMT
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrEndpoint.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrEndpoint.cpp b/src/cppcache/src/TcrEndpoint.cpp
index 8e99010..ac480ea 100644
--- a/src/cppcache/src/TcrEndpoint.cpp
+++ b/src/cppcache/src/TcrEndpoint.cpp
@@ -42,15 +42,16 @@ This is replaced by the connect-timeout (times 3) system property for SR # 6525.
 */
 const char* TcrEndpoint::NC_Notification = "NC Notification";
 
-TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cache,
+TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
                          ACE_Semaphore& failoverSema,
                          ACE_Semaphore& cleanupSema,
                          ACE_Semaphore& redundancySema, ThinClientBaseDM* DM,
                          bool isMultiUserMode)
     : m_needToConnectInLock(false),
       m_connectLockCond(m_connectLock),
-      m_maxConnections(
-          DistributedSystem::getSystemProperties()->javaConnectionPoolSize()),
+      m_maxConnections(cacheImpl->getDistributedSystem()
+                           .getSystemProperties()
+                           .javaConnectionPoolSize()),
       m_notifyConnection(0),
       m_notifyReceiver(0),
       m_numRegionListener(0),
@@ -67,7 +68,7 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cache,
       m_numRegions(0),
       m_pingTimeouts(0),
       m_notifyCount(0),
-      m_cache(cache),
+      m_cacheImpl(cacheImpl),
       m_failoverSema(failoverSema),
       m_cleanupSema(cleanupSema),
       m_notificationCleanupSema(0),
@@ -115,7 +116,9 @@ TcrEndpoint::~TcrEndpoint() {
 
 inline bool TcrEndpoint::needtoTakeConnectLock() {
 #ifdef __linux
-  if (DistributedSystem::getSystemProperties()->connectWaitTimeout() > 0) {
+  if (m_cacheImpl->getDistributedSystem()
+          .getSystemProperties()
+          .connectWaitTimeout() > 0) {
     return m_needToConnectInLock;  // once pipe or other socket error will take
                                    // lock to connect.
   }
@@ -130,9 +133,10 @@ GfErrType TcrEndpoint::createNewConnectionWL(TcrConnection*& newConn,
                                              bool isSecondary,
                                              uint32_t connectTimeout) {
   LOGFINE("TcrEndpoint::createNewConnectionWL");
-  uint32_t connectWaitTimeout =
-      DistributedSystem::getSystemProperties()->connectWaitTimeout() *
-      1000;  // need to change
+  uint32_t connectWaitTimeout = m_cacheImpl->getDistributedSystem()
+                                    .getSystemProperties()
+                                    .connectWaitTimeout() *
+                                1000;  // need to change
   ACE_Time_Value interval(0, connectWaitTimeout);
   ACE_Time_Value stopAt(ACE_OS::gettimeofday());
   stopAt += interval;
@@ -148,7 +152,8 @@ GfErrType TcrEndpoint::createNewConnectionWL(TcrConnection*& newConn,
     if (ret != -1) {  // got lock
       try {
         LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
-        newConn = new TcrConnection(m_connected);
+        newConn =
+            new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
         newConn->InitTcrConnection(this, m_name.c_str(), m_ports,
                                    isClientNotification, isSecondary,
                                    connectTimeout);
@@ -204,7 +209,8 @@ GfErrType TcrEndpoint::createNewConnection(
     try {
       if (newConn == nullptr) {
         if (!needtoTakeConnectLock() || !appThreadRequest) {
-          newConn = new TcrConnection(m_connected);
+          newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
+                                      m_connected);
           bool authenticate = newConn->InitTcrConnection(
               this, m_name.c_str(), m_ports, isClientNotification, isSecondary,
               connectTimeout);
@@ -231,6 +237,10 @@ GfErrType TcrEndpoint::createNewConnection(
           LOGFINE("Sending update notification message to endpoint %s",
                   m_name.c_str());
           TcrMessageUpdateClientNotification updateNotificationMsg(
+              newConn->getConnectionManager()
+                  .getCacheImpl()
+                  ->getCache()
+                  ->createDataOutput(),
               static_cast<int32_t>(newConn->getPort()));
           newConn->send(updateNotificationMsg.getMsgData(),
                         updateNotificationMsg.getMsgLength());
@@ -290,12 +300,12 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
   LOGDEBUG(
       "TcrEndpoint::authenticateEndpoint m_isAuthenticated  = %d "
       "this->m_baseDM = %d",
-      m_isAuthenticated, this->m_baseDM);
-  if (!m_isAuthenticated && this->m_baseDM) {
+      m_isAuthenticated, m_baseDM);
+  if (!m_isAuthenticated && m_baseDM) {
     this->setConnected();
     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock);
     GfErrType err = GF_NOERR;
-    PropertiesPtr creds = this->getCredentials();
+    PropertiesPtr creds = getCredentials();
 
     if (creds != nullptr) {
       LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d",
@@ -304,7 +314,8 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
       LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app ");
     }
 
-    TcrMessageUserCredential request(creds, this->m_baseDM);
+    TcrMessageUserCredential request(
+        m_cacheImpl->getCache()->createDataOutput(), creds, m_baseDM);
 
     LOGDEBUG("request is created");
     TcrMessageReply reply(true, this->m_baseDM);
@@ -339,27 +350,17 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
 }
 
 PropertiesPtr TcrEndpoint::getCredentials() {
-  PropertiesPtr tmpSecurityProperties =
-      DistributedSystem::getSystemProperties()->getSecurityProperties();
+  const auto& distributedSystem = m_cacheImpl->getDistributedSystem();
+  const auto& tmpSecurityProperties =
+      distributedSystem.getSystemProperties().getSecurityProperties();
 
-  AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader();
-
-  if (authInitialize != nullptr) {
+  if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) {
     LOGFINER(
         "Acquired handle to AuthInitialize plugin, "
         "getting credentials for %s",
         m_name.c_str());
-    /* adongre
-     * CID 28899: Copy into fixed size buffer (STRING_OVERFLOW)
-     * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying
-     * the return value of
-     * "stlp_std::basic_string<char, stlp_std::char_traits<char>,
-     * stlp_std::allocator<char> >::c_str() const" without checking the length.
-     */
-    // char tmpEndpoint[100] = { '\0' } ;
-    // strcpy(tmpEndpoint, m_name.c_str());
-    PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials(
-        tmpSecurityProperties, /*tmpEndpoint*/ m_name.c_str());
+    const auto& tmpAuthIniSecurityProperties =
+        authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str());
     LOGFINER("Done getting credentials");
     return tmpAuthIniSecurityProperties;
   }
@@ -372,9 +373,10 @@ ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus(
   TcrConnection* newConn;
   ServerQueueStatus status = NON_REDUNDANT_SERVER;
 
-  err = createNewConnection(
-      newConn, false, false,
-      DistributedSystem::getSystemProperties()->connectTimeout());
+  err = createNewConnection(newConn, false, false,
+                            m_cacheImpl->getDistributedSystem()
+                                .getSystemProperties()
+                                .connectTimeout());
   if (err == GF_NOERR) {
     status = newConn->getServerQueueStatus(queueSize);
 
@@ -438,10 +440,11 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
               m_name.c_str());
       for (int connNum = 0; connNum < maxConnections; ++connNum) {
         TcrConnection* newConn;
-        if ((err = createNewConnection(
-                 newConn, false, false,
-                 DistributedSystem::getSystemProperties()->connectTimeout(), 0,
-                 m_connected)) != GF_NOERR) {
+        if ((err = createNewConnection(newConn, false, false,
+                                       m_cacheImpl->getDistributedSystem()
+                                           .getSystemProperties()
+                                           .connectTimeout(),
+                                       0, m_connected)) != GF_NOERR) {
           m_connected = false;
           m_isActiveEndpoint = false;
           closeConnections();
@@ -471,10 +474,12 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
       // setup notification channel for the first region
       ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
       if (m_numRegionListener == 0) {
-        if ((err = createNewConnection(
-                 m_notifyConnection, true, isSecondary,
-                 DistributedSystem::getSystemProperties()->connectTimeout() * 3,
-                 0)) != GF_NOERR) {
+        if ((err = createNewConnection(m_notifyConnection, true, isSecondary,
+                                       m_cacheImpl->getDistributedSystem()
+                                               .getSystemProperties()
+                                               .connectTimeout() *
+                                           3,
+                                       0)) != GF_NOERR) {
           m_connected = false;
           m_isActiveEndpoint = false;
           closeConnections();
@@ -545,7 +550,8 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
   }
 
   if (!m_msgSent && !m_pingSent) {
-    TcrMessagePing* pingMsg = TcrMessage::getPingMessage();
+    TcrMessagePing* pingMsg =
+        TcrMessage::getPingMessage(m_cacheImpl->getCache());
     TcrMessageReply reply(true, nullptr);
     LOGFINEST("Sending ping message to endpoint %s", m_name.c_str());
     GfErrType error;
@@ -584,19 +590,17 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
 }
 
 bool TcrEndpoint::checkDupAndAdd(EventIdPtr eventid) {
-  return m_cache->tcrConnectionManager().checkDupAndAdd(eventid);
+  return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid);
 }
 
 int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
-  char* data = 0;
-
   LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
   while (isRunning) {
     TcrMessageReply* msg = nullptr;
     try {
       size_t dataLen;
       ConnErrType opErr = CONN_NOERR;
-      data = m_notifyConnection->receive(&dataLen, &opErr, 5);
+      auto data = m_notifyConnection->receive(&dataLen, &opErr, 5);
 
       if (opErr == CONN_IOERR) {
         // Endpoint is disconnected, this exception is expected
@@ -616,11 +620,12 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
       }
 
       if (data) {
-        msg = new TcrMessageReply(true, nullptr);
+        msg = new TcrMessageReply(true, m_baseDM);
         msg->initCqMap();
         msg->setData(data, static_cast<int32_t>(dataLen),
-                     this->getDistributedMemberID());
-        data = nullptr;  // memory is released by TcrMessage setData().
+                     this->getDistributedMemberID(),
+                     *(m_cacheImpl->getSerializationRegistry()),
+                     *(m_cacheImpl->getMemberListForVersionStamp()));
         handleNotificationStats(static_cast<int64_t>(dataLen));
         LOGDEBUG("receive notification %d", msg->getMessageType());
 
@@ -644,7 +649,7 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
           if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) {
             const std::string& regionFullPath1 = msg->getRegionName();
             RegionPtr region1;
-            m_cache->getRegion(regionFullPath1.c_str(), region1);
+            m_cacheImpl->getRegion(regionFullPath1.c_str(), region1);
             if (region1 != nullptr &&
                 !static_cast<ThinClientRegion*>(region1.get())
                      ->getDistMgr()
@@ -670,7 +675,7 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
 
         if (isMarker) {
           LOGFINE("Got a marker message on endpont %s", m_name.c_str());
-          m_cache->processMarker();
+          m_cacheImpl->processMarker();
           processMarker();
           GF_SAFE_DELETE(msg);
         } else {
@@ -678,14 +683,14 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
           {
             const std::string& regionFullPath = msg->getRegionName();
             RegionPtr region;
-            m_cache->getRegion(regionFullPath.c_str(), region);
+            m_cacheImpl->getRegion(regionFullPath.c_str(), region);
             if (region != nullptr) {
               static_cast<ThinClientRegion*>(region.get())
                   ->receiveNotification(msg);
             } else {
               LOGWARN(
                   "Notification for region %s that does not exist in "
-                  "client cache.",
+                  "client cacheImpl.",
                   regionFullPath.c_str());
             }
           } else {
@@ -842,13 +847,16 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
     }
     size_t dataLen;
     LOGDEBUG("sendRequestConn: calling sendRequest");
-    char* data = conn->sendRequest(
-        request.getMsgData(), request.getMsgLength(), &dataLen,
-        request.getTimeout(), reply.getTimeout(), request.getMessageType());
+    auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(),
+                                  &dataLen, request.getTimeout(),
+                                  reply.getTimeout(), request.getMessageType());
     reply.setMessageTypeRequest(type);
-    reply.setData(data, static_cast<int32_t>(dataLen),
-                  this->getDistributedMemberID());  // memory is released by
-                                                    // TcrMessage setData().
+    reply.setData(
+        data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(),
+        *(m_cacheImpl->getSerializationRegistry()),
+        *(m_cacheImpl
+              ->getMemberListForVersionStamp()));  // memory is released by
+                                                   // TcrMessage setData().
   }
 
   // reset idle timeout of the connection for pool connection manager
@@ -923,10 +931,11 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
           LOGFINE(
               "Creating a new connection when connection-pool-size system "
               "property set to 0");
-          if ((error =
-                   createNewConnection(conn, false, false,
-                                       DistributedSystem::getSystemProperties()
-                                           ->connectTimeout())) != GF_NOERR) {
+          if ((error = createNewConnection(conn, false, false,
+                                           m_cacheImpl->getDistributedSystem()
+                                               .getSystemProperties()
+                                               .connectTimeout())) !=
+              GF_NOERR) {
             epFailure = true;
             continue;
           }
@@ -940,11 +949,12 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
       createNewConn = false;
       if (!m_connected) {
         return GF_NOTCON;
-      } else if ((error = createNewConnection(
-                      conn, false, false,
-                      DistributedSystem::getSystemProperties()
-                          ->connectTimeout(),
-                      0, true)) != GF_NOERR) {
+      } else if ((error =
+                      createNewConnection(conn, false, false,
+                                          m_cacheImpl->getDistributedSystem()
+                                              .getSystemProperties()
+                                              .connectTimeout(),
+                                          0, true)) != GF_NOERR) {
         epFailure = true;
         continue;
       }
@@ -1227,8 +1237,9 @@ void TcrEndpoint::closeConnection(TcrConnection*& conn) {
 void TcrEndpoint::closeConnections() {
   m_opConnections.close();
   m_ports.clear();
-  m_maxConnections =
-      DistributedSystem::getSystemProperties()->javaConnectionPoolSize();
+  m_maxConnections = m_cacheImpl->getDistributedSystem()
+                         .getSystemProperties()
+                         .javaConnectionPoolSize();
 }
 
 /*
@@ -1245,7 +1256,7 @@ void TcrEndpoint::closeNotification() {
   LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str());
   m_notifyConnection->close();
   m_notifyReceiver->stopNoblock();
-  TcrConnectionManager& tccm = m_cache->tcrConnectionManager();
+  TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager();
   tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection,
                                   m_notificationCleanupSema);
   m_notifyCount++;
@@ -1328,11 +1339,11 @@ void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus,
 
 bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; }
 void TcrEndpoint::processMarker() {
-  m_cache->tcrConnectionManager().processMarker();
+  m_cacheImpl->tcrConnectionManager().processMarker();
 }
 
 QueryServicePtr TcrEndpoint::getQueryService() {
-  return m_cache->getQueryService(true);
+  return m_cacheImpl->getQueryService(true);
 }
 void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
                                                 TcrMessageReply& reply,

http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrEndpoint.hpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrEndpoint.hpp b/src/cppcache/src/TcrEndpoint.hpp
index 117535b..e259d40 100644
--- a/src/cppcache/src/TcrEndpoint.hpp
+++ b/src/cppcache/src/TcrEndpoint.hpp
@@ -44,9 +44,9 @@ class ThinClientPoolDM;
 class CPPCACHE_EXPORT TcrEndpoint {
  public:
   TcrEndpoint(
-      const std::string& name, CacheImpl* cache, ACE_Semaphore& failoverSema,
-      ACE_Semaphore& cleanupSema, ACE_Semaphore& redundancySema,
-      ThinClientBaseDM* dm = nullptr,
+      const std::string& name, CacheImpl* cacheImpl,
+      ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
+      ACE_Semaphore& redundancySema, ThinClientBaseDM* dm = nullptr,
       bool isMultiUserMode = false);  // TODO: need to look for endpoint case
 
   /* adongre
@@ -206,6 +206,7 @@ class CPPCACHE_EXPORT TcrEndpoint {
   ACE_Recursive_Thread_Mutex m_notifyReceiverLock;
   virtual bool handleIOException(const std::string& message,
                                  TcrConnection*& conn, bool isBgThread = false);
+  CacheImpl* m_cacheImpl;
 
  private:
   int64_t m_uniqueId;
@@ -232,7 +233,6 @@ class CPPCACHE_EXPORT TcrEndpoint {
 
   int m_notifyCount;
 
-  CacheImpl* m_cache;
   ACE_Semaphore& m_failoverSema;
   ACE_Semaphore& m_cleanupSema;
   ACE_Semaphore m_notificationCleanupSema;

http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrMessage.cpp
----------------------------------------------------------------------
diff --git a/src/cppcache/src/TcrMessage.cpp b/src/cppcache/src/TcrMessage.cpp
index cfba9c7..10f4986 100644
--- a/src/cppcache/src/TcrMessage.cpp
+++ b/src/cppcache/src/TcrMessage.cpp
@@ -32,6 +32,7 @@
 #include "TXState.hpp"
 #include "DiskStoreId.hpp"
 #include "DiskVersionTag.hpp"
+#include "CacheRegionHelper.hpp"
 
 using namespace apache::geode::client;
 static const uint32_t REGULAR_EXPRESSION =
@@ -42,49 +43,28 @@ uint32_t g_headerLen = 17;
 }  // namespace
 
 // AtomicInc TcrMessage::m_transactionId = 0;
-TcrMessagePing* TcrMessage::m_pingMsg = nullptr;
-TcrMessage* TcrMessage::m_closeConnMsg = nullptr;
-TcrMessage* TcrMessage::m_allEPDisconnected = nullptr;
 uint8_t* TcrMessage::m_keepalive = nullptr;
 const int TcrMessage::m_flag_empty = 0x01;
 const int TcrMessage::m_flag_concurrency_checks = 0x02;
 
-bool TcrMessage::init() {
-  bool ret = true;
-  if (m_pingMsg == nullptr) {
-    try {
-      m_pingMsg = new TcrMessagePing(true);
-      m_closeConnMsg = new TcrMessageCloseConnection(true);
-
-    } catch (std::exception& ex) {
-      ret = false;
-      LOGERROR(ex.what());
-    } catch (Exception& ex) {
-      ret = false;
-      LOGERROR(ex.getMessage());
-    } catch (...) {
-      ret = false;
-      LOGERROR("unknown exception");
-    }
-  }
-  if (m_allEPDisconnected == nullptr) {
-    m_allEPDisconnected = new TcrMessageReply(true, nullptr);
-  }
-  return ret;
+TcrMessagePing* TcrMessage::getPingMessage(Cache* cache) {
+  static auto pingMsg = new TcrMessagePing(cache->createDataOutput(), true);
+  return pingMsg;
 }
 
-void TcrMessage::cleanup() {
-  GF_SAFE_DELETE(m_pingMsg);
-  GF_SAFE_DELETE(m_closeConnMsg);
+TcrMessage* TcrMessage::getAllEPDisMess() {
+  static auto allEPDisconnected = new TcrMessageReply(true, nullptr);
+  return allEPDisconnected;
 }
 
-/* we need a static method to generate ping */
-TcrMessagePing* TcrMessage::getPingMessage() { return m_pingMsg; }
-
-TcrMessage* TcrMessage::getAllEPDisMess() { return m_allEPDisconnected; }
-TcrMessage* TcrMessage::getCloseConnMessage() { return m_closeConnMsg; }
+TcrMessage* TcrMessage::getCloseConnMessage(Cache* cache) {
+  static auto closeConnMsg =
+      new TcrMessageCloseConnection(cache->createDataOutput(), true);
+  return closeConnMsg;
+}
 
 void TcrMessage::setKeepAlive(bool keepalive) {
+  // TODO global
   if (TcrMessage::m_keepalive != nullptr) {
     *TcrMessage::m_keepalive = keepalive ? 1 : 0;
   }
@@ -170,8 +150,9 @@ void TcrMessage::readPrMetaData(DataInput& input) {
   }
 }
 
-VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input,
-                                             uint16_t endpointMemId) {
+VersionTagPtr TcrMessage::readVersionTagPart(
+    DataInput& input, uint16_t endpointMemId,
+    MemberListForVersionStamp& memberListForVersionStamp) {
   int8_t isObj;
   input.read(&isObj);
   VersionTagPtr versionTag;
@@ -179,7 +160,7 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input,
   if (isObj == GeodeTypeIds::NullObj) return versionTag;
 
   if (isObj == GeodeTypeIdsImpl::FixedIDByte) {
-    versionTag = std::make_shared<VersionTag>();
+    versionTag = std::make_shared<VersionTag>(memberListForVersionStamp);
     int8_t fixedId;
     input.read(&fixedId);
     if (fixedId == GeodeTypeIdsImpl::VersionTag) {
@@ -191,7 +172,7 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input,
     int16_t fixedId;
     input.readInt(&fixedId);
     if (fixedId == GeodeTypeIdsImpl::DiskVersionTag) {
-      DiskVersionTag* disk = new DiskVersionTag();
+      DiskVersionTag* disk = new DiskVersionTag(memberListForVersionStamp);
       disk->fromData(input);
       versionTag.reset(disk);
       return versionTag;
@@ -200,14 +181,17 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input,
   return versionTag;
 }
 
-void TcrMessage::readVersionTag(DataInput& input, uint16_t endpointMemId) {
+void TcrMessage::readVersionTag(
+    DataInput& input, uint16_t endpointMemId,
+    MemberListForVersionStamp& memberListForVersionStamp) {
   int32_t lenObj;
   int8_t isObj;
   input.readInt(&lenObj);
   input.read(&isObj);
 
   if (lenObj == 0) return;
-  auto versionTag = TcrMessage::readVersionTagPart(input, endpointMemId);
+  auto versionTag = TcrMessage::readVersionTagPart(input, endpointMemId,
+                                                   memberListForVersionStamp);
   this->setVersionTag(versionTag);
 }
 
@@ -386,9 +370,10 @@ void TcrMessage::readUniqueIDObjectPart(DataInput& input) {
 int64_t TcrMessage::getConnectionId(TcrConnection* conn) {
   if (m_connectionIDBytes != nullptr) {
     CacheableBytesPtr tmp = conn->decryptBytes(m_connectionIDBytes);
-    DataInput di(tmp->value(), tmp->length());
+    auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+              tmp->value(), tmp->length());
     int64_t connid;
-    di.readInt(&connid);
+    di->readInt(&connid);
     return connid;
   } else {
     LOGWARN("Returning 0 as internal connection ID msgtype = %d ", m_msgType);
@@ -402,9 +387,10 @@ int64_t TcrMessage::getUniqueId(TcrConnection* conn) {
 
     CacheableBytesPtr tmp = conn->decryptBytes(encryptBytes);
 
-    DataInput di(tmp->value(), tmp->length());
+    auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+              tmp->value(), tmp->length());
     int64_t uniqueid;
-    di.readInt(&uniqueid);
+    di->readInt(&uniqueid);
 
     return uniqueid;
   }
@@ -859,7 +845,8 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
       } else if (m_msgTypeRequest == TcrMessage::PUTALL ||
                  m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) {
         TcrChunkedContext* chunk = new TcrChunkedContext(
-            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader);
+            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+            m_tcdm->getConnectionManager().getCacheImpl()->getCache());
         m_chunkedResult->setEndpointMemId(endpointmemId);
         m_tcdm->queueChunk(chunk);
         if (bytes == nullptr) {
@@ -882,7 +869,8 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
       if (m_chunkedResult != nullptr) {
         LOGDEBUG("tcrmessage in case22 ");
         TcrChunkedContext* chunk = new TcrChunkedContext(
-            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader);
+            bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader,
+            m_tcdm->getConnectionManager().getCacheImpl()->getCache());
         m_chunkedResult->setEndpointMemId(endpointmemId);
         m_tcdm->queueChunk(chunk);
         if (bytes == nullptr) {
@@ -930,9 +918,10 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len,
     case TcrMessage::EXCEPTION: {
       if (bytes != nullptr) {
         DeleteArray<const uint8_t> delChunk(bytes);
-        DataInput input(bytes, len);
-        readExceptionPart(input, isLastChunkAndisSecurityHeader);
-        readSecureObjectPart(input, false, true,
+        auto input = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+                  bytes, len);
+        readExceptionPart(*input, isLastChunkAndisSecurityHeader);
+        readSecureObjectPart(*input, false, true,
                              isLastChunkAndisSecurityHeader);
       }
       break;
@@ -995,27 +984,31 @@ void TcrMessage::chunkSecurityHeader(int skipPart, const uint8_t* bytes,
                                      uint8_t isLastChunkAndSecurityHeader) {
   LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart);
   if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) {
-    DataInput di(bytes, len);
-    skipParts(di, skipPart);
-    readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader);
+    auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+              bytes, len);
+    skipParts(*di, skipPart);
+    readSecureObjectPart(*di, false, true, isLastChunkAndSecurityHeader);
   }
 }
 
-void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
-                                         uint16_t endpointMemId) {
-  DataInput input((uint8_t*)bytearray, len);
+void TcrMessage::handleByteArrayResponse(
+    const char* bytearray, int32_t len, uint16_t endpointMemId,
+    const SerializationRegistry& serializationRegistry,
+    MemberListForVersionStamp& memberListForVersionStamp) {
+  auto input = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+                  (uint8_t*)bytearray, len);
   // TODO:: this need to make sure that pool is there
   //  if(m_tcdm == nullptr)
   //  throw IllegalArgumentException("Pool is nullptr in TcrMessage");
-  input.setPoolName(getPoolName());
-  input.readInt(&m_msgType);
+  input->setPoolName(getPoolName());
+  input->readInt(&m_msgType);
   int32_t msglen;
-  input.readInt(&msglen);
+  input->readInt(&msglen);
   int32_t numparts;
-  input.readInt(&numparts);
-  input.readInt(&m_txId);
+  input->readInt(&numparts);
+  input->readInt(&m_txId);
   int8_t earlyack;
-  input.read(&earlyack);
+  input->read(&earlyack);
   LOGDEBUG(
       "handleByteArrayResponse m_msgType = %d isSecurityOn = %d requesttype "
       "=%d",
@@ -1032,46 +1025,46 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
   switch (m_msgType) {
     case TcrMessage::RESPONSE: {
       if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) {
-        readBooleanPartAsObject(input, &m_boolValue);
+        readBooleanPartAsObject(*input, &m_boolValue);
       } else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) {
-        readUniqueIDObjectPart(input);
+        readUniqueIDObjectPart(*input);
       } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE ||
                  m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) {
         // int will come in response
         uint32_t typeId;
-        readIntPart(input, &typeId);
+        readIntPart(*input, &typeId);
         m_value = CacheableInt32::create(typeId);
       } else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) {
         // PdxType will come in response
-        input.advanceCursor(5);  // part header
+        input->advanceCursor(5);  // part header
         m_value =
-            SerializationRegistry::deserialize(input, GeodeTypeIds::PdxType);
+            serializationRegistry.deserialize(*input, GeodeTypeIds::PdxType);
       } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) {
         // PdxType will come in response
-        input.advanceCursor(5);  // part header
-        m_value = SerializationRegistry::deserialize(input);
+        input->advanceCursor(5);  // part header
+        m_value = serializationRegistry.deserialize(*input);
       } else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) {
         int32_t lenObj;
-        input.readInt(&lenObj);
+        input->readInt(&lenObj);
         int8_t isObj;
-        input.read(&isObj);
+        input->read(&isObj);
         int8_t hR;
-        input.read(&hR);
+        input->read(&hR);
         int8_t isHA;
-        input.read(&isHA);
+        input->read(&isHA);
         int8_t oFW;
-        input.read(&oFW);
+        input->read(&oFW);
         m_functionAttributes = new std::vector<int8_t>();
         m_functionAttributes->push_back(hR);
         m_functionAttributes->push_back(isHA);
         m_functionAttributes->push_back(oFW);
       } else if (m_msgTypeRequest == TcrMessage::REQUEST) {
         int32_t receivednumparts = 2;
-        readObjectPart(input);
+        readObjectPart(*input);
         uint32_t flag = 0;
-        readIntPart(input, &flag);
+        readIntPart(*input, &flag);
         if (flag & 0x01) {
-          readCallbackObjectPart(input);
+          readCallbackObjectPart(*input);
           receivednumparts++;
         }
 
@@ -1080,7 +1073,7 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
         }
 
         if (flag & 0x02) {
-          readVersionTag(input, endpointMemId);
+          readVersionTag(*input, endpointMemId, memberListForVersionStamp);
           receivednumparts++;
         }
 
@@ -1088,37 +1081,37 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
           m_value = CacheableToken::tombstone();
         }
 
-        if (numparts > receivednumparts) readPrMetaData(input);
+        if (numparts > receivednumparts) readPrMetaData(*input);
 
       } else if (m_decodeAll) {
-        readObjectPart(input);
+        readObjectPart(*input);
         if (numparts == 2) {
           if (m_isCallBackArguement) {
-            readCallbackObjectPart(input);
+            readCallbackObjectPart(*input);
           } else {
             int32_t lenObj;
-            input.readInt(&lenObj);
+            input->readInt(&lenObj);
             bool isObj;
-            input.readBoolean(&isObj);
-            input.read(&m_metaDataVersion);
+            input->readBoolean(&isObj);
+            input->read(&m_metaDataVersion);
             if (lenObj == 2) {
-              input.read(&m_serverGroupVersion);
+              input->read(&m_serverGroupVersion);
               LOGDEBUG(
                   "Single-hop m_serverGroupVersion in message response is %d",
                   m_serverGroupVersion);
             }
           }
         } else if (numparts > 2) {
-          skipParts(input, 1);
+          skipParts(*input, 1);
           int32_t lenObj;
-          input.readInt(&lenObj);
+          input->readInt(&lenObj);
           bool isObj;
-          input.readBoolean(&isObj);
-          input.read(&m_metaDataVersion);
+          input->readBoolean(&isObj);
+          input->read(&m_metaDataVersion);
           LOGFINE("Single-hop metadata version in message response is %d",
                   m_metaDataVersion);
           if (lenObj == 2) {
-            input.read(&m_serverGroupVersion);
+            input->read(&m_serverGroupVersion);
             LOGDEBUG(
                 "Single-hop m_serverGroupVersion in message response is %d",
                 m_serverGroupVersion);
@@ -1131,16 +1124,16 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
     case TcrMessage::EXCEPTION: {
       uint8_t lastChunk = static_cast<uint8_t>(numparts);
       lastChunk = (lastChunk << 5);
-      readExceptionPart(input, lastChunk);
+      readExceptionPart(*input, lastChunk);
       // if (isSecurityOn)
-      // readSecureObjectPart( input );
+      // readSecureObjectPart( *input );
       break;
     }
 
     case TcrMessage::INVALID: {
       // Read the string in the reply
       LOGWARN("Received invalid message type as reply from server");
-      readObjectPart(input, true);
+      readObjectPart(*input, true);
       break;
     }
 
@@ -1170,41 +1163,43 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
     case TcrMessage::REPLY: {
       switch (m_msgTypeRequest) {
         case TcrMessage::PUT: {
-          readPrMetaData(input);
+          readPrMetaData(*input);
           uint32_t flags = 0;
-          readIntPart(input, &flags);
+          readIntPart(*input, &flags);
           if (flags & 0x01) {  //  has old value
-            readOldValue(input);
+            readOldValue(*input);
           }
           if (flags & 0x04) {
-            readVersionTag(input, endpointMemId);
+            readVersionTag(*input, endpointMemId, memberListForVersionStamp);
           }
           break;
         }
         case TcrMessage::INVALIDATE: {
           uint32_t flags = 0;
-          readIntPart(input, &flags);
-          if (flags & 0x01) readVersionTag(input, endpointMemId);
-          readPrMetaData(input);
+          readIntPart(*input, &flags);
+          if (flags & 0x01)
+            readVersionTag(*input, endpointMemId, memberListForVersionStamp);
+          readPrMetaData(*input);
 
           break;
         }
         case TcrMessage::DESTROY: {
           uint32_t flags = 0;
-          readIntPart(input, &flags);
-          if (flags & 0x01) readVersionTag(input, endpointMemId);
-          readPrMetaData(input);
+          readIntPart(*input, &flags);
+          if (flags & 0x01)
+            readVersionTag(*input, endpointMemId, memberListForVersionStamp);
+          readPrMetaData(*input);
           // skip the Destroy65.java response entryNotFound int part so
           // that the readSecureObjectPart() call below gets the security part
-          // skipParts(input, 1);
-          readIntPart(input, &m_entryNotFound);
+          // skipParts(*input, 1);
+          readIntPart(*input, &m_entryNotFound);
           LOGDEBUG("Inside TcrMessage::REPLY::DESTROY m_entryNotFound = %d ",
                    m_entryNotFound);
           break;
         }
         case TcrMessage::PING:
         default: {
-          readPrMetaData(input);
+          readPrMetaData(*input);
           break;
         }
       }
@@ -1213,36 +1208,36 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
     case TcrMessage::LOCAL_INVALIDATE:
     case TcrMessage::LOCAL_DESTROY: {
       int32_t regionLen;
-      input.readInt(&regionLen);
+      input->readInt(&regionLen);
       int8_t isObj;
-      input.read(&isObj);
+      input->read(&isObj);
       char* regname = nullptr;
       regname = new char[regionLen + 1];
       DeleteArray<char> delRegName(regname);
-      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
+      input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
       regname[regionLen] = '\0';
       m_regionName = regname;
 
-      readKeyPart(input);
+      readKeyPart(*input);
 
-      // skipParts(input, 1); // skip callbackarg parts
-      readCallbackObjectPart(input);
-      readVersionTag(input, endpointMemId);
-      readBooleanPartAsObject(input, &m_isInterestListPassed);
-      readBooleanPartAsObject(input, &m_hasCqsPart);
+      // skipParts(*input, 1); // skip callbackarg parts
+      readCallbackObjectPart(*input);
+      readVersionTag(*input, endpointMemId, memberListForVersionStamp);
+      readBooleanPartAsObject(*input, &m_isInterestListPassed);
+      readBooleanPartAsObject(*input, &m_hasCqsPart);
       if (m_hasCqsPart) {
         if (m_msgType == TcrMessage::LOCAL_INVALIDATE) {
-          readIntPart(input, &m_msgTypeForCq);
+          readIntPart(*input, &m_msgTypeForCq);
         } else {
           m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
         }
         // LOGINFO("got cq local local_invalidate/local_destroy read
         // m_hasCqsPart");
-        readCqsPart(input);
+        readCqsPart(*input);
       }
 
       // read eventid part
-      readEventIdPart(input, false);
+      readEventIdPart(*input, false);
 
       break;
     }
@@ -1250,79 +1245,80 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
     case TcrMessage::LOCAL_CREATE:
     case TcrMessage::LOCAL_UPDATE: {
       int32_t regionLen;
-      input.readInt(&regionLen);
+      input->readInt(&regionLen);
       int8_t isObj;
-      input.read(&isObj);
+      input->read(&isObj);
       char* regname = nullptr;
       regname = new char[regionLen + 1];
       DeleteArray<char> delRegName(regname);
-      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
+      input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
       regname[regionLen] = '\0';
       m_regionName = regname;
 
-      readKeyPart(input);
+      readKeyPart(*input);
       //  Read delta flag
       bool isDelta = false;
-      readBooleanPartAsObject(input, &isDelta);
+      readBooleanPartAsObject(*input, &isDelta);
       if (isDelta) {
-        input.readInt(&m_deltaBytesLen);
+        input->readInt(&m_deltaBytesLen);
 
         int8_t isObj;
-        input.read(&isObj);
+        input->read(&isObj);
         m_deltaBytes = new uint8_t[m_deltaBytesLen];
-        input.readBytesOnly(m_deltaBytes, m_deltaBytesLen);
-        m_delta = new DataInput(m_deltaBytes, m_deltaBytesLen);
+        input->readBytesOnly(m_deltaBytes, m_deltaBytesLen);
+        m_delta = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput(
+            m_deltaBytes, m_deltaBytesLen);
       } else {
-        readObjectPart(input);
+        readObjectPart(*input);
       }
 
       // skip callbackarg part
-      // skipParts(input, 1);
-      readCallbackObjectPart(input);
-      readVersionTag(input, endpointMemId);
-      readBooleanPartAsObject(input, &m_isInterestListPassed);
-      readBooleanPartAsObject(input, &m_hasCqsPart);
+      // skipParts(*input, 1);
+      readCallbackObjectPart(*input);
+      readVersionTag(*input, endpointMemId, memberListForVersionStamp);
+      readBooleanPartAsObject(*input, &m_isInterestListPassed);
+      readBooleanPartAsObject(*input, &m_hasCqsPart);
 
       if (m_hasCqsPart) {
         // LOGINFO("got cq local_create/local_create");
-        readCqsPart(input);
+        readCqsPart(*input);
         m_msgTypeForCq = static_cast<uint32_t>(m_msgType);
       }
 
       // read eventid part
-      readEventIdPart(input, false);
+      readEventIdPart(*input, false);
       GF_SAFE_DELETE_ARRAY(regname);  // COVERITY ---> 30299 Resource leak
 
       break;
     }
     case TcrMessage::CLIENT_MARKER: {
       // dont skip (non-existent) callbackarg part, just read eventid part
-      readEventIdPart(input, false);
+      readEventIdPart(*input, false);
       break;
     }
 
     case TcrMessage::LOCAL_DESTROY_REGION:
     case TcrMessage::CLEAR_REGION: {
       int32_t regionLen;
-      input.readInt(&regionLen);
+      input->readInt(&regionLen);
       int8_t isObj;
-      input.read(&isObj);
+      input->read(&isObj);
       char* regname = nullptr;
       regname = new char[regionLen + 1];
       DeleteArray<char> delRegName(regname);
-      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
+      input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
       regname[regionLen] = '\0';
       m_regionName = regname;
       // skip callbackarg part
-      // skipParts(input, 1);
-      readCallbackObjectPart(input);
-      readBooleanPartAsObject(input, &m_hasCqsPart);
+      // skipParts(*input, 1);
+      readCallbackObjectPart(*input);
+      readBooleanPartAsObject(*input, &m_hasCqsPart);
       if (m_hasCqsPart) {
         // LOGINFO("got cq region_destroy read m_hasCqsPart");
-        readCqsPart(input);
+        readCqsPart(*input);
       }
       // read eventid part
-      readEventIdPart(input, false);
+      readEventIdPart(*input, false);
       break;
     }
 
@@ -1334,27 +1330,27 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
       m_metadata = new std::vector<std::vector<BucketServerLocationPtr> >();
       for (int32_t i = 0; i < numparts; i++) {
         int32_t bits32;
-        input.readInt(&bits32);  // partlen;
+        input->readInt(&bits32);  // partlen;
         int8_t bits8;
-        input.read(&bits8);  // isObj;
-        input.read(&bits8);  // cacheable vector typeid
+        input->read(&bits8);  // isObj;
+        input->read(&bits8);  // cacheable vector typeid
         LOGDEBUG("Expected typeID %d, got %d", GeodeTypeIds::CacheableArrayList,
                  bits8);
 
-        input.readArrayLen(&bits32);  // array length
+        input->readArrayLen(&bits32);  // array length
         LOGDEBUG("Array length = %d ", bits32);
         if (bits32 > 0) {
           std::vector<BucketServerLocationPtr> bucketServerLocations;
           for (int32_t index = 0; index < bits32; index++) {
             int8_t header;
-            input.read(&header);  // ignore DS typeid
-            input.read(&header);  // ignore CLASS typeid
-            input.read(&header);  // ignore string typeid
+            input->read(&header);  // ignore DS typeid
+            input->read(&header);  // ignore CLASS typeid
+            input->read(&header);  // ignore string typeid
             uint16_t classLen;
-            input.readInt(&classLen);  // Read classLen
-            input.advanceCursor(classLen);
+            input->readInt(&classLen);  // Read classLen
+            input->advanceCursor(classLen);
             auto location = std::make_shared<BucketServerLocation>();
-            location->fromData(input);
+            location->fromData(*input);
             LOGFINE("location contains %d\t%s\t%d\t%d\t%s",
                     location->getBucketId(), location->getServerName().c_str(),
                     location->getPort(), location->getVersion(),
@@ -1375,43 +1371,43 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
 
     case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: {
       int32_t bits32;
-      input.readInt(&bits32);  // partlen;
+      input->readInt(&bits32);  // partlen;
       int8_t bits8;
-      input.read(&bits8);  // isObj;
+      input->read(&bits8);  // isObj;
 
-      m_bucketCount = input.readNativeInt32();  // PART1 = bucketCount
+      m_bucketCount = input->readNativeInt32();  // PART1 = bucketCount
 
-      input.readInt(&bits32);  // partlen;
-      input.read(&bits8);      // isObj;
+      input->readInt(&bits32);  // partlen;
+      input->read(&bits8);      // isObj;
       if (bits32 > 0) {
-        input.readNativeString(m_colocatedWith);  // PART2 = colocatedwith
+        input->readNativeString(m_colocatedWith);  // PART2 = colocatedwith
       }
 
       if (numparts == 4) {
-        input.readInt(&bits32);  // partlen;
-        input.read(&bits8);      // isObj;
+        input->readInt(&bits32);  // partlen;
+        input->read(&bits8);      // isObj;
         if (bits32 > 0) {
-          input.readNativeString(
+          input->readNativeString(
               m_partitionResolverName);  // PART3 = partitionresolvername
         }
 
-        input.readInt(&bits32);  // partlen;
-        input.read(&bits8);      // isObj;
-        input.read(&bits8);      // cacheable CacheableHashSet typeid
+        input->readInt(&bits32);  // partlen;
+        input->read(&bits8);      // isObj;
+        input->read(&bits8);      // cacheable CacheableHashSet typeid
 
-        input.readArrayLen(&bits32);  // array length
+        input->readArrayLen(&bits32);  // array length
         if (bits32 > 0) {
           m_fpaSet = new std::vector<FixedPartitionAttributesImplPtr>();
           for (int32_t index = 0; index < bits32; index++) {
             int8_t header;
-            input.read(&header);  // ignore DS typeid
-            input.read(&header);  // ignore CLASS typeid
-            input.read(&header);  // ignore string typeid
+            input->read(&header);  // ignore DS typeid
+            input->read(&header);  // ignore CLASS typeid
+            input->read(&header);  // ignore string typeid
             uint16_t classLen;
-            input.readInt(&classLen);  // Read classLen
-            input.advanceCursor(classLen);
+            input->readInt(&classLen);  // Read classLen
+            input->advanceCursor(classLen);
             auto fpa = std::make_shared<FixedPartitionAttributesImpl>();
-            fpa->fromData(input);  // PART4 = set of FixedAttributes.
+            fpa->fromData(*input);  // PART4 = set of FixedAttributes.
             LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(),
                      fpa->getPartitionName().c_str(), fpa->isPrimary(),
                      fpa->getStartingBucketID());
@@ -1424,38 +1420,38 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
     case TcrMessage::TOMBSTONE_OPERATION: {
       uint32_t tombstoneOpType;
       int32_t regionLen;
-      input.readInt(&regionLen);
+      input->readInt(&regionLen);
       int8_t isObj;
-      input.read(&isObj);
+      input->read(&isObj);
       char* regname = nullptr;
 
       regname = new char[regionLen + 1];
       DeleteArray<char> delRegName(regname);
-      input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
+      input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen);
       regname[regionLen] = '\0';
       m_regionName = regname;
-      readIntPart(input, &tombstoneOpType);  // partlen;
+      readIntPart(*input, &tombstoneOpType);  // partlen;
       int32_t len;
-      input.readInt(&len);
-      input.read(&isObj);
+      input->readInt(&len);
+      input->read(&isObj);
 
       if (tombstoneOpType == 0) {
         if (m_tombstoneVersions == nullptr) {
           m_tombstoneVersions = CacheableHashMap::create();
         }
-        readHashMapForGCVersions(input, m_tombstoneVersions);
+        readHashMapForGCVersions(*input, m_tombstoneVersions);
       } else if (tombstoneOpType == 1) {
         if (m_tombstoneKeys == nullptr) {
           m_tombstoneKeys = CacheableHashSet::create();
         }
-        // input.readObject(m_tombstoneKeys);
-        readHashSetForGCVersions(input, m_tombstoneKeys);
+        // input->readObject(m_tombstoneKeys);
+        readHashSetForGCVersions(*input, m_tombstoneKeys);
       } else {
         LOGERROR("Failed to read the tombstone versions");
         break;
       }
       // readEventId Part
-      readEventIdPart(input, false);
+      readEventIdPart(*input, false);
       break;
     }
     case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: {
@@ -1483,12 +1479,14 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len,
       throw MessageException("handleByteArrayResponse: unknown message type");
   }
   LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack);
-  if (earlyack & 0x2) readSecureObjectPart(input);
+  if (earlyack & 0x2) readSecureObjectPart(*input);
 }
 
 TcrMessageDestroyRegion::TcrMessageDestroyRegion(
-    const Region* region, const UserDataPtr& aCallbackArgument,
-    int messageResponsetimeout, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const UserDataPtr& aCallbackArgument, int messageResponsetimeout,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::DESTROY_REGION;
   m_tcdm = connectionDM;
   m_regionName =
@@ -1519,8 +1517,10 @@ TcrMessageDestroyRegion::TcrMessageDestroyRegion(
 }
 
 TcrMessageClearRegion::TcrMessageClearRegion(
-    const Region* region, const UserDataPtr& aCallbackArgument,
-    int messageResponsetimeout, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const UserDataPtr& aCallbackArgument, int messageResponsetimeout,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CLEAR_REGION;
   m_tcdm = connectionDM;
   m_regionName =
@@ -1553,10 +1553,11 @@ TcrMessageClearRegion::TcrMessageClearRegion(
   writeMessageLength();
 }
 
-TcrMessageQuery::TcrMessageQuery(const std::string& regionName,
+TcrMessageQuery::TcrMessageQuery(std::unique_ptr<DataOutput> dataOutput,
+                                 const std::string& regionName,
                                  int messageResponsetimeout,
                                  ThinClientBaseDM* connectionDM) {
-  m_request = new DataOutput;
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::QUERY;
   m_tcdm = connectionDM;
   m_regionName = regionName;  // this is querystri;
@@ -1577,9 +1578,11 @@ TcrMessageQuery::TcrMessageQuery(const std::string& regionName,
   writeMessageLength();
 }
 
-TcrMessageStopCQ::TcrMessageStopCQ(const std::string& regionName,
+TcrMessageStopCQ::TcrMessageStopCQ(std::unique_ptr<DataOutput> dataOutput,
+                                   const std::string& regionName,
                                    int messageResponsetimeout,
                                    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::STOPCQ_MSG_TYPE;
   m_tcdm = connectionDM;
   m_regionName = regionName;  // this is querystring
@@ -1603,9 +1606,11 @@ TcrMessageStopCQ::TcrMessageStopCQ(const std::string& regionName,
   writeMessageLength();
 }
 
-TcrMessageCloseCQ::TcrMessageCloseCQ(const std::string& regionName,
+TcrMessageCloseCQ::TcrMessageCloseCQ(std::unique_ptr<DataOutput> dataOutput,
+                                     const std::string& regionName,
                                      int messageResponsetimeout,
                                      ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CLOSECQ_MSG_TYPE;
   m_tcdm = connectionDM;
   m_regionName = regionName;  // this is querystring
@@ -1627,9 +1632,10 @@ TcrMessageCloseCQ::TcrMessageCloseCQ(const std::string& regionName,
 }
 
 TcrMessageQueryWithParameters::TcrMessageQueryWithParameters(
-    const std::string& regionName, const UserDataPtr& aCallbackArgument,
-    CacheableVectorPtr paramList, int messageResponsetimeout,
-    ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& regionName,
+    const UserDataPtr& aCallbackArgument, CacheableVectorPtr paramList,
+    int messageResponsetimeout, ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::QUERY_WITH_PARAMETERS;
   m_tcdm = connectionDM;
   m_regionName = regionName;
@@ -1664,9 +1670,10 @@ TcrMessageQueryWithParameters::TcrMessageQueryWithParameters(
 }
 
 TcrMessageContainsKey::TcrMessageContainsKey(
-    const Region* region, const CacheableKeyPtr& key,
-    const UserDataPtr& aCallbackArgument, bool isContainsKey,
-    ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument,
+    bool isContainsKey, ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CONTAINS_KEY;
   m_tcdm = connectionDM;
   m_regionName =
@@ -1682,7 +1689,6 @@ TcrMessageContainsKey::TcrMessageContainsKey(
   numOfParts++;
 
   if (key == nullptr) {
-    delete m_request;
     throw IllegalArgumentException(
         "key passed to the constructor can't be nullptr");
   }
@@ -1699,7 +1705,8 @@ TcrMessageContainsKey::TcrMessageContainsKey(
 }
 
 TcrMessageGetDurableCqs::TcrMessageGetDurableCqs(
-    ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GETDURABLECQS_MSG_TYPE;
   m_tcdm = connectionDM;
   m_timeout = DEFAULT_TIMEOUT_SECONDS;
@@ -1711,10 +1718,12 @@ TcrMessageGetDurableCqs::TcrMessageGetDurableCqs(
   writeMessageLength();
 }
 
-TcrMessageRequest::TcrMessageRequest(const Region* region,
+TcrMessageRequest::TcrMessageRequest(std::unique_ptr<DataOutput> dataOutput,
+                                     const Region* region,
                                      const CacheableKeyPtr& key,
                                      const UserDataPtr& aCallbackArgument,
                                      ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::REQUEST;
   m_tcdm = connectionDM;
   m_key = key;
@@ -1731,7 +1740,6 @@ TcrMessageRequest::TcrMessageRequest(const Region* region,
   numOfParts++;
 
   if (key == nullptr) {
-    delete m_request;
     throw IllegalArgumentException(
         "key passed to the constructor can't be nullptr");
   }
@@ -1748,10 +1756,11 @@ TcrMessageRequest::TcrMessageRequest(const Region* region,
   writeMessageLength();
 }
 
-TcrMessageInvalidate::TcrMessageInvalidate(const Region* region,
-                                           const CacheableKeyPtr& key,
-                                           const UserDataPtr& aCallbackArgument,
-                                           ThinClientBaseDM* connectionDM) {
+TcrMessageInvalidate::TcrMessageInvalidate(
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::INVALIDATE;
   m_tcdm = connectionDM;
   m_key = key;
@@ -1768,7 +1777,6 @@ TcrMessageInvalidate::TcrMessageInvalidate(const Region* region,
   numOfParts++;
 
   if (key == nullptr) {
-    delete m_request;
     throw IllegalArgumentException(
         "key passed to the constructor can't be nullptr");
   }
@@ -1785,11 +1793,13 @@ TcrMessageInvalidate::TcrMessageInvalidate(const Region* region,
   writeMessageLength();
 }
 
-TcrMessageDestroy::TcrMessageDestroy(const Region* region,
+TcrMessageDestroy::TcrMessageDestroy(std::unique_ptr<DataOutput> dataOutput,
+                                     const Region* region,
                                      const CacheableKeyPtr& key,
                                      const CacheablePtr& value,
                                      const UserDataPtr& aCallbackArgument,
                                      ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::DESTROY;
   m_tcdm = connectionDM;
   m_key = key;
@@ -1805,7 +1815,6 @@ TcrMessageDestroy::TcrMessageDestroy(const Region* region,
   numOfParts++;
 
   if (key == nullptr) {
-    delete m_request;
     throw IllegalArgumentException(
         "key passed to the constructor can't be nullptr");
   }
@@ -1839,12 +1848,14 @@ TcrMessageDestroy::TcrMessageDestroy(const Region* region,
   }
 }
 
-TcrMessagePut::TcrMessagePut(const Region* region, const CacheableKeyPtr& key,
+TcrMessagePut::TcrMessagePut(std::unique_ptr<DataOutput> dataOutput,
+                             const Region* region, const CacheableKeyPtr& key,
                              const CacheablePtr& value,
                              const UserDataPtr& aCallbackArgument, bool isDelta,
                              ThinClientBaseDM* connectionDM, bool isMetaRegion,
                              bool fullValueAfterDeltaFail,
                              const char* regionName) {
+  m_request = std::move(dataOutput);
   // m_securityHeaderLength = 0;
   m_isMetaRegion = isMetaRegion;
   m_msgType = TcrMessage::PUT;
@@ -1864,7 +1875,6 @@ TcrMessagePut::TcrMessagePut(const Region* region, const CacheableKeyPtr& key,
   numOfParts++;
 
   if (key == nullptr) {
-    delete m_request;
     throw IllegalArgumentException(
         "key passed to the constructor can't be nullptr");
   }
@@ -1893,10 +1903,11 @@ TcrMessageReply::TcrMessageReply(bool decodeAll,
   if (connectionDM != nullptr) isSecurityOn = connectionDM->isSecurityOn();
 }
 
-TcrMessagePing::TcrMessagePing(bool decodeAll) {
+TcrMessagePing::TcrMessagePing(std::unique_ptr<DataOutput> dataOutput,
+                               bool decodeAll) {
   m_msgType = TcrMessage::PING;
   m_decodeAll = decodeAll;
-
+  m_request = std::move(dataOutput);
   m_request->writeInt(m_msgType);
   m_request->writeInt(
       (int32_t)0);  // 17 is fixed message len ...  PING only has a header.
@@ -1911,10 +1922,11 @@ TcrMessagePing::TcrMessagePing(bool decodeAll) {
   m_txId = 0;
 }
 
-TcrMessageCloseConnection::TcrMessageCloseConnection(bool decodeAll) {
+TcrMessageCloseConnection::TcrMessageCloseConnection(
+    std::unique_ptr<DataOutput> dataOutput, bool decodeAll) {
   m_msgType = TcrMessage::CLOSE_CONNECTION;
   m_decodeAll = decodeAll;
-
+  m_request = std::move(dataOutput);
   m_request->writeInt(m_msgType);
   m_request->writeInt((int32_t)6);
   m_request->writeInt((int32_t)1);  // Number of parts.
@@ -1929,15 +1941,19 @@ TcrMessageCloseConnection::TcrMessageCloseConnection(bool decodeAll) {
   m_request->write(static_cast<int8_t>(0));  // keepalive is '0'.
 }
 
-TcrMessageClientMarker::TcrMessageClientMarker(bool decodeAll) {
+TcrMessageClientMarker::TcrMessageClientMarker(
+    std::unique_ptr<DataOutput> dataOutput, bool decodeAll) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CLIENT_MARKER;
   m_decodeAll = decodeAll;
 }
 
 TcrMessageRegisterInterestList::TcrMessageRegisterInterestList(
-    const Region* region, const VectorOfCacheableKey& keys, bool isDurable,
-    bool isCachingEnabled, bool receiveValues,
-    InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const VectorOfCacheableKey& keys, bool isDurable, bool isCachingEnabled,
+    bool receiveValues, InterestResultPolicy interestPolicy,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::REGISTER_INTEREST_LIST;
   m_tcdm = connectionDM;
   m_keyList = &keys;
@@ -1964,7 +1980,6 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList(
 
   for (uint32_t i = 0; i < numInItrestList; i++) {
     if (keys[i] == nullptr) {
-      delete m_request;
       throw IllegalArgumentException(
           "keys in the interest list cannot be nullptr");
     }
@@ -1987,9 +2002,11 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList(
 }
 
 TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList(
-    const Region* region, const VectorOfCacheableKey& keys, bool isDurable,
-    bool isCachingEnabled, bool receiveValues,
-    InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const Region* region,
+    const VectorOfCacheableKey& keys, bool isDurable, bool isCachingEnabled,
+    bool receiveValues, InterestResultPolicy interestPolicy,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::UNREGISTER_INTEREST_LIST;
   m_tcdm = connectionDM;
   m_keyList = &keys;
@@ -2014,7 +2031,6 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList(
 
   for (uint32_t i = 0; i < numInItrestList; i++) {
     if (keys[i] == nullptr) {
-      delete m_request;
       throw IllegalArgumentException(
           "keys in the interest list cannot be nullptr");
     }
@@ -2026,9 +2042,11 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList(
 }
 
 TcrMessageCreateRegion::TcrMessageCreateRegion(
-    const std::string& str1, const std::string& str2,
-    InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
-    bool receiveValues, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+    const std::string& str2, InterestResultPolicy interestPolicy,
+    bool isDurable, bool isCachingEnabled, bool receiveValues,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CREATE_REGION;
   m_tcdm = connectionDM;
   m_isDurable = isDurable;
@@ -2043,9 +2061,11 @@ TcrMessageCreateRegion::TcrMessageCreateRegion(
 }
 
 TcrMessageRegisterInterest::TcrMessageRegisterInterest(
-    const std::string& str1, const std::string& str2,
-    InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
-    bool receiveValues, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+    const std::string& str2, InterestResultPolicy interestPolicy,
+    bool isDurable, bool isCachingEnabled, bool receiveValues,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::REGISTER_INTEREST;
   m_tcdm = connectionDM;
   m_isDurable = isDurable;
@@ -2078,9 +2098,11 @@ TcrMessageRegisterInterest::TcrMessageRegisterInterest(
 }
 
 TcrMessageUnregisterInterest::TcrMessageUnregisterInterest(
-    const std::string& str1, const std::string& str2,
-    InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled,
-    bool receiveValues, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+    const std::string& str2, InterestResultPolicy interestPolicy,
+    bool isDurable, bool isCachingEnabled, bool receiveValues,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::UNREGISTER_INTEREST;
   m_tcdm = connectionDM;
   m_isDurable = isDurable;
@@ -2100,8 +2122,9 @@ TcrMessageUnregisterInterest::TcrMessageUnregisterInterest(
   m_interestPolicy = interestPolicy.ordinal;
 }
 
-TcrMessageTxSynchronization::TcrMessageTxSynchronization(int ordinal, int txid,
-                                                         int status) {
+TcrMessageTxSynchronization::TcrMessageTxSynchronization(
+    std::unique_ptr<DataOutput> dataOutput, int ordinal, int txid, int status) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::TX_SYNCHRONIZATION;
 
   writeHeader(m_msgType, ordinal == 1 ? 3 : 2);
@@ -2114,7 +2137,9 @@ TcrMessageTxSynchronization::TcrMessageTxSynchronization(int ordinal, int txid,
   writeMessageLength();
 }
 
-TcrMessageClientReady::TcrMessageClientReady() {
+TcrMessageClientReady::TcrMessageClientReady(
+    std::unique_ptr<DataOutput> dataOutput) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::CLIENT_READY;
 
   writeHeader(m_msgType, 1);
@@ -2123,8 +2148,9 @@ TcrMessageClientReady::TcrMessageClientReady() {
   writeMessageLength();
 }
 
-TcrMessageCommit::TcrMessageCommit() {
+TcrMessageCommit::TcrMessageCommit(std::unique_ptr<DataOutput> dataOutput) {
   m_msgType = TcrMessage::COMMIT;
+  m_request = std::move(dataOutput);
 
   writeHeader(m_msgType, 1);
   // the server expects at least 1 part, so writing a dummy
@@ -2132,8 +2158,9 @@ TcrMessageCommit::TcrMessageCommit() {
   writeMessageLength();
 }
 
-TcrMessageRollback::TcrMessageRollback() {
+TcrMessageRollback::TcrMessageRollback(std::unique_ptr<DataOutput> dataOutput) {
   m_msgType = TcrMessage::ROLLBACK;
+  m_request = std::move(dataOutput);
 
   writeHeader(m_msgType, 1);
   // the server expects at least 1 part, so writing a dummy
@@ -2141,8 +2168,10 @@ TcrMessageRollback::TcrMessageRollback() {
   writeMessageLength();
 }
 
-TcrMessageTxFailover::TcrMessageTxFailover() {
+TcrMessageTxFailover::TcrMessageTxFailover(
+    std::unique_ptr<DataOutput> dataOutput) {
   m_msgType = TcrMessage::TX_FAILOVER;
+  m_request = std::move(dataOutput);
 
   writeHeader(m_msgType, 1);
   // the server expects at least 1 part, so writing a dummy
@@ -2151,8 +2180,10 @@ TcrMessageTxFailover::TcrMessageTxFailover() {
 }
 
 // constructor for MAKE_PRIMARY message.
-TcrMessageMakePrimary::TcrMessageMakePrimary(bool processedMarker) {
+TcrMessageMakePrimary::TcrMessageMakePrimary(
+    std::unique_ptr<DataOutput> dataOutput, bool processedMarker) {
   m_msgType = TcrMessage::MAKE_PRIMARY;
+  m_request = std::move(dataOutput);
 
   writeHeader(m_msgType, 1);
   writeBytePart(processedMarker ? 1 : 0);  // boolean processedMarker
@@ -2161,8 +2192,10 @@ TcrMessageMakePrimary::TcrMessageMakePrimary(bool processedMarker) {
 
 // constructor for PERIODIC_ACK of notified eventids
 TcrMessagePeriodicAck::TcrMessagePeriodicAck(
+    std::unique_ptr<DataOutput> dataOutput,
     const EventIdMapEntryList& entries) {
   m_msgType = TcrMessage::PERIODIC_ACK;
+  m_request = std::move(dataOutput);
 
   uint32_t numParts = static_cast<uint32_t>(entries.size());
   GF_D_ASSERT(numParts > 0);
@@ -2178,7 +2211,8 @@ TcrMessagePeriodicAck::TcrMessagePeriodicAck(
   writeMessageLength();
 }
 
-TcrMessagePutAll::TcrMessagePutAll(const Region* region,
+TcrMessagePutAll::TcrMessagePutAll(std::unique_ptr<DataOutput> dataOutput,
+                                   const Region* region,
                                    const HashMapOfCacheable& map,
                                    int messageResponsetimeout,
                                    ThinClientBaseDM* connectionDM,
@@ -2187,6 +2221,7 @@ TcrMessagePutAll::TcrMessagePutAll(const Region* region,
   m_regionName = region->getFullPath();
   m_region = region;
   m_messageResponseTimeout = messageResponsetimeout;
+  m_request = std::move(dataOutput);
 
   // TODO check the number of parts in this constructor. doubt because in PUT
   // value can be nullptr also.
@@ -2245,7 +2280,8 @@ TcrMessagePutAll::TcrMessagePutAll(const Region* region,
   writeMessageLength();
 }
 
-TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region,
+TcrMessageRemoveAll::TcrMessageRemoveAll(std::unique_ptr<DataOutput> dataOutput,
+                                         const Region* region,
                                          const VectorOfCacheableKey& keys,
                                          const UserDataPtr& aCallbackArgument,
                                          ThinClientBaseDM* connectionDM) {
@@ -2253,6 +2289,7 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region,
   m_tcdm = connectionDM;
   m_regionName = region->getFullPath();
   m_region = region;
+  m_request = std::move(dataOutput);
 
   // TODO check the number of parts in this constructor. doubt because in PUT
   // value can be nullptr also.
@@ -2288,15 +2325,17 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region,
 }
 
 TcrMessageUpdateClientNotification::TcrMessageUpdateClientNotification(
-    int32_t port) {
+    std::unique_ptr<DataOutput> dataOutput, int32_t port) {
   m_msgType = TcrMessage::UPDATE_CLIENT_NOTIFICATION;
+  m_request = std::move(dataOutput);
 
   writeHeader(m_msgType, 1);
   writeIntPart(port);
   writeMessageLength();
 }
 
-TcrMessageGetAll::TcrMessageGetAll(const Region* region,
+TcrMessageGetAll::TcrMessageGetAll(std::unique_ptr<DataOutput> dataOutput,
+                                   const Region* region,
                                    const VectorOfCacheableKey* keys,
                                    ThinClientBaseDM* connectionDM,
                                    const UserDataPtr& aCallbackArgument) {
@@ -2306,6 +2345,7 @@ TcrMessageGetAll::TcrMessageGetAll(const Region* region,
   m_callbackArgument = aCallbackArgument;
   m_regionName = region->getFullPath();
   m_region = region;
+  m_request = std::move(dataOutput);
 
   /*CacheableObjectArrayPtr keyArr = nullptr;
   if (keys != nullptr) {
@@ -2348,10 +2388,13 @@ void TcrMessage::InitializeGetallMsg(const UserDataPtr& aCallbackArgument) {
   writeMessageLength();
 }
 
-TcrMessageExecuteCq::TcrMessageExecuteCq(const std::string& str1,
+TcrMessageExecuteCq::TcrMessageExecuteCq(std::unique_ptr<DataOutput> dataOutput,
+                                         const std::string& str1,
                                          const std::string& str2, int state,
                                          bool isDurable,
                                          ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::EXECUTECQ_MSG_TYPE;
   m_tcdm = connectionDM;
   m_isDurable = isDurable;
@@ -2374,8 +2417,11 @@ TcrMessageExecuteCq::TcrMessageExecuteCq(const std::string& str1,
 }
 
 TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr(
-    const std::string& str1, const std::string& str2, int state, bool isDurable,
+    std::unique_ptr<DataOutput> dataOutput, const std::string& str1,
+    const std::string& str2, int state, bool isDurable,
     ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE;
   m_tcdm = connectionDM;
   m_isDurable = isDurable;
@@ -2398,33 +2444,31 @@ TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr(
 }
 
 TcrMessageExecuteFunction::TcrMessageExecuteFunction(
-    const std::string& funcName, const CacheablePtr& args, uint8_t getResult,
-    ThinClientBaseDM* connectionDM, int32_t timeout) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+    const CacheablePtr& args, uint8_t getResult, ThinClientBaseDM* connectionDM,
+    int32_t timeout) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::EXECUTE_FUNCTION;
   m_tcdm = connectionDM;
   m_hasResult = getResult;
 
   uint32_t numOfParts = 3;
   writeHeader(m_msgType, numOfParts);
-  // writeBytePart(getResult ? 1 : 0);
-  // if gfcpp property unit set then timeout will be in millisecond
-  // otherwise it will be in second
-  if ((DistributedSystem::getSystemProperties() != nullptr) &&
-      (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
-    writeByteAndTimeOutPart(getResult, timeout);
-  } else {
-    writeByteAndTimeOutPart(getResult, (timeout * 1000));
-  }
+  writeByteAndTimeOutPart(getResult, timeout);
   writeRegionPart(funcName);  // function name string
   writeObjectPart(args);
   writeMessageLength();
 }
 
 TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction(
-    const std::string& funcName, const Region* region, const CacheablePtr& args,
+    std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+    const Region* region, const CacheablePtr& args,
     CacheableVectorPtr routingObj, uint8_t getResult,
     CacheableHashSetPtr failedNodes, int32_t timeout,
     ThinClientBaseDM* connectionDM, int8_t reExecute) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION;
   m_tcdm = connectionDM;
   m_regionName =
@@ -2445,15 +2489,7 @@ TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction(
     numOfParts++;
   }
   writeHeader(m_msgType, numOfParts);
-
-  // if gfcpp property unit set then timeout will be in millisecond
-  // otherwise it will be in second
-  if ((DistributedSystem::getSystemProperties() != nullptr) &&
-      (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
-    writeByteAndTimeOutPart(getResult, timeout);
-  } else {
-    writeByteAndTimeOutPart(getResult, (timeout * 1000));
-  }
+  writeByteAndTimeOutPart(getResult, timeout);
   writeRegionPart(m_regionName);
   writeRegionPart(funcName);  // function name string
   writeObjectPart(args);
@@ -2466,7 +2502,7 @@ TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction(
       writeObjectPart(value);
     }
   } else {
-    writeIntPart(0); 
+    writeIntPart(0);
   }
   if (failedNodes) {
     writeIntPart(static_cast<int32_t>(failedNodes->size()));
@@ -2479,10 +2515,13 @@ TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction(
 
 TcrMessageExecuteRegionFunctionSingleHop::
     TcrMessageExecuteRegionFunctionSingleHop(
-        const std::string& funcName, const Region* region,
-        const CacheablePtr& args, CacheableHashSetPtr routingObj,
-        uint8_t getResult, CacheableHashSetPtr failedNodes, bool allBuckets,
-        int32_t timeout, ThinClientBaseDM* connectionDM) {
+        std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+        const Region* region, const CacheablePtr& args,
+        CacheableHashSetPtr routingObj, uint8_t getResult,
+        CacheableHashSetPtr failedNodes, bool allBuckets, int32_t timeout,
+        ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP;
   m_tcdm = connectionDM;
   m_regionName =
@@ -2498,15 +2537,7 @@ TcrMessageExecuteRegionFunctionSingleHop::
     numOfParts++;
   }
   writeHeader(m_msgType, numOfParts);
-
-  // if gfcpp property unit set then timeout will be in millisecond
-  // otherwise it will be in second
-  if ((DistributedSystem::getSystemProperties() != nullptr) &&
-      (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) {
-    writeByteAndTimeOutPart(getResult, timeout);
-  } else {
-    writeByteAndTimeOutPart(getResult, (timeout * 1000));
-  }
+  writeByteAndTimeOutPart(getResult, timeout);
   writeRegionPart(m_regionName);
   writeRegionPart(funcName);  // function name string
   writeObjectPart(args);
@@ -2541,7 +2572,9 @@ TcrMessageExecuteRegionFunctionSingleHop::
 }
 
 TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes(
-    const char* regionName) {
+    std::unique_ptr<DataOutput> dataOutput, const char* regionName) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES;
   writeHeader(m_msgType, 1);
   writeRegionPart(regionName);
@@ -2549,14 +2582,19 @@ TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes(
 }
 
 TcrMessageGetClientPrMetadata::TcrMessageGetClientPrMetadata(
-    const char* regionName) {
+    std::unique_ptr<DataOutput> dataOutput, const char* regionName) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::GET_CLIENT_PR_METADATA;
   writeHeader(m_msgType, 1);
   writeRegionPart(regionName);
   writeMessageLength();
 }
 
-TcrMessageSize::TcrMessageSize(const char* regionName) {
+TcrMessageSize::TcrMessageSize(std::unique_ptr<DataOutput> dataOutput,
+                               const char* regionName) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::SIZE;
   writeHeader(m_msgType, 1);
   writeRegionPart(regionName);
@@ -2564,7 +2602,10 @@ TcrMessageSize::TcrMessageSize(const char* regionName) {
 }
 
 TcrMessageUserCredential::TcrMessageUserCredential(
-    PropertiesPtr creds, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, PropertiesPtr creds,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::USER_CREDENTIAL_MESSAGE;
   m_tcdm = connectionDM;
 
@@ -2584,7 +2625,10 @@ TcrMessageUserCredential::TcrMessageUserCredential(
 }
 
 TcrMessageRemoveUserAuth::TcrMessageRemoveUserAuth(
-    bool keepAlive, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, bool keepAlive,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
+
   m_msgType = TcrMessage::REMOVE_USER_AUTH;
   m_tcdm = connectionDM;
   LOGDEBUG("Tcrmessage sending REMOVE_USER_AUTH message to server");
@@ -2605,12 +2649,12 @@ void TcrMessage::createUserCredentialMessage(TcrConnection* conn) {
   m_isSecurityHeaderAdded = false;
   writeHeader(m_msgType, 1);
 
-  DataOutput dOut;
+  auto dOut = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
 
-  if (m_creds != nullptr) m_creds->toData(dOut);
+  if (m_creds != nullptr) m_creds->toData(*dOut);
 
   CacheableBytesPtr credBytes =
-      CacheableBytes::create(dOut.getBuffer(), dOut.getBufferLength());
+      CacheableBytes::create(dOut->getBuffer(), dOut->getBufferLength());
   CacheableBytesPtr encryptBytes = conn->encryptBytes(credBytes);
   writeObjectPart(encryptBytes);
 
@@ -2633,13 +2677,13 @@ void TcrMessage::addSecurityPart(int64_t connectionId, int64_t unique_id,
   }
   m_isSecurityHeaderAdded = true;
   LOGDEBUG("addSecurityPart( , ) ");
-  DataOutput dOutput;
+  auto dOutput = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
 
-  dOutput.writeInt(connectionId);
-  dOutput.writeInt(unique_id);
+  dOutput->writeInt(connectionId);
+  dOutput->writeInt(unique_id);
 
   CacheableBytesPtr bytes =
-      CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength());
+      CacheableBytes::create(dOutput->getBuffer(), dOutput->getBufferLength());
 
   CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes);
 
@@ -2663,12 +2707,12 @@ void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) {
   }
   m_isSecurityHeaderAdded = true;
   LOGDEBUG("TcrMessage::addSecurityPart only connid");
-  DataOutput dOutput;
+  auto dOutput = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
 
-  dOutput.writeInt(connectionId);
+  dOutput->writeInt(connectionId);
 
   CacheableBytesPtr bytes =
-      CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength());
+      CacheableBytes::create(dOutput->getBuffer(), dOutput->getBufferLength());
 
   CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes);
 
@@ -2681,7 +2725,9 @@ void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) {
                ->asChar());
 }
 
-TcrMessageRequestEventValue::TcrMessageRequestEventValue(EventIdPtr eventId) {
+TcrMessageRequestEventValue::TcrMessageRequestEventValue(
+    std::unique_ptr<DataOutput> dataOutput, EventIdPtr eventId) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::REQUEST_EVENT_VALUE;
 
   uint32_t numOfParts = 1;
@@ -2691,8 +2737,9 @@ TcrMessageRequestEventValue::TcrMessageRequestEventValue(EventIdPtr eventId) {
 }
 
 TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType(
-    const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM,
-    int32_t pdxTypeId) {
+    std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType,
+    ThinClientBaseDM* connectionDM, int32_t pdxTypeId) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GET_PDX_ID_FOR_TYPE;
   m_tcdm = connectionDM;
 
@@ -2706,9 +2753,10 @@ TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType(
                ->asChar());
 }
 
-TcrMessageAddPdxType::TcrMessageAddPdxType(const CacheablePtr& pdxType,
-                                           ThinClientBaseDM* connectionDM,
-                                           int32_t pdxTypeId) {
+TcrMessageAddPdxType::TcrMessageAddPdxType(
+    std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType,
+    ThinClientBaseDM* connectionDM, int32_t pdxTypeId) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::ADD_PDX_TYPE;
   m_tcdm = connectionDM;
 
@@ -2724,8 +2772,9 @@ TcrMessageAddPdxType::TcrMessageAddPdxType(const CacheablePtr& pdxType,
 }
 
 TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum(
-    const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM,
-    int32_t pdxTypeId) {
+    std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType,
+    ThinClientBaseDM* connectionDM, int32_t pdxTypeId) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GET_PDX_ID_FOR_ENUM;
   m_tcdm = connectionDM;
 
@@ -2739,9 +2788,10 @@ TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum(
                ->asChar());
 }
 
-TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(const CacheablePtr& pdxType,
-                                           ThinClientBaseDM* connectionDM,
-                                           int32_t pdxTypeId) {
+TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(
+    std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType,
+    ThinClientBaseDM* connectionDM, int32_t pdxTypeId) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::ADD_PDX_ENUM;
   m_tcdm = connectionDM;
 
@@ -2757,7 +2807,9 @@ TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(const CacheablePtr& pdxType,
 }
 
 TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById(
-    int32_t typeId, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, int32_t typeId,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GET_PDX_TYPE_BY_ID;
   m_tcdm = connectionDM;
 
@@ -2774,7 +2826,9 @@ TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById(
 }
 
 TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById(
-    int32_t typeId, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, int32_t typeId,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GET_PDX_ENUM_BY_ID;
   m_tcdm = connectionDM;
 
@@ -2791,7 +2845,9 @@ TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById(
 }
 
 TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes(
-    const std::string& funcName, ThinClientBaseDM* connectionDM) {
+    std::unique_ptr<DataOutput> dataOutput, const std::string& funcName,
+    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::GET_FUNCTION_ATTRIBUTES;
   m_tcdm = connectionDM;
 
@@ -2801,8 +2857,10 @@ TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes(
   writeMessageLength();
 }
 
-TcrMessageKeySet::TcrMessageKeySet(const std::string& funcName,
+TcrMessageKeySet::TcrMessageKeySet(std::unique_ptr<DataOutput> dataOutput,
+                                   const std::string& funcName,
                                    ThinClientBaseDM* connectionDM) {
+  m_request = std::move(dataOutput);
   m_msgType = TcrMessage::KEY_SET;
   m_tcdm = connectionDM;
 
@@ -2812,17 +2870,21 @@ TcrMessageKeySet::TcrMessageKeySet(const std::string& funcName,
   writeMessageLength();
 }
 
-void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId) {
+void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId,
+                         const SerializationRegistry& serializationRegistry,
+                         MemberListForVersionStamp& memberListForVersionStamp) {
+  if (m_request == nullptr) {
+    m_request = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput();
+  }
   if (bytearray) {
     DeleteArray<const char> delByteArr(bytearray);
-    handleByteArrayResponse(bytearray, len, memId);
+    handleByteArrayResponse(bytearray, len, memId, serializationRegistry,
+                            memberListForVersionStamp);
   }
 }
 
 TcrMessage::~TcrMessage() {
-  GF_SAFE_DELETE(m_request);
   GF_SAFE_DELETE(m_cqs);
-  GF_SAFE_DELETE(m_delta);
   /* adongre
    * CID 29167: Non-array delete for scalars (DELETE_ARRAY)
    * Coverity - II


Mime
View raw message