geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echobr...@apache.org
Subject [geode-native] branch fakeDev2 updated: Revert " YABB test"
Date Wed, 14 Feb 2018 20:47:11 GMT
This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch fakeDev2
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/fakeDev2 by this push:
     new 13c11b2  Revert " YABB test"
13c11b2 is described below

commit 13c11b204aebc0e2b6eb37b8443455e83ff0585a
Author: Ernest Burghardt <eburghardt@pivotal.io>
AuthorDate: Wed Feb 14 13:47:00 2018 -0700

    Revert " YABB test"
    
    This reverts commit df1d211db0c19b048d8db47b69491772bf179c33.
---
 cppcache/src/TcrEndpoint.cpp | 1353 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1353 insertions(+)

diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
new file mode 100644
index 0000000..c6503fb
--- /dev/null
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -0,0 +1,1353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+#include <chrono>
+#include <ace/OS.h>
+
+#include <geode/SystemProperties.hpp>
+#include <geode/AuthInitialize.hpp>
+
+#include "TcrEndpoint.hpp"
+#include "ThinClientRegion.hpp"
+#include "ThinClientPoolHADM.hpp"
+#include "StackTrace.hpp"
+#include "CacheImpl.hpp"
+#include "Utils.hpp"
+#include "DistributedSystemImpl.hpp"
+#include "util/exception.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+#define throwException(ex)                              \
+  {                                                     \
+    LOGFINEST("%s: %s", ex.getName(), ex.what()); \
+    throw ex;                                           \
+  }
+/*
+This is replaced by the connect-timeout (times 3) system property for SR # 6525.
+#define DEFAULT_CALLBACK_CONNECTION_TIMEOUT_SECONDS 180
+*/
+const char* TcrEndpoint::NC_Notification = "NC Notification";
+
+TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
+                         ACE_Semaphore& failoverSema,
+                         ACE_Semaphore& cleanupSema,
+                         ACE_Semaphore& redundancySema, ThinClientBaseDM* DM,
+                         bool isMultiUserMode)
+    : m_needToConnectInLock(false),
+      m_connectLockCond(m_connectLock),
+      m_maxConnections(cacheImpl->getDistributedSystem()
+                           .getSystemProperties()
+                           .connectionPoolSize()),
+      m_notifyConnection(0),
+      m_notifyReceiver(0),
+      m_numRegionListener(0),
+      m_isQueueHosted(false),
+      m_uniqueId(0),
+      m_isAuthenticated(false),
+      m_msgSent(false),
+      m_pingSent(false),
+      m_numberOfTimesFailed(0),
+      m_isMultiUserMode(isMultiUserMode),
+      m_name(name),
+      m_connected(false),
+      m_isActiveEndpoint(false),
+      m_numRegions(0),
+      m_pingTimeouts(0),
+      m_notifyCount(0),
+      m_cacheImpl(cacheImpl),
+      m_failoverSema(failoverSema),
+      m_cleanupSema(cleanupSema),
+      m_notificationCleanupSema(0),
+      m_redundancySema(redundancySema),
+      m_dupCount(0),
+      m_serverQueueStatus(NON_REDUNDANT_SERVER),
+      m_isServerQueueStatusSet(false),
+      m_queueSize(0),
+      // m_poolHADM( poolHADM ),
+      m_baseDM(DM),
+      m_noOfConnRefs(0),
+      m_distributedMemId(0) {
+  /*
+  m_name = Utils::convertHostToCanonicalForm(m_name.c_str() );
+  */
+}
+
+TcrEndpoint::~TcrEndpoint() {
+  m_connected = false;
+  m_isActiveEndpoint = false;
+  closeConnections();
+  {
+    // force close the notification channel -- see bug #295
+    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+    if (m_numRegionListener > 0) {
+      LOGFINE(
+          "Connection to %s still has references "
+          "to subscription channel while closing",
+          m_name.c_str());
+      // fail in dev build to track #295 better in regressions
+      GF_DEV_ASSERT(m_numRegionListener == 0);
+
+      m_numRegionListener = 0;
+      closeNotification();
+    }
+  }
+  while (m_notifyCount > 0) {
+    LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d",
+             m_notifyCount);
+    m_notificationCleanupSema.acquire();
+    m_notifyCount--;
+  }
+  LOGFINE("Connection to %s deleted", m_name.c_str());
+}
+
+inline bool TcrEndpoint::needtoTakeConnectLock() {
+#ifdef __linux
+  if (m_cacheImpl->getDistributedSystem()
+          .getSystemProperties()
+          .connectWaitTimeout() > std::chrono::seconds::zero()) {
+    return m_needToConnectInLock;  // once pipe or other socket error will take
+                                   // lock to connect.
+  }
+  return false;  // once pipe or other socket error will take lock to connect.
+#else
+  return false;
+#endif
+}
+
+GfErrType TcrEndpoint::createNewConnectionWL(
+    TcrConnection*& newConn, bool isClientNotification, bool isSecondary,
+    std::chrono::microseconds connectTimeout) {
+  LOGFINE("TcrEndpoint::createNewConnectionWL");
+  auto connectWaitTimeout = m_cacheImpl->getDistributedSystem()
+                                .getSystemProperties()
+                                .connectWaitTimeout();
+  ACE_Time_Value interval(connectWaitTimeout);
+  ACE_Time_Value stopAt(ACE_OS::gettimeofday());
+  stopAt += interval;
+  bool connCreated = false;
+
+  while (ACE_OS::gettimeofday() < stopAt) {
+    int32_t ret = m_connectLock.acquire(&stopAt);
+
+    LOGFINE(
+        "TcrEndpoint::createNewConnectionWL ret = %d interval = %ld error =%s",
+        ret, interval.get_msec(), ACE_OS::strerror(ACE_OS::last_error()));
+
+    if (ret != -1) {  // got lock
+      try {
+        LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
+        newConn =
+            new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
+        newConn->InitTcrConnection(this, m_name.c_str(), m_ports,
+                                   isClientNotification, isSecondary,
+                                   connectTimeout);
+
+        connCreated = true;  // to break while loop
+
+        m_needToConnectInLock = false;  // no need to take lock
+
+        m_connectLock.release();
+        LOGFINE("New Connection Created");
+        break;
+      } catch (const TimeoutException&) {
+        LOGINFO("Timeout1 in handshake with endpoint[%s]", m_name.c_str());
+        m_connectLock.release();
+        // throw te;
+        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
+      } catch (std::exception& ex) {
+        m_connectLock.release();
+        LOGWARN("Failed1 in handshake with endpoint[%s]: %s", m_name.c_str(),
+                ex.what());
+        // throw ex;
+        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
+      } catch (...) {
+        LOGWARN("Unknown1 failure in handshake with endpoint[%s]",
+                m_name.c_str());
+        m_connectLock.release();
+        // throw;
+        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
+      }
+    }
+  }
+
+  if (!connCreated) {
+    LOGFINE("TcrEndpoint::createNewConnectionWL timeout");
+    // throwException(TimeoutException("Thread is hanged in connect call"));
+    return GF_CLIENT_WAIT_TIMEOUT;
+  }
+
+  return GF_NOERR;
+}
+
+GfErrType TcrEndpoint::createNewConnection(
+    TcrConnection*& newConn, bool isClientNotification, bool isSecondary,
+    std::chrono::microseconds connectTimeout, int32_t timeoutRetries,
+    bool sendUpdateNotification, bool appThreadRequest) {
+  LOGFINE(
+      "TcrEndpoint::createNewConnection: connectTimeout =%d "
+      "m_needToConnectInLock=%d appThreadRequest =%d",
+      connectTimeout.count(), m_needToConnectInLock, appThreadRequest);
+  GfErrType err = GF_NOERR;
+  newConn = nullptr;
+  while (timeoutRetries-- >= 0) {
+    try {
+      if (newConn == nullptr) {
+        if (!needtoTakeConnectLock() || !appThreadRequest) {
+          newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
+                                      m_connected);
+          bool authenticate = newConn->InitTcrConnection(
+              this, m_name.c_str(), m_ports, isClientNotification, isSecondary,
+              connectTimeout);
+          if (authenticate) {
+            authenticateEndpoint(newConn);
+          }
+        } else {
+          err = createNewConnectionWL(newConn, isClientNotification,
+                                      isSecondary, connectTimeout);
+          if (err == GF_CLIENT_WAIT_TIMEOUT ||
+              err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
+            break;
+          }
+        }
+        // m_connected = true;
+      }
+      if (!isClientNotification && sendUpdateNotification) {
+        bool notificationStarted;
+        {
+          ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+          notificationStarted = (m_numRegionListener > 0) || m_isQueueHosted;
+        }
+        if (notificationStarted) {
+          LOGFINE("Sending update notification message to endpoint %s",
+                  m_name.c_str());
+          TcrMessageUpdateClientNotification updateNotificationMsg(
+              newConn->getConnectionManager()
+                  .getCacheImpl()
+                  ->getCache()
+                  ->createDataOutput(),
+              static_cast<int32_t>(newConn->getPort()));
+          newConn->send(updateNotificationMsg.getMsgData(),
+                        updateNotificationMsg.getMsgLength());
+        }
+      }
+      err = GF_NOERR;
+      break;
+    } catch (const TimeoutException&) {
+      LOGINFO("Timeout in handshake with endpoint[%s]", m_name.c_str());
+      err = GF_TIMOUT;
+      m_needToConnectInLock = true;  // while creating the connection
+      std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    } catch (const GeodeIOException& ex) {
+      LOGINFO("IO error[%d] in handshake with endpoint[%s]: %s",
+              ACE_OS::last_error(), m_name.c_str(), ex.what());
+      err = GF_IOERR;
+      m_needToConnectInLock = true;  // while creating the connection
+      break;
+    } catch (const AuthenticationFailedException& ex) {
+      LOGWARN("Authentication failed in handshake with endpoint[%s]: %s",
+              m_name.c_str(), ex.what());
+      err = GF_AUTHENTICATION_FAILED_EXCEPTION;
+      break;
+    } catch (const AuthenticationRequiredException& ex) {
+      LOGWARN("Authentication required in handshake with endpoint[%s]: %s",
+              m_name.c_str(), ex.what());
+      err = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
+      break;
+    } catch (const CacheServerException& ex) {
+      LOGWARN("Exception in handshake on server[%s]: %s", m_name.c_str(),
+              ex.what());
+      err = GF_CACHESERVER_EXCEPTION;
+      break;
+    } catch (const Exception& ex) {
+      LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(),
+              ex.what());
+      err = GF_MSG;
+      break;
+    } catch (std::exception& ex) {
+      LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(),
+              ex.what());
+      err = GF_MSG;
+      break;
+    } catch (...) {
+      LOGWARN("Unknown failure in handshake with endpoint[%s]", m_name.c_str());
+      err = GF_MSG;
+      break;
+    }
+  }
+  if (err != GF_NOERR && newConn != nullptr) {
+    _GEODE_SAFE_DELETE(newConn);
+  }
+  return err;
+}
+
+void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
+  LOGDEBUG(
+      "TcrEndpoint::authenticateEndpoint m_isAuthenticated  = %d "
+      "this->m_baseDM = %d",
+      m_isAuthenticated, m_baseDM);
+  if (!m_isAuthenticated && m_baseDM) {
+    this->setConnected();
+    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock);
+    GfErrType err = GF_NOERR;
+    auto creds = getCredentials();
+
+    if (creds != nullptr) {
+      LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d",
+               creds->getSize());
+    } else {
+      LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app ");
+    }
+
+    TcrMessageUserCredential request(
+        m_cacheImpl->getCache()->createDataOutput(), creds, m_baseDM);
+
+    LOGDEBUG("request is created");
+    TcrMessageReply reply(true, this->m_baseDM);
+    // err = this->sendRequestToEP(request, reply, ( *it ).int_id_);
+    err = this->sendRequestConnWithRetry(request, reply, conn);
+    LOGDEBUG("authenticateEndpoint error = %d", err);
+    if (err == GF_NOERR) {
+      // put the object into local region
+      switch (reply.getMessageType()) {
+        case TcrMessage::RESPONSE: {
+          // nothing to be done;
+          break;
+        }
+        case TcrMessage::EXCEPTION: {
+          err = ThinClientRegion::handleServerException("AuthException",
+                                                        reply.getException());
+          break;
+        }
+        default: {
+          LOGERROR("Unknown message type %d while sending credentials",
+                   reply.getMessageType());
+          err = GF_MSG;
+          break;
+        }
+      }
+    }
+    // throw exception if it is not authenticated
+    GfErrTypeToException("TcrEndpoint::authenticateEndpoint", err);
+
+    m_isAuthenticated = true;
+  }
+}
+std::shared_ptr<Properties> TcrEndpoint::getCredentials() {
+  const auto& distributedSystem = m_cacheImpl->getDistributedSystem();
+  const auto& tmpSecurityProperties =
+      distributedSystem.getSystemProperties().getSecurityProperties();
+
+  if (const auto& authInitialize = m_cacheImpl->getAuthInitialize()) {
+    LOGFINER(
+        "Acquired handle to AuthInitialize plugin, "
+        "getting credentials for %s",
+        m_name.c_str());
+    const auto& tmpAuthIniSecurityProperties =
+        authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str());
+    LOGFINER("Done getting credentials");
+    return tmpAuthIniSecurityProperties;
+  }
+  return nullptr;
+}
+
+ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus(
+    int32_t& queueSize, bool addToQueue, TcrConnection*& statusConn) {
+  GfErrType err = GF_NOERR;
+  TcrConnection* newConn;
+  ServerQueueStatus status = NON_REDUNDANT_SERVER;
+
+  err = createNewConnection(newConn, false, false,
+                            m_cacheImpl->getDistributedSystem()
+                                .getSystemProperties()
+                                .connectTimeout());
+  if (err == GF_NOERR) {
+    status = newConn->getServerQueueStatus(queueSize);
+
+    if (status == REDUNDANT_SERVER || status == PRIMARY_SERVER) {
+      if (addToQueue) {
+        m_opConnections.put(newConn, true);
+      } else {
+        statusConn = newConn;
+      }
+      m_connected = true;
+      return status;
+    } else {
+      //  remove port from ports list (which is sent to server in notification
+      //  handshake).
+      closeConnection(newConn);
+      return status;
+    }
+  }
+
+  return status;
+}
+
+GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
+                                  bool isActiveEndpoint,
+                                  ThinClientBaseDM* distMgr) {
+  // Pre-conditions:
+  // 1. If this is a secondary server then clientNotification must be true
+  GF_DEV_ASSERT(!isSecondary || clientNotification);
+
+  bool connected = false;
+  GfErrType err = GF_NOERR;
+
+  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
+  // Three cases here:
+  // 1. m_connected is false, m_isActiveEndpoint is false and then
+  //    if isActiveEndpoint is true, then create 'max' connections
+  // 2. m_connected is false, m_isActiveEndpoint is false and then
+  //    if isActiveEndpoint is false, then create just one connection
+  //    to ping the server
+  // 3. m_connected is true, m_isActiveEndpoint is false (i.e. server was
+  //    previously not an active endpoint) then if isSecondary is false then
+  //    create 'max-1' connections else do nothing
+  m_opConnections.reset();
+  if (m_maxConnections <= 0) {
+    connected = true;
+  } else if (!m_isActiveEndpoint) {
+    int maxConnections = 0;
+    if (isActiveEndpoint) {
+      if (m_connected) {
+        maxConnections = m_maxConnections - 1;
+      } else {
+        maxConnections = m_maxConnections;
+      }
+    } else if (!m_connected) {
+      maxConnections = 1;
+    }
+    if (maxConnections > 0) {
+      LOGINFO("Starting Handshake with %s%s",
+              (isSecondary ? "secondary server "
+                           : (isActiveEndpoint ? "" : "primary server ")),
+              m_name.c_str());
+      for (int connNum = 0; connNum < maxConnections; ++connNum) {
+        TcrConnection* newConn;
+        if ((err = createNewConnection(newConn, false, false,
+                                       m_cacheImpl->getDistributedSystem()
+                                           .getSystemProperties()
+                                           .connectTimeout(),
+                                       0, m_connected)) != GF_NOERR) {
+          m_connected = false;
+          m_isActiveEndpoint = false;
+          closeConnections();
+          return err;
+        }
+        m_opConnections.put(newConn, true);
+      }
+      LOGINFO("Handshake with %s%s success",
+              (isSecondary ? "secondary server "
+                           : (isActiveEndpoint ? "" : "primary server ")),
+              m_name.c_str());
+      m_connected = true;
+      m_isActiveEndpoint = isActiveEndpoint;
+    }
+  }
+
+  if (m_connected || connected) {
+    if (clientNotification) {
+      if (distMgr != nullptr) {
+        ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock);
+        m_distMgrs.push_back(distMgr);
+      }
+      LOGFINEST(
+          "Registering subscription "
+          "channel for endpoint %s",
+          m_name.c_str());
+      // setup notification channel for the first region
+      ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+      if (m_numRegionListener == 0) {
+        if ((err = createNewConnection(m_notifyConnection, true, isSecondary,
+                                       m_cacheImpl->getDistributedSystem()
+                                               .getSystemProperties()
+                                               .connectTimeout() *
+                                           3,
+                                       0)) != GF_NOERR) {
+          m_connected = false;
+          m_isActiveEndpoint = false;
+          closeConnections();
+          LOGWARN("Failed to start subscription channel for endpoint %s",
+                  m_name.c_str());
+          return err;
+        }
+        m_notifyReceiver = new Task<TcrEndpoint>(
+            this, &TcrEndpoint::receiveNotification, NC_Notification);
+        m_notifyReceiver->start();
+      }
+      ++m_numRegionListener;
+      LOGFINEST("Incremented notification region count for endpoint %s to %d",
+                m_name.c_str(), m_numRegionListener);
+      m_connected = true;
+    }
+  }
+
+  // Post-conditions:
+  // 1. The endpoint should be marked as active, only if m_connected is true
+  // 2. If this is not an active endpoint and it is connected then only one
+  //    connection + notify channel
+  GF_DEV_ASSERT(!m_isActiveEndpoint || m_connected);
+#if GF_DEVEL_ASSERTS == 1
+  int numConnections = m_opConnections.size();
+  if (!m_isActiveEndpoint && !isActiveEndpoint && m_connected &&
+      (numConnections != 1 || m_numRegionListener <= 0 ||
+       m_notifyReceiver == nullptr)) {
+    LOGWARN(
+        "Inactive connected endpoint does not have exactly one "
+        "connection. Number of connections: %d, number of region listeners: "
+        "%d",
+        numConnections, m_numRegionListener);
+  }
+#endif
+
+  return err;
+}
+
+void TcrEndpoint::unregisterDM(bool clientNotification,
+                               ThinClientBaseDM* distMgr,
+                               bool checkQueueHosted) {
+  if (clientNotification) {
+    LOGFINEST(
+        "Closing subscription "
+        "channel for endpoint %s",
+        m_name.c_str());
+    // close notification channel if there is no region
+    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+    if (m_numRegionListener > 0 && --m_numRegionListener == 0) {
+      closeNotification();
+    }
+    LOGFINEST("Decremented subscription region count for endpoint %s to %d",
+              m_name.c_str(), m_numRegionListener);
+    if (distMgr != nullptr) {
+      ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock);
+      m_distMgrs.remove(distMgr);
+    }
+    LOGFINEST("Done unsubscribe for endpoint %s", m_name.c_str());
+  }
+}
+
+void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
+  LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str());
+  if (!m_connected || m_noOfConnRefs == 0) {
+    LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str());
+    return;
+  }
+
+  if (!m_msgSent && !m_pingSent) {
+    TcrMessagePing* pingMsg =
+        TcrMessage::getPingMessage(m_cacheImpl->getCache());
+    TcrMessageReply reply(true, nullptr);
+    LOGFINEST("Sending ping message to endpoint %s", m_name.c_str());
+    GfErrType error;
+    if (poolDM != nullptr) {
+      error = poolDM->sendRequestToEP(*pingMsg, reply, this);
+    } else {
+      error = send(*pingMsg, reply);
+    }
+    LOGFINEST("Sent ping message to endpoint %s with error code %d%s",
+              m_name.c_str(), error, error == GF_NOERR ? " (no error)" : "");
+    if (error == GF_NOERR) {
+      m_pingSent = true;
+    }
+    if (error == GF_TIMOUT && m_pingTimeouts < 2) {
+      ++m_pingTimeouts;
+    } else {
+      m_pingTimeouts = 0;
+      //  Only call setConnectionStatus if the status has changed (non thread
+      //  safe check)
+      // This is to avoid blocking the ping thread if notification channel takes
+      // a long time to
+      // complete causing the server to drop the client in the midst of
+      // connection establishment.
+      bool connected = (error == GF_NOERR)
+                           ? (reply.getMessageType() == TcrMessage::REPLY)
+                           : false;
+      if (m_connected != connected) {
+        setConnectionStatus(connected);
+      }
+    }
+    LOGFINEST("Completed sending ping message to endpoint %s", m_name.c_str());
+  } else {
+    m_msgSent = false;
+    m_pingSent = false;
+  }
+}
+
+bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
+  return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid);
+}
+
+int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
+  LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
+  while (isRunning) {
+    TcrMessageReply* msg = nullptr;
+    try {
+      size_t dataLen;
+      ConnErrType opErr = CONN_NOERR;
+      auto data = m_notifyConnection->receive(&dataLen, &opErr,
+                                              std::chrono::seconds(5));
+
+      if (opErr == CONN_IOERR) {
+        // Endpoint is disconnected, this exception is expected
+        LOGFINER(
+            "IO exception while receiving subscription event for endpoint %d",
+            opErr);
+        if (isRunning) {
+          setConnectionStatus(false);
+          // close notification channel
+          ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+          if (m_numRegionListener > 0) {
+            m_numRegionListener = 0;
+            closeNotification();
+          }
+        }
+        break;
+      }
+
+      if (data) {
+        msg = new TcrMessageReply(true, m_baseDM);
+        msg->initCqMap();
+        msg->setData(data, static_cast<int32_t>(dataLen),
+                     this->getDistributedMemberID(),
+                     *(m_cacheImpl->getSerializationRegistry()),
+                     *(m_cacheImpl->getMemberListForVersionStamp()));
+        handleNotificationStats(static_cast<int64_t>(dataLen));
+        LOGDEBUG("receive notification %d", msg->getMessageType());
+
+        if (!isRunning) {
+          _GEODE_SAFE_DELETE(msg);
+          break;
+        }
+
+        if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
+          LOGFINE("Received ping from server subscription channel.");
+        }
+
+        // ignore some message types like REGISTER_INSTANTIATORS
+        if (msg->shouldIgnore()) {
+          _GEODE_SAFE_DELETE(msg);
+          continue;
+        }
+
+        bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER);
+        if (!msg->hasCqPart()) {
+          if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) {
+            const std::string& regionFullPath1 = msg->getRegionName();
+            std::shared_ptr<Region> region1;
+            m_cacheImpl->getRegion(regionFullPath1.c_str(), region1);
+            if (region1 != nullptr &&
+                !static_cast<ThinClientRegion*>(region1.get())
+                     ->getDistMgr()
+                     ->isEndpointAttached(this)) {
+              // drop event before even processing the eventid for duplicate
+              // checking
+              LOGFINER("Endpoint %s dropping event for region %s",
+                       m_name.c_str(), regionFullPath1.c_str());
+              _GEODE_SAFE_DELETE(msg);
+              continue;
+            }
+          }
+        }
+
+        if (!checkDupAndAdd(msg->getEventId())) {
+          m_dupCount++;
+          if (m_dupCount % 100 == 1) {
+            LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
+          }
+          _GEODE_SAFE_DELETE(msg);
+          continue;
+        }
+
+        if (isMarker) {
+          LOGFINE("Got a marker message on endpont %s", m_name.c_str());
+          m_cacheImpl->processMarker();
+          processMarker();
+          _GEODE_SAFE_DELETE(msg);
+        } else {
+          if (!msg->hasCqPart())  // || msg->isInterestListPassed())
+          {
+            const std::string& regionFullPath = msg->getRegionName();
+            std::shared_ptr<Region> region;
+            m_cacheImpl->getRegion(regionFullPath.c_str(), region);
+            if (region != nullptr) {
+              static_cast<ThinClientRegion*>(region.get())
+                  ->receiveNotification(msg);
+            } else {
+              LOGWARN(
+                  "Notification for region %s that does not exist in "
+                  "client cacheImpl.",
+                  regionFullPath.c_str());
+            }
+          } else {
+            LOGDEBUG("receive cq notification %d", msg->getMessageType());
+           auto queryService = getQueryService();
+           if (queryService != nullptr) {
+             static_cast<RemoteQueryService*>(queryService.get())
+                 ->receiveNotification(msg);
+           }
+          }
+        }
+      }
+    } catch (const TimeoutException&) {
+      // If there is no notification, this exception is expected
+      // But this is valid only when *no* data has been received
+      // otherwise if data has been read then TcrConnection will throw
+      // a GeodeIOException which will cause the channel to close.
+      LOGDEBUG(
+          "receiveNotification timed out: no data received from "
+          "endpoint %s",
+          m_name.c_str());
+    } catch (const GeodeIOException& e) {
+      // Endpoint is disconnected, this exception is expected
+      LOGFINER(
+          "IO exception while receiving subscription event for endpoint %s: %s",
+          m_name.c_str(), e.what());
+      if (m_connected) {
+        setConnectionStatus(false);
+        // close notification channel
+        ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+        if (m_numRegionListener > 0) {
+          m_numRegionListener = 0;
+          closeNotification();
+        }
+      }
+      break;
+    } catch (const Exception& ex) {
+      _GEODE_SAFE_DELETE(msg);
+      LOGERROR(
+          "Exception while receiving subscription event for endpoint %s:: %s: "
+          "%s",
+          m_name.c_str(), ex.getName().c_str(), ex.what());
+    } catch (...) {
+      _GEODE_SAFE_DELETE(msg);
+      LOGERROR(
+          "Unexpected exception while "
+          "receiving subscription event from endpoint %s",
+          m_name.c_str());
+    }
+  }
+  LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str());
+  return 0;
+}
+
+inline bool TcrEndpoint::compareTransactionIds(int32_t reqTransId,
+                                               int32_t replyTransId,
+                                               std::string& failReason,
+                                               TcrConnection* conn) {
+  LOGDEBUG("TcrEndpoint::compareTransactionIds requested id = %d ,replied = %d",
+           reqTransId, replyTransId);
+  if (replyTransId != reqTransId) {
+    LOGERROR(
+        "Transaction ids do not match on endpoint %s for "
+        "send operation: %d, %d. Possible serialization mismatch",
+        m_name.c_str(), reqTransId, replyTransId);
+    closeConnection(conn);
+    failReason = "mismatch of transaction IDs in operation";
+    return false;
+  }
+  return true;
+}
+
+inline bool TcrEndpoint::handleIOException(const std::string& message,
+                                           TcrConnection*& conn,
+                                           bool isBgThread) {
+  int32_t lastError = ACE_OS::last_error();
+  if (lastError == ECONNRESET || lastError == EPIPE) {
+    _GEODE_SAFE_DELETE(conn);
+  } else {
+    closeConnection(conn);
+  }
+  LOGFINE(
+      "IO error during send for endpoint %s "
+      "[errno: %d: %s]: %s",
+      m_name.c_str(), lastError, ACE_OS::strerror(lastError), message.c_str());
+  // EAGAIN =11, EWOULDBLOCK = 10035L, EPIPE = 32, ECONNRESET =10054L(An
+  // existing connection was forcibly closed by the remote host.)
+  if (!(lastError == EAGAIN || lastError == EWOULDBLOCK /*||
+        lastError == ECONNRESET */ /*|| lastError == EPIPE*/)) {
+    // break from enclosing loop without retries
+    // something wrong try connect in lock
+    m_needToConnectInLock = true;
+    return false;
+  }
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  return true;
+}
+
+GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
+                                       TcrMessageReply& reply,
+                                       TcrConnection* conn,
+                                       std::string& failReason) {
+  int32_t type = request.getMessageType();
+  GfErrType error = GF_NOERR;
+
+  LOGFINER("Sending request type %d to endpoint [%s] via connection [%p]", type,
+           m_name.c_str(), conn);
+  // TcrMessage * req = const_cast<TcrMessage *>(&request);
+  LOGDEBUG("TcrEndpoint::sendRequestConn  = %d", m_baseDM);
+  if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn);
+  if (((type == TcrMessage::EXECUTE_FUNCTION ||
+        type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
+       (request.hasResult() & 2))) {
+    sendRequestForChunkedResponse(request, reply, conn);
+  } else if (type == TcrMessage::REGISTER_INTEREST_LIST ||
+             type == TcrMessage::REGISTER_INTEREST ||
+             type == TcrMessage::QUERY ||
+             type == TcrMessage::QUERY_WITH_PARAMETERS ||
+             type == TcrMessage::GET_ALL_70 ||
+             type == TcrMessage::GET_ALL_WITH_CALLBACK ||
+             type == TcrMessage::PUTALL ||
+             type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
+             type == TcrMessage::REMOVE_ALL ||
+             ((type == TcrMessage::EXECUTE_FUNCTION ||
+               type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
+              (request.hasResult() & 2)) ||
+             type ==
+                 TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||  // This is
+                                                                    // kept
+                                                                    // aside as
+                                                                    // server
+                                                                    // always
+                                                                    // sends
+                                                                    // chunked
+                                                                    // response.
+             type == TcrMessage::EXECUTECQ_MSG_TYPE ||
+             type == TcrMessage::STOPCQ_MSG_TYPE ||
+             type == TcrMessage::CLOSECQ_MSG_TYPE ||
+             type == TcrMessage::KEY_SET ||
+             type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
+             type == TcrMessage::GETCQSTATS_MSG_TYPE ||
+             type == TcrMessage::MONITORCQ_MSG_TYPE ||
+             type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
+             type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
+    sendRequestForChunkedResponse(request, reply, conn);
+    LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
+  } else {
+    // Chk request type to request if so request.getCallBackArg flag & setCall
+    // back arg flag to true, and in response chk for this flag.
+    if (request.getMessageType() == TcrMessage::REQUEST) {
+      if (request.isCallBackArguement()) {
+        reply.setCallBackArguement(true);
+      }
+    }
+    size_t dataLen;
+    LOGDEBUG("sendRequestConn: calling sendRequest");
+    auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(),
+                                  &dataLen, request.getTimeout(),
+                                  reply.getTimeout(), request.getMessageType());
+    reply.setMessageTypeRequest(type);
+    reply.setData(
+        data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(),
+        *(m_cacheImpl->getSerializationRegistry()),
+        *(m_cacheImpl
+              ->getMemberListForVersionStamp()));  // memory is released by
+                                                   // TcrMessage setData().
+  }
+
+  // reset idle timeout of the connection for pool connection manager
+  if (type != TcrMessage::PING) {
+    conn->touch();
+  }
+
+  if (reply.getMessageType() == TcrMessage::INVALID) {
+    if (type == TcrMessage::EXECUTE_FUNCTION ||
+        type == TcrMessage::EXECUTE_REGION_FUNCTION ||
+        type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
+      ChunkedFunctionExecutionResponse* resultCollector =
+          dynamic_cast<ChunkedFunctionExecutionResponse*>(
+              reply.getChunkedResultHandler());
+      if (resultCollector->getResult() == false) {
+        LOGDEBUG("TcrEndpoint::send: function execution, no response desired");
+        //            m_opConnections.put( conn, false );
+        //  return GF_NOERR;
+        error = GF_NOERR;
+      }
+    } else {
+      // Treat INVALID messages like IO exceptions
+      error = GF_IOERR;
+    }
+  }
+  // do we need to consider case where compareTransactionIds return true?
+  // I think we will not have issue here
+  else if (!compareTransactionIds(request.getTransId(), reply.getTransId(),
+                                  failReason, conn)) {
+    error = GF_NOTCON;
+  }
+  if (error == GF_NOERR) {
+    if (m_baseDM != nullptr)
+      m_baseDM->afterSendingRequest(request, reply, conn);
+  }
+
+  return error;
+}
+
+bool TcrEndpoint::isMultiUserMode() {
+  LOGDEBUG("TcrEndpoint::isMultiUserMode %d", m_isMultiUserMode);
+  return m_isMultiUserMode;
+}
+
+GfErrType TcrEndpoint::sendRequestWithRetry(
+    const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn,
+    bool& epFailure, std::string& failReason, int maxSendRetries,
+    bool useEPPool, std::chrono::microseconds requestedTimeout,
+    bool isBgThread) {
+  GfErrType error = GF_NOTCON;
+  bool createNewConn = false;
+  // int32_t type = request.getMessageType();
+  int sendRetryCount = 0;
+
+  //  Retry on the following send errors:
+  // Timeout: 1 retry
+  // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry
+  // Connection pool is empty (too many threads or no connections available): 1
+  // retry
+
+  do {
+    if (sendRetryCount > 0) {
+      // this is a retry. set the retry bit in the early Ack
+      (const_cast<TcrMessage&>(request)).updateHeaderForRetry();
+    }
+
+    auto timeout = requestedTimeout;
+    epFailure = false;
+    if (useEPPool) {
+      if (m_maxConnections == 0) {
+        ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
+        if (m_maxConnections == 0) {
+          LOGFINE(
+              "Creating a new connection when connection-pool-size system "
+              "property set to 0");
+          if ((error = createNewConnection(conn, false, false,
+                                           m_cacheImpl->getDistributedSystem()
+                                               .getSystemProperties()
+                                               .connectTimeout())) !=
+              GF_NOERR) {
+            epFailure = true;
+            continue;
+          }
+          m_maxConnections = 1;
+        }
+      }
+    }
+    LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s",
+             m_name.c_str());
+    if (createNewConn) {
+      createNewConn = false;
+      if (!m_connected) {
+        return GF_NOTCON;
+      } else if ((error =
+                      createNewConnection(conn, false, false,
+                                          m_cacheImpl->getDistributedSystem()
+                                              .getSystemProperties()
+                                              .connectTimeout(),
+                                          0, true)) != GF_NOERR) {
+        epFailure = true;
+        continue;
+      }
+    } else if (conn == nullptr && useEPPool) {
+      LOGFINER(
+          "sendRequestWithRetry:: looking for connection in queue timeout = "
+          "%d ",
+          timeout.count());
+      // max wait time to get a connection
+      conn = m_opConnections.getUntil(timeout);
+    }
+    if (!m_connected) {
+      return GF_NOTCON;
+    }
+    if (conn != nullptr) {
+      LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s",
+               m_name.c_str());
+      int reqTransId = request.getTransId();
+
+      try {
+        LOGDEBUG("Calling sendRequestConn");
+        error = sendRequestConn(request, reply, conn, failReason);
+        if (error == GF_IOERR) {
+          epFailure = true;
+          failReason = "received INVALID reply from server";
+          if (!handleIOException(failReason, conn, isBgThread)) {
+            break;
+          }
+          createNewConn = true;
+        } else if (error == GF_NOTCON) {
+          epFailure = true;
+          createNewConn = true;
+        } else {
+          if (useEPPool) {
+            m_opConnections.put(conn, false);
+          }
+          return GF_NOERR;
+        }
+      } catch (const TimeoutException&) {
+        error = GF_TIMOUT;
+        LOGFINE(
+            "Send timed out for endpoint %s. "
+            "Message txid = %d",
+            m_name.c_str(), reqTransId);
+        closeFailedConnection(conn);
+        /*
+        if ( !(m_poolHADM && m_poolHADM->getThreadLocalConnections()) ){ //close
+        connection only when not a sticky connection.
+          closeConnection( conn );
+        }*/
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        int32_t type = request.getMessageType();
+        epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL &&
+                     type != TcrMessage::PUT_ALL_WITH_CALLBACK &&
+                     type != TcrMessage::EXECUTE_FUNCTION &&
+                     type != TcrMessage::EXECUTE_REGION_FUNCTION &&
+                     type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP &&
+                     type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE);
+
+        // epFailure = true;
+        failReason = "timed out waiting for endpoint";
+        createNewConn = true;
+      } catch (const GeodeIOException& ex) {
+        error = GF_IOERR;
+        epFailure = true;
+        failReason = "IO error for endpoint";
+        if (!handleIOException(ex.what(), conn,
+                               isBgThread)) {  // change here
+          break;
+        }
+        createNewConn = true;
+      } catch (const Exception& ex) {
+        failReason = ex.getName();
+        failReason.append(": ");
+        failReason.append(ex.what());
+        LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(),
+                failReason.c_str());
+        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
+                                  conn)) {
+          if (Log::warningEnabled()) {
+            LOGWARN("Stack trace: %s", ex.getStackTrace().c_str());
+          }
+          error = GF_MSG;
+          if (useEPPool) {
+            m_opConnections.put(conn, false);
+          } else {
+            // we are here its better to close the connection as
+            // "compareTransactionIds"
+            // will not close the connection
+            closeConnection(conn);
+          }
+          break;
+        } else {
+          error = GF_NOTCON;
+          epFailure = true;
+          createNewConn = true;
+        }
+      } catch (...) {
+        failReason = "unexpected exception";
+        LOGERROR(
+            "Unexpected exception while sending request to "
+            "endpoint %s",
+            m_name.c_str());
+        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
+                                  conn)) {
+          error = GF_MSG;
+          if (useEPPool) {
+            m_opConnections.put(conn, false);
+          } else {
+            // we are here its better to close the connection as
+            // "compareTransactionIds"
+            // will not close the connection
+            closeConnection(conn);
+          }
+          break;
+        } else {
+          error = GF_NOTCON;
+          epFailure = true;
+          createNewConn = true;
+        }
+      }
+    } else {
+      if (useEPPool) {
+        epFailure = true;
+        failReason = "server connection could not be obtained";
+        if (timeout <= std::chrono::microseconds::zero()) {
+          error = GF_TIMOUT;
+          LOGWARN(
+              "No connection available for %ld seconds "
+              "for endpoint %s.",
+              requestedTimeout.count(), m_name.c_str());
+        } else {
+          error = GF_NOTCON;
+          LOGFINE(
+              "Returning without connection with %d seconds remaining "
+              "for endpoint %s.",
+              timeout.count(), m_name.c_str());
+        }
+      } else {
+        LOGERROR("Unexpected failure while sending request to server.");
+        GF_DEV_ASSERT("Bug in TcrEndpoint::sendRequestWithRetry()?" ? false
+                                                                    : true);
+      }
+    }
+  } while (++sendRetryCount <= maxSendRetries);
+  return error;
+}
+
+void TcrEndpoint::setRetry(const TcrMessage& request, int& maxSendRetries) {
+  int32_t type = request.getMessageType();
+  if (type == TcrMessage::QUERY || type == TcrMessage::QUERY_WITH_PARAMETERS ||
+      type == TcrMessage::PUTALL || type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
+      type == TcrMessage::EXECUTE_FUNCTION ||
+      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
+      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
+      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) {
+    maxSendRetries = 0;
+  }
+}
+
+GfErrType TcrEndpoint::send(const TcrMessage& request, TcrMessageReply& reply) {
+  GfErrType error = GF_NOTCON;
+
+  int maxSendRetries = 1;
+  setRetry(request, maxSendRetries);
+
+  TcrConnection* conn = nullptr;
+  bool epFailure;
+  std::string failReason;
+  //  TODO: remove sendRetryCount as parameter.
+  error = sendRequestWithRetry(request, reply, conn, epFailure, failReason,
+                               maxSendRetries, true, reply.getTimeout());
+
+  if (error == GF_NOERR) {
+    m_msgSent = true;
+  }
+
+  if (error != GF_NOERR && epFailure) {
+    LOGFINE("Send Giving up for endpoint %s; reason: %s.", m_name.c_str(),
+            failReason.c_str());
+    setConnectionStatus(false);
+  }
+
+// Postconditions:
+#if GF_DEVEL_ASSERTS == 1
+  int opConnectionsSize = m_opConnections.size();
+  if (!m_isActiveEndpoint && (opConnectionsSize > 1)) {
+    LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, 1);
+  } else if (opConnectionsSize > m_maxConnections) {
+    LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize,
+            m_maxConnections);
+  }
+#endif
+
+  return error;
+}
+
+GfErrType TcrEndpoint::sendRequestConnWithRetry(const TcrMessage& request,
+                                                TcrMessageReply& reply,
+                                                TcrConnection*& conn,
+                                                bool isBgThread) {
+  GfErrType error = GF_NOTCON;
+
+  int maxSendRetries = 1;
+  setRetry(request, maxSendRetries);
+
+  //  Retry on the following send errors:
+  // Timeout: 1 retry
+  // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry
+  // Connection pool is empty (too many threads or no connections available): 1
+  // retry
+  bool epFailure;
+  std::string failReason;
+  LOGFINE("sendRequestConnWithRetry:: maxSendRetries = %d ", maxSendRetries);
+  error = sendRequestWithRetry(request, reply, conn, epFailure, failReason,
+                               maxSendRetries, false, reply.getTimeout(),
+                               isBgThread);
+  if (error == GF_NOERR) {
+    m_msgSent = true;
+  }
+
+  if (error != GF_NOERR && epFailure) {
+    LOGFINE("sendRequestConnWithRetry: Giving up for endpoint %s; reason: %s.",
+            m_name.c_str(), failReason.c_str());
+    setConnectionStatus(false);
+  }
+
+  return error;
+}
+
+void TcrEndpoint::setConnectionStatus(bool status) {
+  // : Store the original value of m_isActiveEndpoint.
+  // This is to try make failover more resilient for the case when
+  // a foreground operation thread is connecting to an endpoint while
+  // the notification thread is disconnecting from the same, or vice versa.
+  // By comparing the original value with the new value we know if
+  // someone else has changed the status in that duration, and skip
+  // the change if that is the case.
+  // Same logic applies for the ping thread.
+  // Try something like (after the 2.5 patch release):
+  // bool wasActive = m_isActiveEndpoint;
+  // Then after taking the lock:
+  // If ( !wasActive && isActiveEndpoint ) { return; }
+  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
+  if (m_connected != status) {
+    bool connected = m_connected;
+    m_connected = status;
+    if (connected) {
+      m_numberOfTimesFailed += 1;
+      m_isAuthenticated = false;
+      // disconnected
+      LOGFINE("Disconnecting from endpoint %s", m_name.c_str());
+      closeConnections();
+      m_isActiveEndpoint = false;
+      LOGFINE("Disconnected from endpoint %s", m_name.c_str());
+      triggerRedundancyThread();
+    }
+  }
+}
+
+void TcrEndpoint::triggerRedundancyThread() {
+  m_failoverSema.release();
+  m_redundancySema.release();
+}
+
+void TcrEndpoint::closeConnection(TcrConnection*& conn) {
+  conn->close();
+  m_ports.erase(conn->getPort());
+  _GEODE_SAFE_DELETE(conn);
+}
+
+void TcrEndpoint::closeConnections() {
+  m_opConnections.close();
+  m_ports.clear();
+  m_maxConnections = m_cacheImpl->getDistributedSystem()
+                         .getSystemProperties()
+                         .connectionPoolSize();
+}
+
+/*
+void TcrEndpoint::sendNotificationCloseMsg()
+{
+  if (m_notifyConnection != nullptr) {
+    m_notifyReceiver->stop();
+    m_notifyConnection->close();
+  }
+}
+*/
+
+void TcrEndpoint::closeNotification() {
+  LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str());
+  m_notifyConnection->close();
+  m_notifyReceiver->stopNoblock();
+  TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager();
+  tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection,
+                                  m_notificationCleanupSema);
+  m_notifyCount++;
+  m_cleanupSema.release();
+  m_isQueueHosted = false;
+  LOGFINEST(
+      "Added susbcription channel for deletion and "
+      "released cleanup semaphore for endpoint %s",
+      m_name.c_str());
+}
+
+void TcrEndpoint::stopNoBlock() {
+  if (m_notifyReceiver != nullptr) {
+    m_notifyConnection->close();
+    m_notifyReceiver->stopNoblock();
+  }
+}
+
+void TcrEndpoint::stopNotifyReceiverAndCleanup() {
+  LOGFINER("Stopping subscription receiver and cleaning up");
+  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
+
+  if (m_notifyReceiver != nullptr) {
+    LOGFINER("Waiting for notification thread...");
+    // m_notifyReceiver->stopNoblock();
+    m_notifyReceiver->wait();
+    bool found = false;
+    for (std::list<Task<TcrEndpoint>*>::iterator it =
+             m_notifyReceiverList.begin();
+         it != m_notifyReceiverList.end(); it++) {
+      if (*it == m_notifyReceiver) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      _GEODE_SAFE_DELETE(m_notifyReceiver);
+      _GEODE_SAFE_DELETE(m_notifyConnection);
+    }
+  }
+
+  m_numRegionListener = 0;
+
+  if (m_notifyReceiverList.size() > 0) {
+    LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d",
+             m_notifyReceiverList.size());
+    for (std::list<Task<TcrEndpoint>*>::iterator it =
+             m_notifyReceiverList.begin();
+         it != m_notifyReceiverList.end(); it++) {
+      LOGFINER(
+          "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify "
+          "recievers.");
+      _GEODE_SAFE_DELETE(*it);
+    }
+  }
+
+  if (m_notifyConnectionList.size() > 0) {
+    LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d",
+             m_notifyConnectionList.size());
+    for (std::list<TcrConnection*>::iterator it =
+             m_notifyConnectionList.begin();
+         it != m_notifyConnectionList.end(); it++) {
+      LOGFINER(
+          "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify "
+          "connections.");
+      _GEODE_SAFE_DELETE(*it);
+    }
+  }
+}
+
+void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus,
+                                       int32_t queueSize) {
+  if (!m_isServerQueueStatusSet) {
+    m_isServerQueueStatusSet = true;
+    m_serverQueueStatus = queueStatus;
+    m_queueSize = queueSize;
+  }
+}
+
+bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; }
+void TcrEndpoint::processMarker() {
+  m_cacheImpl->tcrConnectionManager().processMarker();
+}
+ std::shared_ptr<QueryService> TcrEndpoint::getQueryService() {
+   return m_cacheImpl->getQueryService(true);
+ }
+ void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
+                                                 TcrMessageReply& reply,
+                                                 TcrConnection* conn) {
+   conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply);
+ }
+ void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) {
+   closeConnection(conn);
+}
+
+}  // namespace client
+}  // namespace geode
+}  // namespace apache

-- 
To stop receiving notification emails like this one, please contact
echobravo@apache.org.

Mime
View raw message