geode-commits mailing list archives

From: u..@apache.org
Subject: [geode] branch feature/CleanMicrometer updated: WIP: Fixing PartitionedRegionStats.kt
Date: Sat, 28 Jul 2018 00:22:24 GMT
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/CleanMicrometer
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/CleanMicrometer by this push:
     new ebc5960  WIP: Fixing PartitionedRegionStats.kt
ebc5960 is described below

commit ebc5960898f52edb2e68a0c66e97f7d907592e1e
Author: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
AuthorDate: Fri Jul 27 17:22:12 2018 -0700

    WIP: Fixing PartitionedRegionStats.kt
---
 .../internal/InternalDistributedSystem.java        |   2 -
 .../geode/internal/InternalDataSerializer.java     |   5 +-
 .../org/apache/geode/internal/cache/Oplog.java     |   1 +
 .../apache/geode/internal/cache/OverflowOplog.java |   1 +
 .../geode/internal/cache/PartitionedRegion.java    |   1 +
 .../geode/internal/cache/TombstoneService.java     |   1 +
 .../geode/internal/cache/ha/HARegionQueue.java     |   8 +-
 .../cache/tier/sockets/CacheClientNotifier.java    |  29 +-
 .../cache/tier/sockets/CacheClientProxy.java       | 203 +++---
 .../geode/internal/cache/tier/sockets/Message.java |   5 +-
 .../geode/internal/offheap/MemoryAllocator.java    |   2 +
 .../internal/offheap/MemoryAllocatorImpl.java      |  27 +-
 .../geode/internal/offheap/OffHeapStorage.java     | 280 +--------
 .../sockets/{MessageStats.java => MessageStats.kt} |  17 +-
 .../statistics/cache/CacheClientNotifierStats.kt   |  96 +++
 .../statistics/cache/CacheClientProxyStats.kt      | 101 +++
 .../geode/statistics/cache/CacheServerStats.kt     | 623 ++++++++++++++++++
 .../geode/statistics/client/ClientHealthStats.kt   | 224 +++++++
 .../client/connection/ConnectionStats.kt           |   4 +-
 .../geode/statistics/offheap/OffHeapMemoryStats.kt |  82 +++
 .../geode/statistics/region/HARegionQueueStats.kt  |  80 +++
 .../statistics/region/PartitionedRegionStats.kt    | 695 +++++++++++++++++++++
 22 files changed, 2012 insertions(+), 475 deletions(-)
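
The common thread in the hunks below is that the statistics classes under org.apache.geode.statistics are now constructed directly by name, without a StatisticsFactory and without an explicit close(). A minimal before/after sketch of a call site, based on the HARegionQueue hunk further down (the cache lookup and the local stats variable are shown here only for illustration, not taken verbatim from the source):

    import org.apache.geode.statistics.StatisticsFactory;          // import removed by this commit
    import org.apache.geode.statistics.region.HARegionQueueStats;  // new statistics class

    // Before this commit: stats required a StatisticsFactory and an explicit close()
    StatisticsFactory factory = cache.getDistributedSystem().getStatisticsFactory();
    HARegionQueueStats stats = new HARegionQueueStats(factory, processedRegionName);
    // ... use stats ...
    stats.close();

    // After this commit: constructed by name only; the close() calls are dropped
    HARegionQueueStats stats = new HARegionQueueStats(processedRegionName);

The same pattern appears for CacheClientNotifierStats, CacheClientProxyStats, and the off-heap OffHeapMemoryStats in the hunks that follow.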

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 065d38f..e3623ab 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -108,9 +108,7 @@ import org.apache.geode.security.GemFireSecurityException;
 import org.apache.geode.security.PostProcessor;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.statistics.InternalDistributedSystemStats;
-import org.apache.geode.statistics.Statistics;
 import org.apache.geode.statistics.distributed.DMStats;
-import org.apache.geode.statistics.micrometer.MicrometerStatisticsFactoryImpl;
 
 /**
  * The concrete implementation of {@link DistributedSystem} that provides internal-only
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 94aea2c..768a2cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -83,7 +83,6 @@ import org.apache.geode.ToDataException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributedSystemService;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -122,6 +121,7 @@ import org.apache.geode.pdx.internal.PdxReaderImpl;
 import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.pdx.internal.PdxWriterImpl;
 import org.apache.geode.pdx.internal.TypeRegistry;
+import org.apache.geode.statistics.distributed.DMStats;
 
 /**
  * Contains static methods for data serializing instances of internal GemFire classes. It also
@@ -3233,9 +3233,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
       return internalCache.getDistributionManager().getStats();
     } else {
       DMStats result = InternalDistributedSystem.getDMStats();
-      if (result == null) {
-        result = new LonerDistributionManager.DummyDMStats();
-      }
       return result;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index d56061b..00638aa 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -112,6 +112,7 @@ import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.shared.NativeCalls;
 import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.pdx.internal.PdxWriterImpl;
+import org.apache.geode.statistics.disk.DiskStoreStats;
 
 /**
  * Implements an operation log to write to disk. As of prPersistSprint2 this file only supports
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
index 2d6ac53..22ec465 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OverflowOplog.java
@@ -44,6 +44,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.statistics.disk.DiskStoreStats;
 
 /**
  * An oplog used for overflow-only regions. For regions that are persistent (i.e. they can be
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 7a54494..e5a13bb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -251,6 +251,7 @@ import org.apache.geode.internal.sequencelog.RegionLogger;
 import org.apache.geode.internal.size.Sizeable;
 import org.apache.geode.internal.util.TransformUtils;
 import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
+import org.apache.geode.statistics.disk.DiskRegionStats;
 
 /**
  * A Region whose total storage is split into chunks of data (partitions) which are copied up to a
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index 14c1d53..b757e05 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -48,6 +48,7 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
+import org.apache.geode.statistics.cache.CachePerfStats;
 
 /**
  * Tombstones are region entries that have been destroyed but are held for future concurrency
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 9d63eae..be2b198 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -50,7 +50,6 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.InternalGemFireException;
-import org.apache.geode.statistics.StatisticsFactory;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheException;
@@ -66,7 +65,6 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.TimeoutException;
-import org.apache.geode.cache.query.internal.CqQueryVsdStats;
 import org.apache.geode.cache.query.internal.cq.CqService;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache.server.CacheServer;
@@ -103,6 +101,8 @@ import org.apache.geode.internal.util.concurrent.StoppableCondition;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock;
+import org.apache.geode.statistics.query.CqQueryVsdStats;
+import org.apache.geode.statistics.region.HARegionQueueStats;
 
 /**
  * An implementation of Queue using Gemfire Region as the underlying datastructure. The key will be
@@ -369,11 +369,10 @@ public class HARegionQueue implements RegionQueue {
     String processedRegionName = createRegionName(regionName);
 
     // Initialize the statistics
-    StatisticsFactory factory = cache.getDistributedSystem().getStatisticsFactory();
     createHARegion(processedRegionName, cache);
 
     initializeHARegionQueue(processedRegionName, this.region, haContainer, clientProxyId,
-        clientConflation, isPrimary, new HARegionQueueStats(factory, processedRegionName),
+        clientConflation, isPrimary, new HARegionQueueStats(processedRegionName),
         new StoppableReentrantReadWriteLock(cache.getCancelCriterion()),
         new StoppableReentrantReadWriteLock(region.getCancelCriterion()),
         this.region.getCancelCriterion(), true);
@@ -3418,7 +3417,6 @@ public class HARegionQueue implements RegionQueue {
       }
       ((HAContainerWrapper) haContainer).removeProxy(regionName);
     } finally {
-      this.stats.close();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index f500e8e..e5d4463 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -107,10 +107,11 @@ import org.apache.geode.internal.logging.InternalLogWriter;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.security.AccessControl;
 import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
+import org.apache.geode.statistics.cache.CacheClientNotifierStats;
+import org.apache.geode.statistics.cache.CacheServerStats;
 
 /**
  * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
@@ -1496,9 +1497,6 @@ public class CacheClientNotifier {
       // cancel the ping task
       this.clientPingTask.cancel();
 
-      // Close the statistics
-      this.statistics.close();
-
       this.socketCloser.close();
     }
   }
@@ -1911,13 +1909,7 @@ public class CacheClientNotifier {
     this.messageTimeToLive = messageTimeToLive;
 
     // Initialize the statistics
-    StatisticsFactory factory;
-    if (isGatewayReceiver) {
-      factory = new DummyStatisticsFactory(cache.getDistributedSystem().getStatisticsFactory());
-    } else {
-      factory = this.getCache().getDistributedSystem().getStatisticsFactory();
-    }
-    this.statistics = new CacheClientNotifierStats();
+    this.statistics = new CacheClientNotifierStats("");
 
     try {
       this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
@@ -1957,11 +1949,6 @@ public class CacheClientNotifier {
     if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null) {
       // Added successfully.
       this.statistics.incCompiledQueryCount(1);
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}",
-            query.getQueryString(), this.statistics.getCompiledQueryCount());
-      }
       // Start the clearIdleCompiledQueries thread.
       startCompiledQueryCleanupThread();
     }
@@ -1975,11 +1962,6 @@ public class CacheClientNotifier {
     if (this.compiledQueries.size() > 0) {
       this.statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
       this.compiledQueries.clear();
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}",
-            this.statistics.getCompiledQueryCount());
-      }
     }
   }
 
@@ -2007,11 +1989,6 @@ public class CacheClientNotifier {
             if (compiledQueries.remove(e.getKey()) != null) {
               // If successfully removed decrement the counter.
               statistics.incCompiledQueryCount(-1);
-              if (isDebugEnabled) {
-                logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
-                    + q.getQueryString() + ". Total compiled queries are : "
-                    + statistics.getCompiledQueryCount());
-              }
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 60ba65f..0609411 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -44,7 +44,6 @@ import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
-import org.apache.geode.statistics.StatisticsFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.ClientSession;
@@ -104,12 +103,13 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.security.AuthorizeRequestPP;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.AccessControl;
+import org.apache.geode.statistics.cache.CacheClientProxyStats;
+import org.apache.geode.statistics.region.HARegionQueueStats;
 
 /**
  * Class <code>CacheClientProxy</code> represents the server side of the {@link CacheClientUpdater}.
  * It queues messages to be sent from the server to the client. It then reads those messages from
  * the queue and sends them to the client.
- *
  * @since GemFire 4.2
  */
 @SuppressWarnings("synthetic-access")
@@ -203,7 +203,9 @@ public class CacheClientProxy implements ClientSession {
    */
   // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
 
-  /** The message queue size */
+  /**
+   * The message queue size
+   */
   protected final int _maximumMessageCount;
 
   /**
@@ -240,7 +242,9 @@ public class CacheClientProxy implements ClientSession {
 
   private boolean isPrimary;
 
-  /** @since GemFire 5.7 */
+  /**
+   * @since GemFire 5.7
+   */
   protected byte clientConflation = Handshake.CONFLATION_DEFAULT;
 
   /**
@@ -267,7 +271,6 @@ public class CacheClientProxy implements ClientSession {
    * A map of region name as key and integer as its value. Basically, it stores the names of the
    * regions with <code>DataPolicy</code> as EMPTY. If an event's region name is present in this
    * map, it's full value (and not delta) is sent to the client represented by this proxy.
-   *
    * @since GemFire 6.1
    */
   private volatile Map regionsWithEmptyDataPolicy = new HashMap();
@@ -290,10 +293,14 @@ public class CacheClientProxy implements ClientSession {
    */
   private final long _acceptorId;
 
-  /** acceptor's setting for notifyBySubscription */
+  /**
+   * acceptor's setting for notifyBySubscription
+   */
   private final boolean notifyBySubscription;
 
-  /** To queue the events arriving during message dispatcher initialization */
+  /**
+   * To queue the events arriving during message dispatcher initialization
+   */
   private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
       new ConcurrentLinkedQueue<Conflatable>();
 
@@ -309,7 +316,9 @@ public class CacheClientProxy implements ClientSession {
   private final AtomicInteger pingCounter = new AtomicInteger();
 
 
-  /** Date on which this instances was created */
+  /**
+   * Date on which this instances was created
+   */
   private Date creationDate;
 
   /**
@@ -319,7 +328,9 @@ public class CacheClientProxy implements ClientSession {
   private boolean drainLocked = false;
   private final Object drainLock = new Object();
 
-  /** number of cq drains that are currently in progress **/
+  /**
+   * number of cq drains that are currently in progress
+   **/
   private int numDrainsInProgress = 0;
   private final Object drainsInProgressLock = new Object();
 
@@ -327,7 +338,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Constructor.
-   *
    * @param ccn The <code>CacheClientNotifier</code> registering this proxy
    * @param socket The socket between the server and the client
    * @param proxyID representing the Connection Proxy of the clien
@@ -335,9 +345,11 @@ public class CacheClientProxy implements ClientSession {
    * @throws CacheException {
    */
   protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
-      ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
-      Version clientVersion, long acceptorId, boolean notifyBySubscription,
-      SecurityService securityService, Subject subject) throws CacheException {
+                             ClientProxyMembershipID proxyID, boolean isPrimary,
+                             byte clientConflation,
+                             Version clientVersion, long acceptorId, boolean notifyBySubscription,
+                             SecurityService securityService, Subject subject)
+      throws CacheException {
 
     initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
     this._cacheClientNotifier = ccn;
@@ -347,9 +359,8 @@ public class CacheClientProxy implements ClientSession {
     this._messageTimeToLive = ccn.getMessageTimeToLive();
     this._acceptorId = acceptorId;
     this.notifyBySubscription = notifyBySubscription;
-    StatisticsFactory factory = this._cache.getDistributedSystem().getStatisticsFactory();
     this._statistics =
-        new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId()
+        new CacheClientProxyStats("id_" + this.proxyID.getDistributedMember().getId()
             + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
     this.subject = subject;
 
@@ -366,8 +377,9 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void initializeClientAuths() {
-    if (AcceptorImpl.isPostAuthzCallbackPresent())
+    if (AcceptorImpl.isPostAuthzCallbackPresent()) {
       this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
+    }
   }
 
   private void reinitializeClientAuths() {
@@ -383,8 +395,9 @@ public class CacheClientProxy implements ClientSession {
   public void setPostAuthzCallback(AccessControl authzCallback) {
     // TODO:hitesh synchronization
     synchronized (this.clientUserAuthsLock) {
-      if (this.postAuthzCallback != null)
+      if (this.postAuthzCallback != null) {
         this.postAuthzCallback.close();
+      }
       this.postAuthzCallback = authzCallback;
     }
   }
@@ -402,13 +415,14 @@ public class CacheClientProxy implements ClientSession {
   public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable) {
     if (postAuthzCallback == null) // only for multiuser
     {
-      if (this.clientUserAuths != null)
+      if (this.clientUserAuths != null) {
         this.clientUserAuths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable);
+      }
     }
   }
 
   private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
-      byte cc, Version vers) {
+                                         byte cc, Version vers) {
     this._socket = socket;
     this.proxyID = pid;
     this.connected = true;
@@ -466,7 +480,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Set the queue keepalive option
-   *
    * @param option whether to keep the durable client's queue alive
    */
   protected void setKeepAlive(boolean option) {
@@ -475,7 +488,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the socket between the server and the client
-   *
    * @return the socket between the server and the client
    */
   protected Socket getSocket() {
@@ -492,7 +504,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the remote host's IP address string
-   *
    * @return the remote host's IP address string
    */
   protected String getRemoteHostAddress() {
@@ -501,7 +512,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the remote host's port
-   *
    * @return the remote host's port
    */
   public int getRemotePort() {
@@ -510,7 +520,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns whether the proxy is connected to a remote client
-   *
    * @return whether the proxy is connected to a remote client
    */
   public boolean isConnected() {
@@ -519,7 +528,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Mark the receiver as needing removal
-   *
    * @return true if it was already marked for removal
    */
   protected boolean startRemoval() {
@@ -533,7 +541,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Wait until the receiver's removal has completed before returning.
-   *
    * @return true if the proxy was initially marked for removal
    */
   protected boolean waitRemoval() {
@@ -574,7 +581,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the GemFire cache
-   *
    * @return the GemFire cache
    */
   public InternalCache getCache() {
@@ -593,7 +599,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the proxy's statistics
-   *
    * @return the proxy's statistics
    */
   public CacheClientProxyStats getStatistics() {
@@ -602,7 +607,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns this proxy's <code>CacheClientNotifier</code>.
-   *
    * @return this proxy's <code>CacheClientNotifier</code>
    */
   protected CacheClientNotifier getCacheClientNotifier() {
@@ -726,7 +730,6 @@ public class CacheClientProxy implements ClientSession {
   /**
    * Returns whether the proxy is alive. It is alive if its message dispatcher is processing
    * messages.
-   *
    * @return whether the proxy is alive
    */
   protected boolean isAlive() {
@@ -739,9 +742,7 @@ public class CacheClientProxy implements ClientSession {
   /**
    * Returns whether the proxy is paused. It is paused if its message dispatcher is paused. This
    * only applies to durable clients.
-   *
    * @return whether the proxy is paused
-   *
    * @since GemFire 5.5
    */
   public boolean isPaused() {
@@ -755,7 +756,6 @@ public class CacheClientProxy implements ClientSession {
   /**
    * Closes the proxy. This method checks the message queue for any unprocessed messages and
    * processes them for MAXIMUM_SHUTDOWN_PEEKS.
-   *
    * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
    */
   protected void close() {
@@ -769,11 +769,9 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Close the <code>CacheClientProxy</code>.
-   *
    * @param checkQueue Whether to message check the queue and process any contained messages (up to
-   *        MAXIMUM_SHUTDOWN_PEEKS).
+   * MAXIMUM_SHUTDOWN_PEEKS).
    * @param stoppedNormally Whether client stopped normally
-   *
    * @return whether to keep this <code>CacheClientProxy</code>
    * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
    */
@@ -811,7 +809,7 @@ public class CacheClientProxy implements ClientSession {
     } catch (Exception ex) {
       if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
         this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
-            new Object[] {this, ex});
+            new Object[]{this, ex});
       }
     }
     // Notify the caller whether to keep this proxy. If the proxy is durable
@@ -942,7 +940,6 @@ public class CacheClientProxy implements ClientSession {
       }
     } finally {
       // Close the statistics
-      this._statistics.close(); // fix for bug 40105
       closeTransientFields(); // make sure this happens
     }
   }
@@ -1030,7 +1027,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public void registerInterestRegex(String regionName, String regex, boolean isDurable,
-      boolean receiveValues) {
+                                    boolean receiveValues) {
     if (this.isPrimary) {
       // Notify all secondaries and client of change in interest
       notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE, isDurable,
@@ -1042,12 +1039,12 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public void registerInterest(String regionName, Object keyOfInterest, InterestResultPolicy policy,
-      boolean isDurable) {
+                               boolean isDurable) {
     registerInterest(regionName, keyOfInterest, policy, isDurable, true);
   }
 
   public void registerInterest(String regionName, Object keyOfInterest, InterestResultPolicy policy,
-      boolean isDurable, boolean receiveValues) {
+                               boolean isDurable, boolean receiveValues) {
     if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
       registerInterestRegex(regionName, ".*", isDurable, receiveValues);
     } else if (keyOfInterest instanceof List) {
@@ -1078,7 +1075,8 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void notifySecondariesAndClient(String regionName, Object keyOfInterest,
-      InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
+                                          InterestResultPolicy policy, boolean isDurable,
+                                          boolean receiveValues, int interestType) {
     // Create a client interest message for the keyOfInterest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
         new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
@@ -1100,7 +1098,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void enqueueInitialValue(ClientInterestMessageImpl clientInterestMessage,
-      String regionName, Object keyOfInterest) {
+                                   String regionName, Object keyOfInterest) {
     // Get the initial value
     Get70 request = (Get70) Get70.getCommand();
     LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
@@ -1150,7 +1148,7 @@ public class CacheClientProxy implements ClientSession {
     // client doesn't support a ClientInterestMessageImpl containing a list.
     if (Version.GFE_701.compareTo(this.clientVersion) > 0
         && message.getKeyOfInterest() instanceof List) {
-      for (Iterator i = ((List) message.getKeyOfInterest()).iterator(); i.hasNext();) {
+      for (Iterator i = ((List) message.getKeyOfInterest()).iterator(); i.hasNext(); ) {
         this._messageDispatcher.enqueueMessage(
             new ClientInterestMessageImpl(getCache().getDistributedSystem(), message, i.next()));
       }
@@ -1164,7 +1162,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public void unregisterInterestRegex(String regionName, String regex, boolean isDurable,
-      boolean receiveValues) {
+                                      boolean receiveValues) {
     if (this.isPrimary) {
       notifySecondariesAndClient(regionName, regex, isDurable, receiveValues,
           InterestType.REGULAR_EXPRESSION);
@@ -1179,7 +1177,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public void unregisterInterest(String regionName, Object keyOfInterest, boolean isDurable,
-      boolean receiveValues) {
+                                 boolean receiveValues) {
     if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) {
       unregisterInterestRegex(regionName, ".*", isDurable, receiveValues);
     } else {
@@ -1194,7 +1192,8 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void notifySecondariesAndClient(String regionName, Object keyOfInterest,
-      boolean isDurable, boolean receiveValues, int interestType) {
+                                          boolean isDurable, boolean receiveValues,
+                                          int interestType) {
     // Notify all secondary proxies of a change in interest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
         new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
@@ -1240,12 +1239,12 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Registers interest in the input region name and key
-   *
    * @param regionName The fully-qualified name of the region in which to register interest
    * @param keyOfInterest The key in which to register interest
    */
   protected void registerClientInterest(String regionName, Object keyOfInterest, int interestType,
-      boolean isDurable, boolean sendUpdatesAsInvalidates, boolean flushState) {
+                                        boolean isDurable, boolean sendUpdatesAsInvalidates,
+                                        boolean flushState) {
     ClientInterestList cil =
         this.cils[RegisterInterestTracker.getInterestLookupIndex(isDurable, false)];
     cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
@@ -1303,13 +1302,12 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Unregisters interest in the input region name and key
-   *
    * @param regionName The fully-qualified name of the region in which to unregister interest
    * @param keyOfInterest The key in which to unregister interest
    * @param isClosing Whether the caller is closing
    */
   protected void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType,
-      boolean isClosing) {
+                                          boolean isClosing) {
     // only unregister durable interest if isClosing and !keepalive
     if (!isClosing /* explicit unregister */
         || !getDurableKeepAlive() /* close and no keepAlive */) {
@@ -1323,12 +1321,12 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Registers interest in the input region name and list of keys
-   *
    * @param regionName The fully-qualified name of the region in which to register interest
    * @param keysOfInterest The list of keys in which to register interest
    */
   protected void registerClientInterestList(String regionName, List keysOfInterest,
-      boolean isDurable, boolean sendUpdatesAsInvalidates, boolean flushState) {
+                                            boolean isDurable, boolean sendUpdatesAsInvalidates,
+                                            boolean flushState) {
     // we only use two interest lists to map the non-durable and durable
     // identifiers to their interest settings
     ClientInterestList cil = this.cils[RegisterInterestTracker.getInterestLookupIndex(isDurable,
@@ -1345,13 +1343,12 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Unregisters interest in the input region name and list of keys
-   *
    * @param regionName The fully-qualified name of the region in which to unregister interest
    * @param keysOfInterest The list of keys in which to unregister interest
    * @param isClosing Whether the caller is closing
    */
   protected void unregisterClientInterest(String regionName, List keysOfInterest,
-      boolean isClosing) {
+                                          boolean isClosing) {
     // only unregister durable interest if isClosing and !keepalive
     if (!isClosing /* explicit unregister */
         || !getDurableKeepAlive() /* close and no keepAlive */) {
@@ -1364,7 +1361,9 @@ public class CacheClientProxy implements ClientSession {
   }
 
 
-  /** sent by the cache client notifier when there is an interest registration change */
+  /**
+   * sent by the cache client notifier when there is an interest registration change
+   */
   protected void processInterestMessage(ClientInterestMessageImpl message) {
     // Register or unregister interest depending on the interest type
     int interestType = message.getInterestType();
@@ -1415,7 +1414,7 @@ public class CacheClientProxy implements ClientSession {
     // #52088)
     if (message.isRegister() && message.getInterestType() == InterestType.KEY
         && !(key instanceof List) && InterestResultPolicy
-            .fromOrdinal(message.getInterestResultPolicy()) == InterestResultPolicy.KEYS_VALUES) {
+        .fromOrdinal(message.getInterestResultPolicy()) == InterestResultPolicy.KEYS_VALUES) {
       enqueueInitialValue(message, regionName, key);
     }
   }
@@ -1439,7 +1438,7 @@ public class CacheClientProxy implements ClientSession {
         if (opctxt == null) {
           logger.warn(LocalizedMessage.create(
               LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE,
-              new Object[] {this, clientMessage}));
+              new Object[]{this, clientMessage}));
           return false;
         }
 
@@ -1484,7 +1483,7 @@ public class CacheClientProxy implements ClientSession {
             if (!isAuthorized) {
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.CacheClientProxy__0_NOT_ADDING_CQ_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED,
-                  new Object[] {this, clientMessage}));
+                  new Object[]{this, clientMessage}));
               clientCq.delete(cqNames[i]);
             }
           } catch (Exception ex) {
@@ -1512,7 +1511,7 @@ public class CacheClientProxy implements ClientSession {
       if (opctxt == null) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE,
-            new Object[] {this, clientMessage}));
+            new Object[]{this, clientMessage}));
         return false;
       }
       if (logger.isTraceEnabled()) {
@@ -1529,7 +1528,7 @@ public class CacheClientProxy implements ClientSession {
       if (!isAuthorize) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED,
-            new Object[] {this, clientMessage}));
+            new Object[]{this, clientMessage}));
         return false;
       }
     }
@@ -1539,7 +1538,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Delivers the message to the client representing this client proxy.
-   *
    */
   protected void deliverMessage(Conflatable conflatable) {
     ThreadState state = this.securityService.bindSubject(this.subject);
@@ -1565,7 +1563,7 @@ public class CacheClientProxy implements ClientSession {
       if (this.messageDispatcherInit) {
         synchronized (this.queuedEventsSync) {
           if (this.messageDispatcherInit) { // Check to see value did not changed while getting the
-                                            // synchronize lock.
+            // synchronize lock.
             if (logger.isDebugEnabled()) {
               logger.debug(
                   "Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.",
@@ -1591,8 +1589,9 @@ public class CacheClientProxy implements ClientSession {
       this._statistics.incMessagesFailedQueued();
     }
 
-    if (state != null)
+    if (state != null) {
       state.clear();
+    }
   }
 
   protected void sendMessageDirectly(ClientMessage message) {
@@ -1671,7 +1670,6 @@ public class CacheClientProxy implements ClientSession {
   /**
    * Initializes the message dispatcher thread. The <code>MessageDispatcher</code> processes the
    * message queue.
-   *
    */
   public void initializeMessageDispatcher() throws CacheException {
     this.messageDispatcherInit = true; // Initialization process.
@@ -1705,9 +1703,6 @@ public class CacheClientProxy implements ClientSession {
         this.messageDispatcherInit = false; // Done initialization.
       }
     } finally {
-      if (this.messageDispatcherInit) { // If its not successfully completed.
-        this._statistics.close();
-      }
     }
   }
 
@@ -1841,17 +1836,15 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
-   *
    * @param socket The socket between the server and the client
    * @param ip whether this proxy represents the primary
    */
   protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId, Cache cache,
-      boolean ip, byte cc, Version ver) {
+                              boolean ip, byte cc, Version ver) {
     // Re-initialize transient fields
     initializeTransientFields(socket, proxyId, ip, cc, ver);
     getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections();
 
-
     // Cancel expiration task
     cancelDurableExpirationTask(true);
 
@@ -1986,7 +1979,7 @@ public class CacheClientProxy implements ClientSession {
      * Registers interest in the input region name and key
      */
     protected void registerClientInterest(String regionName, Object keyOfInterest, int interestType,
-        boolean sendUpdatesAsInvalidates) {
+                                          boolean sendUpdatesAsInvalidates) {
       if (logger.isDebugEnabled()) {
         logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest);
       }
@@ -2024,12 +2017,11 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Unregisters interest in the input region name and key
-     *
      * @param regionName The fully-qualified name of the region in which to unregister interest
      * @param keyOfInterest The key in which to unregister interest
      */
     protected void unregisterClientInterest(String regionName, Object keyOfInterest,
-        int interestType) {
+                                            int interestType) {
       if (logger.isDebugEnabled()) {
         logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName,
             keyOfInterest);
@@ -2053,12 +2045,11 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Registers interest in the input region name and list of keys
-     *
      * @param regionName The fully-qualified name of the region in which to register interest
      * @param keysOfInterest The list of keys in which to register interest
      */
     protected void registerClientInterestList(String regionName, List keysOfInterest,
-        boolean sendUpdatesAsInvalidates) {
+                                              boolean sendUpdatesAsInvalidates) {
       FilterProfile p = getProfile(regionName);
       if (p == null) {
         throw new RegionDestroyedException("Region not found during client interest registration",
@@ -2077,7 +2068,6 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Unregisters interest in the input region name and list of keys
-     *
      * @param regionName The fully-qualified name of the region in which to unregister interest
      * @param keysOfInterest The list of keys in which to unregister interest
      */
@@ -2143,7 +2133,7 @@ public class CacheClientProxy implements ClientSession {
 
 
     private void handleInterestEvent(String regionName, Set keysOfInterest, int interestType,
-        boolean isRegister) {
+                                     boolean isRegister) {
       // Notify the region about this register interest event if:
       // - the application has requested it
       // - this is a primary CacheClientProxy (otherwise multiple notifications
@@ -2243,9 +2233,8 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Constructor.
-     *
      * @param proxy The <code>CacheClientProxy</code> for which this dispatcher is processing
-     *        messages
+     * messages
      * @param name thread name for this dispatcher
      */
     protected MessageDispatcher(CacheClientProxy proxy, String name) throws CacheException {
@@ -2330,10 +2319,8 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Notifies the dispatcher to stop dispatching.
-     *
      * @param checkQueue Whether to check the message queue for any unprocessed messages and process
-     *        them for MAXIMUM_SHUTDOWN_PEEKS.
-     *
+     * them for MAXIMUM_SHUTDOWN_PEEKS.
      * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS
      */
     protected synchronized void stopDispatching(boolean checkQueue) {
@@ -2372,8 +2359,9 @@ public class CacheClientProxy implements ClientSession {
               logger.debug("{}: Exception occurred while trying to stop dispatching", this, e);
             }
           } finally {
-            if (interrupted)
+            if (interrupted) {
               Thread.currentThread().interrupt();
+            }
           }
         } // for
       } finally {
@@ -2383,7 +2371,6 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Returns whether the dispatcher is stopped
-     *
      * @return whether the dispatcher is stopped
      */
     protected boolean isStopped() {
@@ -2393,7 +2380,6 @@ public class CacheClientProxy implements ClientSession {
     /**
      * Returns the size of the queue for heuristic purposes. This size may be changing concurrently
      * if puts / gets are occurring at the same time.
-     *
      * @return the size of the queue
      */
     protected int getQueueSize() {
@@ -2403,7 +2389,6 @@ public class CacheClientProxy implements ClientSession {
     /**
      * Returns the size of the queue calculated through stats This includes events that have
      * dispatched but have yet been removed
-     *
      * @return the size of the queue
      */
     protected int getQueueSizeStat() {
@@ -2418,7 +2403,7 @@ public class CacheClientProxy implements ClientSession {
     }
 
     protected void drainClientCqEvents(ClientProxyMembershipID clientId,
-        InternalCqQuery cqToClose) {
+                                       InternalCqQuery cqToClose) {
       this._messageQueue.closeClientCq(clientId, cqToClose);
     }
 
@@ -2632,7 +2617,7 @@ public class CacheClientProxy implements ClientSession {
             extraMsg =
                 LocalizedStrings.CacheClientProxy_PROBLEM_CAUSED_BY_MESSAGE_QUEUE_BEING_CLOSED;
           }
-          final Object[] msgArgs = new Object[] {((!isStopped()) ? this.toString() + ": " : ""),
+          final Object[] msgArgs = new Object[]{((!isStopped()) ? this.toString() + ": " : ""),
               ((list == null) ? 0 : list.size())};
           if (extraMsg != null) {
             // Dont print exception details, but add on extraMsg
@@ -2696,9 +2681,7 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Sends a message to the client attached to this proxy
-     *
      * @param clientMessage The <code>ClientMessage</code> to send to the client
-     *
      */
     protected boolean dispatchMessage(ClientMessage clientMessage) throws IOException {
       boolean isDispatched = false;
@@ -2771,7 +2754,6 @@ public class CacheClientProxy implements ClientSession {
 
     /**
      * Add the input client message to the message queue
-     *
      * @param clientMessage The <code>Conflatable</code> to add to the queue
      */
     protected void enqueueMessage(Conflatable clientMessage) {
@@ -2901,19 +2883,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   /**
-   * Returns the current number of CQS the client installed.
-   *
-   * @return int the current count of CQs for this client
-   */
-  public int getCqCount() {
-    synchronized (this) {
-      return this._statistics.getCqCount();
-    }
-  }
-
-  /**
    * Increment the number of CQs the client installed
-   *
    */
   public void incCqCount() {
     synchronized (this) {
@@ -2923,7 +2893,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Decrement the number of CQs the client installed
-   *
    */
   public synchronized void decCqCount() {
     synchronized (this) {
@@ -2932,30 +2901,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   /**
-   * Returns true if the client has one CQ
-   *
-   * @return true if the client has one CQ
-   */
-  public boolean hasOneCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 1;
-    }
-  }
-
-  /**
-   * Returns true if the client has no CQs
-   *
-   * @return true if the client has no CQs
-   */
-  public boolean hasNoCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 0;
-    }
-  }
-
-  /**
    * Get map of regions with empty data policy
-   *
    * @since GemFire 6.1
    */
   public Map<String, Integer> getRegionsWithEmptyDataPolicy() {
@@ -2973,7 +2919,6 @@ public class CacheClientProxy implements ClientSession {
 
   /**
    * Returns the number of seconds that have elapsed since the Client proxy created.
-   *
    * @since GemFire 7.0
    */
   public long getUpTime() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index ea14bb2..2efda6f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -41,6 +41,7 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.statistics.cache.CacheServerStats;
 
 /**
  * This class encapsulates the wire protocol. It provides accessors to encode and decode a message
@@ -483,7 +484,7 @@ public class Message {
     }
     if (this.readHeader) {
       if (this.messageStats != null) {
-        this.messageStats.decMessagesBeingReceived(len);
+        this.messageStats.decMessagesBytesBeingReceived(len);
       }
     }
     ByteBuffer buffer = getCommBuffer();
@@ -752,7 +753,7 @@ public class Message {
       }
     }
     if (this.messageStats != null) {
-      this.messageStats.incMessagesBeingReceived(len);
+      this.messageStats.incMessagesBytesBeingReceived(len);
       this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocator.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocator.java
index 8353091..cf12c42 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocator.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.offheap;
 
+import org.apache.geode.statistics.offheap.OffHeapMemoryStats;
+
 /**
  * Basic contract for a heap that manages off heap memory. Any MemoryChunks allocated from a heap
  * are returned to that heap when freed.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
index fc08d76..2e1fdf2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
@@ -38,6 +38,7 @@ import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
 import org.apache.geode.internal.offheap.annotations.Unretained;
+import org.apache.geode.statistics.offheap.OffHeapMemoryStats;
 
 /**
  * This allocator is somewhat like an Arena allocator. We start out with an array of multiple large
@@ -86,12 +87,7 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
   public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
       int slabCount, long offHeapMemorySize, long maxSlabSize) {
     return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
-        new SlabFactory() {
-          @Override
-          public Slab create(int size) {
-            return new SlabImpl(size);
-          }
-        });
+        size -> new SlabImpl(size));
   }
 
   private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
@@ -147,9 +143,6 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
       }
     } finally {
       if (!created) {
-        if (stats != null) {
-          stats.close();
-        }
         if (ooohml != null) {
           ooohml.close();
         }
@@ -200,8 +193,6 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
           "attempted to reuse existing off-heap memory even though new off-heap memory was allocated");
     }
     this.ooohml = oooml;
-    newStats.initialize(this.stats);
-    this.stats = newStats;
   }
 
   private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
@@ -229,7 +220,7 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     Set<OffHeapStoredObject> liveChunksSet = new HashSet<>(liveChunks);
     Set<OffHeapStoredObject> regionChunksSet = new HashSet<>(regionChunks);
     liveChunksSet.removeAll(regionChunksSet);
-    return new ArrayList<OffHeapStoredObject>(liveChunksSet);
+    return new ArrayList<>(liveChunksSet);
   }
 
   /**
@@ -386,7 +377,6 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     // Removing this memory immediately can lead to a SEGV. See 47885.
     if (setClosed()) {
       this.freeList.freeSlabs();
-      this.stats.close();
       singleton = null;
     }
   }
@@ -461,8 +451,8 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     }
 
     final long bytesUsed = getUsedMemory();
-    for (int i = 0; i < savedListeners.length; i++) {
-      savedListeners[i].updateMemoryUsed(bytesUsed);
+    for (MemoryUsageListener savedListener : savedListeners) {
+      savedListener.updateMemoryUsed(bytesUsed);
     }
   }
 
@@ -508,12 +498,7 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     for (OffHeapStoredObject chunk : liveChunks) {
       orphans.add(new MemoryBlockNode(this, chunk));
     }
-    Collections.sort(orphans, new Comparator<MemoryBlock>() {
-      @Override
-      public int compare(MemoryBlock o1, MemoryBlock o2) {
-        return Long.valueOf(o1.getAddress()).compareTo(o2.getAddress());
-      }
-    });
+    Collections.sort(orphans, (o1, o2) -> Long.valueOf(o1.getAddress()).compareTo(o2.getAddress()));
     // this.memoryBlocks = new WeakReference<List<MemoryBlock>>(orphans);
     return orphans;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
index aca642b..1a80595 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
@@ -16,118 +16,23 @@ package org.apache.geode.internal.offheap;
 
 import java.lang.reflect.Method;
 
-import org.apache.geode.statistics.StatisticDescriptor;
-import org.apache.geode.statistics.Statistics;
-import org.apache.geode.statistics.StatisticsFactory;
-import org.apache.geode.statistics.StatisticsType;
-import org.apache.geode.statistics.StatisticsTypeFactory;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.ClassPathLoader;
 import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
+import org.apache.geode.statistics.offheap.OffHeapMemoryStats;
 
 /**
  * Enables off-heap storage by creating a MemoryAllocator.
  * @since Geode 1.0
  */
-public class OffHeapStorage implements OffHeapMemoryStats {
+public class OffHeapStorage {
   public static final String STAY_CONNECTED_ON_OUTOFOFFHEAPMEMORY_PROPERTY =
       DistributionConfig.GEMFIRE_PREFIX + "offheap.stayConnectedOnOutOfOffHeapMemory";
 
-  // statistics type
-  private StatisticsType statsType;
-  private static final String statsTypeName = "OffHeapMemoryStats";
-  private static final String statsTypeDescription = "Statistics about off-heap memory storage.";
-
-  // statistics instance
-  private static final String statsName = "offHeapMemoryStats";
-
-  // statistics fields
-  private int freeMemoryId;
-  private int maxMemoryId;
-  private int usedMemoryId;
-  private int objectsId;
-  private int readsId;
-  private int defragmentationId;
-  private int fragmentsId;
-  private int largestFragmentId;
-  private int defragmentationTimeId;
-  private int fragmentationId;
-  private int defragmentationsInProgressId;
-
-  private void initializeStats(StatisticsFactory factory) {
-
-    final String usedMemoryDesc =
-        "The amount of off-heap memory, in bytes, that is being used to store data.";
-    final String defragmentationDesc =
-        "The total number of times off-heap memory has been defragmented.";
-    final String defragmentationsInProgressDesc =
-        "Current number of defragment operations currently in progress.";
-    final String defragmentationTimeDesc = "The total time spent defragmenting off-heap memory.";
-    final String
-        fragmentationDesc =
-        "The percentage of off-heap free memory that is fragmented.  Updated every time a defragmentation is performed.";
-    final String
-        fragmentsDesc =
-        "The number of fragments of free off-heap memory. Updated every time a defragmentation is done.";
-    final String freeMemoryDesc =
-        "The amount of off-heap memory, in bytes, that is not being used.";
-    final String
-        largestFragmentDesc =
-        "The largest fragment of memory found by the last defragmentation of off heap memory. Updated every time a defragmentation is done.";
-    final String objectsDesc = "The number of objects stored in off-heap memory.";
-    final String
-        readsDesc =
-        "The total number of reads of off-heap memory. Only reads of a full object increment this statistic. If only a part of the object is read this statistic is not incremented.";
-    final String
-        maxMemoryDesc =
-        "The maximum amount of off-heap memory, in bytes. This is the amount of memory allocated at startup and does not change.";
-
-    final String usedMemory = "usedMemory";
-    final String defragmentations = "defragmentations";
-    final String defragmentationsInProgress = "defragmentationsInProgress";
-    final String defragmentationTime = "defragmentationTime";
-    final String fragmentation = "fragmentation";
-    final String fragments = "fragments";
-    final String freeMemory = "freeMemory";
-    final String largestFragment = "largestFragment";
-    final String objects = "objects";
-    final String reads = "reads";
-    final String maxMemory = "maxMemory";
-
-    statsType = factory.createType(statsTypeName, statsTypeDescription,
-        new StatisticDescriptor[]{factory.createLongGauge(usedMemory, usedMemoryDesc, "bytes"),
-            factory.createIntCounter(defragmentations, defragmentationDesc, "operations"),
-            factory.createIntGauge(defragmentationsInProgress, defragmentationsInProgressDesc,
-                "operations"),
-            factory.createLongCounter(defragmentationTime, defragmentationTimeDesc, "nanoseconds",
-                false),
-            factory.createIntGauge(fragmentation, fragmentationDesc, "percentage"),
-            factory.createLongGauge(fragments, fragmentsDesc, "fragments"),
-            factory.createLongGauge(freeMemory, freeMemoryDesc, "bytes"),
-            factory.createIntGauge(largestFragment, largestFragmentDesc, "bytes"),
-            factory.createIntGauge(objects, objectsDesc, "objects"),
-            factory.createLongCounter(reads, readsDesc, "operations"),
-            factory.createLongGauge(maxMemory, maxMemoryDesc, "bytes"),});
-
-    usedMemoryId = statsType.nameToId(usedMemory);
-    defragmentationId = statsType.nameToId(defragmentations);
-    defragmentationsInProgressId = statsType.nameToId(defragmentationsInProgress);
-    defragmentationTimeId = statsType.nameToId(defragmentationTime);
-    fragmentationId = statsType.nameToId(fragmentation);
-    fragmentsId = statsType.nameToId(fragments);
-    freeMemoryId = statsType.nameToId(freeMemory);
-    largestFragmentId = statsType.nameToId(largestFragment);
-    objectsId = statsType.nameToId(objects);
-    readsId = statsType.nameToId(reads);
-    maxMemoryId = statsType.nameToId(maxMemory);
-  }
-
   public static long parseOffHeapMemorySize(String value) {
     final long parsed = parseLongWithUnits(value, 0L, 1024 * 1024);
     if (parsed < 0) {
@@ -189,7 +94,7 @@ public class OffHeapStorage implements OffHeapMemoryStats {
    * Constructs a MemoryAllocator for off-heap storage.
    * @return MemoryAllocator for off-heap storage
    */
-  public static MemoryAllocator createOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
+  public static MemoryAllocator createOffHeapStorage(long offHeapMemorySize,
                                                      DistributedSystem system) {
     if (offHeapMemorySize == 0 || Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
       // Checking the FORCE_LOCATOR_DM_TYPE is a quick hack to keep our locator from allocating off
@@ -211,12 +116,12 @@ public class OffHeapStorage implements OffHeapMemoryStats {
     // ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException
     OutOfOffHeapMemoryListener ooohml =
         new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system);
-    return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml);
+    return basicCreateOffHeapStorage(offHeapMemorySize, ooohml);
   }
 
-  static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
+  static MemoryAllocator basicCreateOffHeapStorage(long offHeapMemorySize,
                                                    OutOfOffHeapMemoryListener ooohml) {
-    final OffHeapMemoryStats stats = new OffHeapStorage(sf);
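+    // WIP note: stats are now provided by the new Kotlin OffHeapMemoryStats class, constructed
+    // directly rather than through a StatisticsFactory; the empty-string argument appears to be a
+    // placeholder name for the owning member.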
+    final OffHeapMemoryStats stats = new OffHeapMemoryStats("");
 
     // determine off-heap and slab sizes
     final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);
@@ -264,175 +169,4 @@ public class OffHeapStorage implements OffHeapMemoryStats {
           "Memory size must be specified as <n>[g|m], where <n> is the size and [g|m] specifies the units in gigabytes or megabytes.");
     }
   }
-
-  private final Statistics stats;
-
-  private OffHeapStorage(StatisticsFactory factory) {
-    initializeStats(factory);
-    this.stats = factory.createAtomicStatistics(statsType, statsName);
-  }
-
-  public void incFreeMemory(long value) {
-    this.stats.incLong(freeMemoryId, value);
-  }
-
-  public void incMaxMemory(long value) {
-    this.stats.incLong(maxMemoryId, value);
-  }
-
-  public void incUsedMemory(long value) {
-    this.stats.incLong(usedMemoryId, value);
-  }
-
-  public void incObjects(int value) {
-    this.stats.incInt(objectsId, value);
-  }
-
-  public long getFreeMemory() {
-    return this.stats.getLong(freeMemoryId);
-  }
-
-  public long getMaxMemory() {
-    return this.stats.getLong(maxMemoryId);
-  }
-
-  public long getUsedMemory() {
-    return this.stats.getLong(usedMemoryId);
-  }
-
-  public int getObjects() {
-    return this.stats.getInt(objectsId);
-  }
-
-  @Override
-  public void incReads() {
-    this.stats.incLong(readsId, 1);
-  }
-
-  @Override
-  public long getReads() {
-    return this.stats.getLong(readsId);
-  }
-
-  private void incDefragmentations() {
-    this.stats.incInt(defragmentationId, 1);
-  }
-
-  @Override
-  public int getDefragmentations() {
-    return this.stats.getInt(defragmentationId);
-  }
-
-  @Override
-  public void setFragments(long value) {
-    this.stats.setLong(fragmentsId, value);
-  }
-
-  @Override
-  public long getFragments() {
-    return this.stats.getLong(fragmentsId);
-  }
-
-  @Override
-  public void setLargestFragment(int value) {
-    this.stats.setInt(largestFragmentId, value);
-  }
-
-  @Override
-  public int getLargestFragment() {
-    return this.stats.getInt(largestFragmentId);
-  }
-
-  @Override
-  public int getDefragmentationsInProgress() {
-    return this.stats.getInt(defragmentationsInProgressId);
-  }
-
-  @Override
-  public long startDefragmentation() {
-    this.stats.incInt(defragmentationsInProgressId, 1);
-    return DistributionStats.getStatTime();
-  }
-
-  @Override
-  public void endDefragmentation(long start) {
-    incDefragmentations();
-    this.stats.incInt(defragmentationsInProgressId, -1);
-    if (DistributionStats.enableClockStats) {
-      stats.incLong(defragmentationTimeId, DistributionStats.getStatTime() - start);
-    }
-  }
-
-  @Override
-  public long getDefragmentationTime() {
-    return stats.getLong(defragmentationTimeId);
-  }
-
-  @Override
-  public void setFragmentation(int value) {
-    this.stats.setInt(fragmentationId, value);
-  }
-
-  @Override
-  public int getFragmentation() {
-    return this.stats.getInt(fragmentationId);
-  }
-
-  public Statistics getStats() {
-    return this.stats;
-  }
-
-  @Override
-  public void close() {
-    this.stats.close();
-  }
-
-  @Override
-  public void initialize(OffHeapMemoryStats oldStats) {
-    setFreeMemory(oldStats.getFreeMemory());
-    setMaxMemory(oldStats.getMaxMemory());
-    setUsedMemory(oldStats.getUsedMemory());
-    setObjects(oldStats.getObjects());
-    setReads(oldStats.getReads());
-    setDefragmentations(oldStats.getDefragmentations());
-    setDefragmentationsInProgress(oldStats.getDefragmentationsInProgress());
-    setFragments(oldStats.getFragments());
-    setLargestFragment(oldStats.getLargestFragment());
-    setDefragmentationTime(oldStats.getDefragmentationTime());
-    setFragmentation(oldStats.getFragmentation());
-
-    oldStats.close();
-  }
-
-  private void setDefragmentationTime(long value) {
-    stats.setLong(defragmentationTimeId, value);
-  }
-
-  private void setDefragmentations(int value) {
-    this.stats.setInt(defragmentationId, value);
-  }
-
-  private void setDefragmentationsInProgress(int value) {
-    this.stats.setInt(defragmentationsInProgressId, value);
-  }
-
-  private void setReads(long value) {
-    this.stats.setLong(readsId, value);
-  }
-
-  private void setObjects(int value) {
-    this.stats.setInt(objectsId, value);
-  }
-
-  private void setUsedMemory(long value) {
-    this.stats.setLong(usedMemoryId, value);
-  }
-
-  private void setMaxMemory(long value) {
-    this.stats.setLong(maxMemoryId, value);
-  }
-
-  private void setFreeMemory(long value) {
-    this.stats.setLong(freeMemoryId, value);
-  }
-}
+}
\ No newline at end of file
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.java b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.kt
similarity index 71%
rename from geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.java
rename to geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.kt
index 5950952..e032039 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.java
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/internal/cache/tier/sockets/MessageStats.kt
@@ -13,19 +13,14 @@
  * the License.
  */
 
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.cache.tier.sockets
 
-/**
- * Statistics supported by cache/server Message.
- *
- * @since GemFire 5.0.2
- */
-public interface MessageStats {
-  void incReceivedBytes(long v);
+interface MessageStats {
+    fun incReceivedBytes(bytes: Long)
 
-  void incSentBytes(long v);
+    fun incSentBytes(bytes: Long)
 
-  void incMessagesBeingReceived(int bytes);
+    fun incMessagesBytesBeingReceived(bytes: Int)
 
-  void decMessagesBeingReceived(int bytes);
+    fun decMessagesBytesBeingReceived(bytes: Int)
 }
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientNotifierStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientNotifierStats.kt
new file mode 100755
index 0000000..5f68424
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientNotifierStats.kt
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.statistics.cache
+
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
+import org.apache.geode.statistics.util.NOW_NANOS
+
+class CacheClientNotifierStats(clientName: String) : MicrometerMeterGroup("CacheClientNotifierStats-$clientName") {
+
+    private val clientNotifierEventsMeter = CounterStatisticMeter("client.notifier.events.count", "Number of events processed by the cache client notifier.", arrayOf("clientName", clientName))
+    private val clientNotifierEventProcessedTimer = TimerStatisticMeter("client.notifier.events.processed.time", "Total time spent by the cache client notifier processing events.", arrayOf("clientName", clientName), unit = "nanoseconds")
+    private val clientNotifierClientRegistrationMeter = GaugeStatisticMeter("client.notifier.client.registration.count", "Number of clients that have registered for updates.", arrayOf("clientName", clientName))
+    private val clientNotifierClientRegistrationTimer = TimerStatisticMeter("client.notifier.client.registration.time", "Total time spent doing client registrations.", arrayOf("clientName", clientName), unit = "nanoseconds")
+    private val clientNotifierHealthMonitorRegistrationMeter = GaugeStatisticMeter("client.notifier.healthmonitor.registration.count", "Number of client register requests.", arrayOf("clientName", clientName))
+    private val clientNotifierHealthMonitorUnregistrationMeter = GaugeStatisticMeter("client.notifier.healthmonitor.unregistration.count", "Number of client unregister requests.", arrayOf("clientName", clientName))
+    private val clientNotifierDurableReconnectionMeter = CounterStatisticMeter("client.notifier.durable.reconnection.count", "Number of times the same durable client connects to the server", arrayOf("clientName", clientName))
+    private val clientNotifierDurableQueueDropMeter = CounterStatisticMeter("client.notifier.durable.queue.drop.count", "Number of times client queue for a particular durable client is dropped", arrayOf("clientName", clientName))
+    private val clientNotifierDurableQueueSizeMeter = CounterStatisticMeter("client.notifier.durable.queue.count", "Number of events enqueued in the queue for a durable client.", arrayOf("clientName", clientName))
+    private val clientNotifierCQProcessingTimer = TimerStatisticMeter("client.notifier.cq.processing.time", "Total time spent by the cache client notifier processing CQs.", arrayOf("clientName", clientName), unit = "nanoseconds")
+    private val clientNotifierCompiledQueryMeter = GaugeStatisticMeter("client.notifier.compiledquery.count", "Number of compiled queries maintained.", arrayOf("clientName", clientName))
+    private val clientNotifierCompiledQueryUsedMeter = CounterStatisticMeter("client.notifier.compiledquery.used.count", "Number of times compiled queries are used.", arrayOf("clientName", clientName))
+
+    override fun initializeStaticMeters() {
+        registerMeter(clientNotifierEventsMeter)
+        registerMeter(clientNotifierEventProcessedTimer)
+        registerMeter(clientNotifierClientRegistrationMeter)
+        registerMeter(clientNotifierClientRegistrationTimer)
+        registerMeter(clientNotifierHealthMonitorRegistrationMeter)
+        registerMeter(clientNotifierHealthMonitorUnregistrationMeter)
+        registerMeter(clientNotifierDurableReconnectionMeter)
+        registerMeter(clientNotifierDurableQueueDropMeter)
+        registerMeter(clientNotifierDurableQueueSizeMeter)
+        registerMeter(clientNotifierCQProcessingTimer)
+        registerMeter(clientNotifierCompiledQueryMeter)
+        registerMeter(clientNotifierCompiledQueryUsedMeter)
+    }
+
+    fun startTime(): Long = NOW_NANOS
+
+    fun endEvent(start: Long) {
+        clientNotifierEventsMeter.increment()
+        clientNotifierEventProcessedTimer.recordValue(NOW_NANOS - start)
+    }
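+    // Illustrative call pattern (a sketch, not taken from existing callers): the timing helpers
+    // are intended to be paired, e.g.
+    //   val start = stats.startTime()
+    //   ... process the client event ...
+    //   stats.endEvent(start)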
+
+    fun endClientRegistration(start: Long) {
+        clientNotifierClientRegistrationMeter.increment()
+        clientNotifierClientRegistrationTimer.recordValue(NOW_NANOS - start)
+    }
+
+    fun endCqProcessing(start: Long) {
+        clientNotifierCQProcessingTimer.recordValue(NOW_NANOS - start)
+    }
+
+    fun incClientRegisterRequests() {
+        clientNotifierHealthMonitorRegistrationMeter.increment()
+    }
+
+    fun incDurableReconnectionCount() {
+        clientNotifierDurableReconnectionMeter.increment()
+    }
+
+    fun incQueueDroppedCount() {
+        clientNotifierDurableQueueDropMeter.increment()
+    }
+
+    fun incEventEnqueuedWhileClientAwayCount() {
+        clientNotifierDurableQueueSizeMeter.increment()
+    }
+
+    fun incClientUnRegisterRequests() {
+        clientNotifierHealthMonitorUnregistrationMeter.increment()
+    }
+
+    fun incCompiledQueryCount(count: Long) {
+        clientNotifierCompiledQueryMeter.increment(count)
+    }
+
+    fun incCompiledQueryUsedCount(count: Long) {
+        clientNotifierCompiledQueryUsedMeter.increment(count)
+    }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientProxyStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientProxyStats.kt
new file mode 100644
index 0000000..f1d9dc8
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheClientProxyStats.kt
@@ -0,0 +1,101 @@
+package org.apache.geode.statistics.cache
+
+import org.apache.geode.internal.cache.tier.sockets.MessageStats
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
+import org.apache.geode.statistics.util.NOW_NANOS
+
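+/**
+ * Micrometer-based statistics for a single cache client proxy. Each meter is tagged with the
+ * owning client's name so per-client series can be told apart. Received-byte tracking from
+ * [MessageStats] is still left as TODO in this WIP version.
+ */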
+class CacheClientProxyStats(clientName: String) : MicrometerMeterGroup("CacheClientProxyStats-$clientName"), MessageStats {
+    private val clientProxyMessagesReceivedMeter = CounterStatisticMeter("client.proxy.messages.received.count", "Number of client messages received.", arrayOf("clientName", clientName))
+    private val clientProxyMessageQueuedMeter = CounterStatisticMeter("client.proxy.messages.queued.count", "Number of client messages added to the message queue.", arrayOf("clientName", clientName))
+    private val clientProxyMessageQueueingFailedMeter = CounterStatisticMeter("client.proxy.messages.queued.failed.count", "Number of client messages attempted but failed to be added to the message queue.", arrayOf("clientName", clientName))
+    private val clientProxyMessageNotQueuedOriginatorMeter = CounterStatisticMeter("client.proxy.messages.notqueued.originator.count", "Number of client messages received but not added to the message queue because the receiving proxy represents the client originating the message.", arrayOf("clientName", clientName))
+    private val clientProxyMessageNotQueuedNoInterestMeter = CounterStatisticMeter("client.proxy.messages.notqueued.nointerest.count", "Number of client messages received but not added to the message queue because the client represented by the receiving proxy was not interested in the message's key.", arrayOf("clientName", clientName))
+    private val clientProxyMessageQueueSizeMeter = GaugeStatisticMeter("client.proxy.messages.queue.size", "Size of the message queue.", arrayOf("clientName", clientName))
+    private val clientProxyMessageProcessedMeter = CounterStatisticMeter("client.proxy.messages.processed.count", "Number of client messages removed from the message queue and sent.", arrayOf("clientName", clientName))
+    private val clientProxyMessageProcessingTimer = TimerStatisticMeter("client.proxy.messages.processing.time", "Total time spent sending messages to clients.", arrayOf("clientName", clientName), unit = "nanoseconds")
+    private val clientProxyDeltaMessagesSentMeter = CounterStatisticMeter("client.proxy.delta.messages.sent.count", "Number of client messages containing only delta bytes dispatched to the client.", arrayOf("messageSize", "delta", "clientName", clientName))
+    private val clientProxyDeltaFullMessagesSentMeter = CounterStatisticMeter("client.proxy.delta.messages.sent.count", "Number of client messages dispatched in response to a failed delta at the client.", arrayOf("messageSize", "full", "clientName", clientName))
+    private val clientProxyCQCountMeter = GaugeStatisticMeter("client.proxy.cq.count", "Number of CQs on the client.", arrayOf("clientName", clientName))
+    private val clientProxyBytesSent = CounterStatisticMeter("client.proxy.sent.bytes", "Total number of bytes sent to client.", arrayOf("clientName", clientName), unit = "bytes")
+
+    override fun initializeStaticMeters() {
+        registerMeter(clientProxyMessagesReceivedMeter)
+        registerMeter(clientProxyMessageQueuedMeter)
+        registerMeter(clientProxyMessageQueueingFailedMeter)
+        registerMeter(clientProxyMessageNotQueuedOriginatorMeter)
+        registerMeter(clientProxyMessageNotQueuedNoInterestMeter)
+        registerMeter(clientProxyMessageQueueSizeMeter)
+        registerMeter(clientProxyMessageProcessedMeter)
+        registerMeter(clientProxyMessageProcessingTimer)
+        registerMeter(clientProxyDeltaMessagesSentMeter)
+        registerMeter(clientProxyDeltaFullMessagesSentMeter)
+        registerMeter(clientProxyCQCountMeter)
+        registerMeter(clientProxyBytesSent)
+    }
+
+    fun incMessagesReceived() {
+        clientProxyMessagesReceivedMeter.increment()
+    }
+
+    fun incMessagesQueued() {
+        clientProxyMessageQueuedMeter.increment()
+    }
+
+    fun incMessagesNotQueuedOriginator() {
+        clientProxyMessageNotQueuedOriginatorMeter.increment()
+    }
+
+    fun incMessagesNotQueuedNotInterested() {
+        clientProxyMessageNotQueuedNoInterestMeter.increment()
+    }
+
+    fun incMessagesFailedQueued() {
+        clientProxyMessageQueueingFailedMeter.increment()
+    }
+
+    fun incCqCount() {
+        clientProxyCQCountMeter.increment()
+    }
+
+    fun decCqCount() {
+        clientProxyCQCountMeter.decrement()
+    }
+
+    fun setQueueSize(size: Int) {
+        clientProxyMessageQueueSizeMeter.setValue(size)
+    }
+
+    fun startTime(): Long = NOW_NANOS
+
+    fun endMessage(start: Long) {
+        clientProxyMessageProcessedMeter.increment()
+        clientProxyMessageProcessingTimer.recordValue(NOW_NANOS - start)
+    }
+
+    fun incDeltaMessagesSent() {
+        clientProxyDeltaMessagesSentMeter.increment()
+    }
+
+    fun incDeltaFullMessagesSent() {
+        clientProxyDeltaFullMessagesSentMeter.increment()
+    }
+
+    override fun incSentBytes(bytes: Long) {
+        clientProxyBytesSent.increment(bytes)
+    }
+
+    override fun incReceivedBytes(bytes: Long) {
+        TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+    }
+
+    override fun incMessagesBytesBeingReceived(bytes: Int) {
+        TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+    }
+
+    override fun decMessagesBytesBeingReceived(bytes: Int) {
+        TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+    }
+}
\ No newline at end of file
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
new file mode 100644
index 0000000..2ca6ef7
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.statistics.cache
+
+import org.apache.geode.distributed.internal.PoolStatHelper
+import org.apache.geode.internal.cache.tier.sockets.MessageStats
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
+
+class CacheServerStats(ownerName: String, typeName: String = "CacheServerStats") : MicrometerMeterGroup("$typeName-$ownerName"), MessageStats {
+
+    private val cacheServerClientGetRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client get requests.", arrayOf("owner", ownerName, "operation", "get"))
+    private val cacheServerClientGetRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading get requests.", arrayOf("owner", ownerName, "operation", "get"), unit = "nanoseconds")
+    private val cacheServerGetProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client get request, including the time to get an object from the cache.", arrayOf("owner", ownerName, "operation", "get"), unit = "nanoseconds")
+    private val cacheServerGetResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of get responses written to the cache client.", arrayOf("owner", ownerName, "operation", "get"))
+    private val cacheServerGetResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing get responses.", arrayOf("owner", ownerName, "operation", "get"), unit = "nanoseconds")
+
+    private val cacheServerClientPutRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client put requests.", arrayOf("owner", ownerName, "operation", "put"))
+    private val cacheServerClientPutRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading put requests.", arrayOf("owner", ownerName, "operation", "put"), unit = "nanoseconds")
+    private val cacheServerPutProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client put request, including the time to put an object into the cache.", arrayOf("owner", ownerName, "operation", "put"), unit = "nanoseconds")
+    private val cacheServerPutResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of put responses written to the cache client.", arrayOf("owner", ownerName, "operation", "put"))
+    private val cacheServerPutResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing put responses.", arrayOf("owner", ownerName, "operation", "put"), unit = "nanoseconds")
+
+    private val cacheServerClientPutAllRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client putAll requests.", arrayOf("owner", ownerName, "operation", "putAll"))
+    private val cacheServerClientPutAllRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading putAll requests.", arrayOf("owner", ownerName, "operation", "putAll"), unit = "nanoseconds")
+    private val cacheServerPutAllProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client putAll request, including the time to put all objects into the cache.", arrayOf("owner", ownerName, "operation", "putAll"), unit = "nanoseconds")
+    private val cacheServerPutAllResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of putAll responses written to the cache client.", arrayOf("owner", ownerName, "operation", "putAll"))
+    private val cacheServerPutAllResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing putAll responses.", arrayOf("owner", ownerName, "operation", "putAll"), unit = "nanoseconds")
+
+    private val cacheServerClientRemoveAllRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client removeAll requests.", arrayOf("owner", ownerName, "operation", "removeAll"))
+    private val cacheServerClientRemoveAllRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading removeAll requests.", arrayOf("owner", ownerName, "operation", "removeAll"), unit = "nanoseconds")
+    private val cacheServerRemoveAllProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client removeAll request, including the time to remove all objects from the cache.", arrayOf("owner", ownerName, "operation", "removeAll"), unit = "nanoseconds")
+    private val cacheServerRemoveAllResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of removeAll responses written to the cache client.", arrayOf("owner", ownerName, "operation", "removeAll"))
+    private val cacheServerRemoveAllResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing removeAll responses.", arrayOf("owner", ownerName, "operation", "removeAll"), unit = "nanoseconds")
+
+    private val cacheServerClientGetAllRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client getAll requests.", arrayOf("owner", ownerName, "operation", "getAll"))
+    private val cacheServerClientGetAllRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading getAll requests.", arrayOf("owner", ownerName, "operation", "getAll"), unit = "nanoseconds")
+    private val cacheServerGetAllProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client getAll request.", arrayOf("owner", ownerName, "operation", "getAll"), unit = "nanoseconds")
+    private val cacheServerGetAllResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of getAll responses written to the cache client.", arrayOf("owner", ownerName, "operation", "getAll"))
+    private val cacheServerGetAllResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing getAll responses.", arrayOf("owner", ownerName, "operation", "getAll"), unit = "nanoseconds")
+
+    private val cacheServerClientDestroyRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client destroy requests.", arrayOf("owner", ownerName, "operation", "destroy"))
+    private val cacheServerClientDestroyRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading destroy requests.", arrayOf("owner", ownerName, "operation", "destroy"), unit = "nanoseconds")
+    private val cacheServerDestroyProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client destroy request, including the time to destroy an object from the cache.", arrayOf("owner", ownerName, "operation", "destroy"), unit = "nanoseconds")
+    private val cacheServerDestroyResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of destroy responses written to the cache client.", arrayOf("owner", ownerName, "operation", "destroy"))
+    private val cacheServerDestroyResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing destroy responses.", arrayOf("owner", ownerName, "operation", "destroy"), unit = "nanoseconds")
+
+    private val cacheServerClientInvaldiateRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client invalidate requests.", arrayOf("owner", ownerName, "operation", "invalidate"))
+    private val cacheServerClientInvaldiateRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading invalidate requests.", arrayOf("owner", ownerName, "operation", "invalidate"), unit = "nanoseconds")
+    private val cacheServerInvaldiateProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client invalidate request, including the time to invalidate an object from the cache.", arrayOf("owner", ownerName, "operation", "invalidate"), unit = "nanoseconds")
+    private val cacheServerInvaldiateResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of invalidate responses written to the cache client.", arrayOf("owner", ownerName, "operation", "invalidate"))
+    private val cacheServerInvaldiateResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing invalidate responses.", arrayOf("owner", ownerName, "operation", "invalidate"), unit = "nanoseconds")
+
+    private val cacheServerClientSizeRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client size requests.", arrayOf("owner", ownerName, "operation", "size"))
+    private val cacheServerClientSizeRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading size requests.", arrayOf("owner", ownerName, "operation", "size"), unit = "nanoseconds")
+    private val cacheServerSizeProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client size request, including the time to determine the size of the region.", arrayOf("owner", ownerName, "operation", "size"), unit = "nanoseconds")
+    private val cacheServerSizeResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of size responses written to the cache client.", arrayOf("owner", ownerName, "operation", "size"))
+    private val cacheServerSizeResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing size responses.", arrayOf("owner", ownerName, "operation", "size"), unit = "nanoseconds")
+
+    private val cacheServerClientQueryRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client query requests.", arrayOf("owner", ownerName, "operation", "query"))
+    private val cacheServerClientQueryRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading query requests.", arrayOf("owner", ownerName, "operation", "query"), unit = "nanoseconds")
+    private val cacheServerQueryProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client query request, including the time to execute the query.", arrayOf("owner", ownerName, "operation", "query"), unit = "nanoseconds")
+    private val cacheServerQueryResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of query responses written to the cache client.", arrayOf("owner", ownerName, "operation", "query"))
+    private val cacheServerQueryResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing query responses.", arrayOf("owner", ownerName, "operation", "query"), unit = "nanoseconds")
+
+    private val cacheServerClientDestroyRegionRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client destroyRegion requests.", arrayOf("owner", ownerName, "operation", "destroyRegion"))
+    private val cacheServerClientDestroyRegionRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading destroyRegion requests.", arrayOf("owner", ownerName, "operation", "destroyRegion"), unit = "nanoseconds")
+    private val cacheServerClientDestroyRegionProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client destroyRegion request, including the time to destroy the region from the cache.", arrayOf("owner", ownerName, "operation", "destroyRegion"), unit = "nanoseconds")
+    private val cacheServerClientDestroyRegionResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of destroyRegion responses written to the cache client.", arrayOf("owner", ownerName, "operation", "destroyRegion"))
+    private val cacheServerClientDestroyRegionResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing destroyRegion responses.", arrayOf("owner", ownerName, "operation", "destroyRegion"), unit = "nanoseconds")
+
+    private val cacheServerClientContainsKeyRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client containsKey requests.", arrayOf("owner", ownerName, "operation", "containsKey"))
+    private val cacheServerClientContainsKeyRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent reading containsKey requests.", arrayOf("owner", ownerName, "operation", "containsKey"), unit = "nanoseconds")
+    private val cacheServerClientContainsKeyProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent processing a containsKey request.", arrayOf("owner", ownerName, "operation", "containsKey"), unit = "nanoseconds")
+    private val cacheServerClientContainsKeyResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of containsKey responses written to the cache client.", arrayOf("owner", ownerName, "operation", "containsKey"))
+    private val cacheServerClientContainsKeyResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent writing containsKey responses.", arrayOf("owner", ownerName, "operation", "containsKey"), unit = "nanoseconds")
+
+    private val cacheServerProcessBatchRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client processBatch requests.", arrayOf("owner", ownerName, "operation", "processBatch"))
+    private val cacheServerProcessBatchRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading processBatch requests.", arrayOf("owner", ownerName, "operation", "processBatch"), unit = "nanoseconds")
+    private val cacheServerProcessBatchProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client processBatch request.", arrayOf("owner", ownerName, "operation", "processBatch"), unit = "nanoseconds")
+    private val cacheServerProcessBatchResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of processBatch responses written to the cache client.", arrayOf("owner", ownerName, "operation", "processBatch"))
+    private val cacheServerProcessBatchResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing processBatch responses.", arrayOf("owner", ownerName, "operation", "processBatch"), unit = "nanoseconds")
+    private val cacheServerProcessBatchSizeMeter = CounterStatisticMeter("cacheserver.client.batch.size", "The size of the batches received.", arrayOf("owner", ownerName, "operation", "processBatch"), unit = "bytes")
+
+    private val cacheServerClientClearRegionRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client clearRegion requests.", arrayOf("owner", ownerName, "operation", "clearRegion"))
+    private val cacheServerClientClearRegionRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading clearRegion requests.", arrayOf("owner", ownerName, "operation", "clearRegion"), unit = "nanoseconds")
+    private val cacheServerClearRegionProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client clearRegion request, including the time to clear the region from the cache.", arrayOf("owner", ownerName, "operation", "clearRegion"), unit = "nanoseconds")
+    private val cacheServerClearRegionResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of clearRegion responses written to the cache client.", arrayOf("owner", ownerName, "operation", "clearRegion"))
+    private val cacheServerClearRegionResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing clearRegion responses.", arrayOf("owner", ownerName, "operation", "clearRegion"), unit = "nanoseconds")
+
+
+    private val cacheServerClientNotificationRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client notification requests.", arrayOf("owner", ownerName, "operation", "clientNotification"))
+    private val cacheServerClientNotificationRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading client notification requests.", arrayOf("owner", ownerName, "operation", "clientNotification"), unit = "nanoseconds")
+    private val cacheServerClientNotificationProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client notification request.", arrayOf("owner", ownerName, "operation", "clientNotification"), unit = "nanoseconds")
+
+    private val cacheServerUpdateClientNotificationRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client notification update requests.", arrayOf("owner", ownerName, "operation", "updateClientNotification"))
+    private val cacheServerUpdateClientNotificationRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading client notification update requests.", arrayOf("owner", ownerName, "operation", "updateClientNotification"), unit = "nanoseconds")
+    private val cacheServerUpdateClientNotificationProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a client notification update request.", arrayOf("owner", ownerName, "operation", "updateClientNotification"), unit = "nanoseconds")
+
+    private val cacheServerClientReadyRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client ready requests.", arrayOf("owner", ownerName, "operation", "clientReady"))
+    private val cacheServerClientReadyRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading cache client ready requests.", arrayOf("owner", ownerName, "operation", "clientReady"), unit = "nanoseconds")
+    private val cacheServerClientReadyProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client ready request.", arrayOf("owner", ownerName, "operation", "clientReady"), unit = "nanoseconds")
+    private val cacheServerClientReadyResponseWrittenMeter = CounterStatisticMeter("cacheserver.client.responses.written.count", "Number of client ready responses written to the cache client.", arrayOf("owner", ownerName, "operation", "clientReady"))
+    private val cacheServerClientReadyResponseWrittenTimer = TimerStatisticMeter("cacheserver.client.responses.written.time", "Total time spent in writing client ready responses.", arrayOf("owner", ownerName, "operation", "clientReady"), unit = "nanoseconds")
+
+    private val cacheServerClientCloseConnectionRequestMeter = CounterStatisticMeter("cacheserver.client.requests.count", "Number of cache client close connection requests.", arrayOf("owner", ownerName, "operation", "closeConnection"))
+    private val cacheServerClientCloseConnectionRequestTimer = TimerStatisticMeter("cacheserver.client.requests.time", "Total time spent in reading close connection requests.", arrayOf("owner", ownerName, "operation", "closeConnection"), unit = "nanoseconds")
+    private val cacheServerClientCloseConnectionProcessTimer = TimerStatisticMeter("cacheserver.operation.process.time", "Total time spent in processing a cache client close connection request.", arrayOf("owner", ownerName, "operation", "closeConnection"), unit = "nanoseconds")
+
+
+    private val cacheServerFailedConnectionAttemptsMeter = CounterStatisticMeter("cacheserver.client.connection.failed.count", "Number of failed connection attempts.", arrayOf("owner", ownerName))
+    private val cacheServerCurrentConnectionMeter = GaugeStatisticMeter("cacheserver.client.connection.count", "Number of sockets accepted and used for client to server messaging.", arrayOf("owner", ownerName, "connectionType", "messaging"))
+    private val cacheServerCurrentQueueConnectionMeter = GaugeStatisticMeter("cacheserver.client.connection.count", "Number of sockets accepted and used for server to client queue messaging.", arrayOf("owner", ownerName, "connectionType", "queue"))
+    private val cacheServerClientCountMeter = GaugeStatisticMeter("cacheserver.client.count", "Number of client virtual machines connected.", arrayOf("owner", ownerName))
+    private val cacheServerOutOfOrderGatewayBatchIdMeter = CounterStatisticMeter("cacheserver.gateway.batch.outoforder.count", "Number of out-of-order batch IDs.", arrayOf("owner", ownerName))
+
+    private val cacheServerClientWriteRequestAbandondMeter = CounterStatisticMeter("cacheserver.client.requests.abandoned.count", "Number of write operations abandoned by clients.", arrayOf("owner", ownerName, "requestType", "write"))
+    private val cacheServerClientReadRequestAbandondMeter = CounterStatisticMeter("cacheserver.client.requests.abandoned.count", "Number of read operations abandoned by clients.", arrayOf("owner", ownerName, "requestType", "read"))
+    private val cacheServerReceivedBytesMeter = CounterStatisticMeter("cacheserver.client.bytes", "Total number of bytes received from clients.", arrayOf("owner", ownerName, "direction", "received"), unit = "bytes")
+    private val cacheServerSentBytesMeter = CounterStatisticMeter("cacheserver.client.bytes", "Total number of bytes sent to clients.", arrayOf("owner", ownerName, "direction", "sent"), unit = "bytes")
+    private val cacheServerMessagesReceivedMeter = GaugeStatisticMeter("cacheserver.client.messages.received.count", "Current number of messages being received off the network or being processed after reception.", arrayOf("owner", ownerName))
+    private val cacheServerMessagesReceivedBytesMeter = GaugeStatisticMeter("cacheserver.client.messages.received.bytes", "Current number of bytes consumed by messages being received or processed.", arrayOf("owner", ownerName), unit = "bytes")
+    private val cacheServerConnectionTimeoutMeter = CounterStatisticMeter("cacheserver.client.connection.timeout.count", "Total number of connections that have been timed out by the server because of client inactivity", arrayOf("owner", ownerName))
+    private val cacheServerThreadQueueSizeMeter = GaugeStatisticMeter("cacheserver.client.thread.queue.size", "Current number of connections waiting for a thread to start processing their message.", arrayOf("owner", ownerName))
+    private val cacheServerConnectionAcceptInProgressMeter = GaugeStatisticMeter("cacheserver.client.connection.accept.inprogress.count", "Current number of server accepts that are attempting to do the initial handshake with the client.", arrayOf("owner", ownerName))
+    private val cacheServerConnectionAcceptStartMeter = CounterStatisticMeter("cacheserver.client.connection.accept.start.count", "Total number of threads created to deal with an accepted socket. Note that this is not the current number of threads.", arrayOf("owner", ownerName))
+    private val cacheServerConnectionStartMeter = CounterStatisticMeter("cacheserver.client.connection.start.count", "Total number of threads created to deal with a client connection. Note that this is not the current number of threads.", arrayOf("owner", ownerName))
+    private val cacheServerConnectionThreadsMeter = GaugeStatisticMeter("cacheserver.client.connection.thread.count", "Current number of threads dealing with a client connection.", arrayOf("owner", ownerName))
+    private val cacheServerConnectionLoadMeter = GaugeStatisticMeter("cacheserver.client.connection.load.count", "The load from client to server connections as reported by the load probe installed in this server", arrayOf("owner", ownerName))
+    private val cacheServerLoadPerConnectionMeter = GaugeStatisticMeter("cacheserver.client.load.connection.count", "The estimate of how much load is added for each new connection as reported by the load probe installed in this server", arrayOf("owner", ownerName))
+    private val cacheServerQueueLoadMeter = GaugeStatisticMeter("cacheserver.client.queue.load.count", "The load from queues as reported by the load probe installed in this server", arrayOf("owner", ownerName))
+    private val cacheServerLoadPerQueueMeter = GaugeStatisticMeter("cacheserver.client.load.queue.count", "The estimate of how much load is added for each new queue as reported by the load probe installed in this server", arrayOf("owner", ownerName))
+
+
+    override fun initializeStaticMeters() {
+        registerMeter(cacheServerClientGetRequestMeter)
+        registerMeter(cacheServerClientGetRequestTimer)
+        registerMeter(cacheServerGetProcessTimer)
+        registerMeter(cacheServerGetResponseWrittenMeter)
+        registerMeter(cacheServerGetResponseWrittenTimer)
+
+        registerMeter(cacheServerClientPutRequestMeter)
+        registerMeter(cacheServerClientPutRequestTimer)
+        registerMeter(cacheServerPutProcessTimer)
+        registerMeter(cacheServerPutResponseWrittenMeter)
+        registerMeter(cacheServerPutResponseWrittenTimer)
+
+        registerMeter(cacheServerClientPutAllRequestMeter)
+        registerMeter(cacheServerClientPutAllRequestTimer)
+        registerMeter(cacheServerPutAllProcessTimer)
+        registerMeter(cacheServerPutAllResponseWrittenMeter)
+        registerMeter(cacheServerPutAllResponseWrittenTimer)
+
+        registerMeter(cacheServerClientRemoveAllRequestMeter)
+        registerMeter(cacheServerClientRemoveAllRequestTimer)
+        registerMeter(cacheServerRemoveAllProcessTimer)
+        registerMeter(cacheServerRemoveAllResponseWrittenMeter)
+        registerMeter(cacheServerRemoveAllResponseWrittenTimer)
+
+        registerMeter(cacheServerClientGetAllRequestMeter)
+        registerMeter(cacheServerClientGetAllRequestTimer)
+        registerMeter(cacheServerGetAllProcessTimer)
+        registerMeter(cacheServerGetAllResponseWrittenMeter)
+        registerMeter(cacheServerGetAllResponseWrittenTimer)
+
+        registerMeter(cacheServerClientDestroyRequestMeter)
+        registerMeter(cacheServerClientDestroyRequestTimer)
+        registerMeter(cacheServerDestroyProcessTimer)
+        registerMeter(cacheServerDestroyResponseWrittenMeter)
+        registerMeter(cacheServerDestroyResponseWrittenTimer)
+
+        registerMeter(cacheServerClientInvaldiateRequestMeter)
+        registerMeter(cacheServerClientInvaldiateRequestTimer)
+        registerMeter(cacheServerInvaldiateProcessTimer)
+        registerMeter(cacheServerInvaldiateResponseWrittenMeter)
+        registerMeter(cacheServerInvaldiateResponseWrittenTimer)
+
+        registerMeter(cacheServerClientSizeRequestMeter)
+        registerMeter(cacheServerClientSizeRequestTimer)
+        registerMeter(cacheServerSizeProcessTimer)
+        registerMeter(cacheServerSizeResponseWrittenMeter)
+        registerMeter(cacheServerSizeResponseWrittenTimer)
+
+        registerMeter(cacheServerClientQueryRequestMeter)
+        registerMeter(cacheServerClientQueryRequestTimer)
+        registerMeter(cacheServerQueryProcessTimer)
+        registerMeter(cacheServerQueryResponseWrittenMeter)
+        registerMeter(cacheServerQueryResponseWrittenTimer)
+
+        registerMeter(cacheServerClientDestroyRegionRequestMeter)
+        registerMeter(cacheServerClientDestroyRegionRequestTimer)
+        registerMeter(cacheServerClientDestroyRegionProcessTimer)
+        registerMeter(cacheServerClientDestroyRegionResponseWrittenMeter)
+        registerMeter(cacheServerClientDestroyRegionResponseWrittenTimer)
+
+        registerMeter(cacheServerClientContainsKeyRequestMeter)
+        registerMeter(cacheServerClientContainsKeyRequestTimer)
+        registerMeter(cacheServerClientContainsKeyProcessTimer)
+        registerMeter(cacheServerClientContainsKeyResponseWrittenMeter)
+        registerMeter(cacheServerClientContainsKeyResponseWrittenTimer)
+
+        registerMeter(cacheServerProcessBatchRequestMeter)
+        registerMeter(cacheServerProcessBatchRequestTimer)
+        registerMeter(cacheServerProcessBatchProcessTimer)
+        registerMeter(cacheServerProcessBatchResponseWrittenMeter)
+        registerMeter(cacheServerProcessBatchResponseWrittenTimer)
+        registerMeter(cacheServerProcessBatchSizeMeter)
+
+        registerMeter(cacheServerClientClearRegionRequestMeter)
+        registerMeter(cacheServerClientClearRegionRequestTimer)
+        registerMeter(cacheServerClearRegionProcessTimer)
+        registerMeter(cacheServerClearRegionResponseWrittenMeter)
+        registerMeter(cacheServerClearRegionResponseWrittenTimer)
+
+
+        registerMeter(cacheServerClientNotificationRequestMeter)
+        registerMeter(cacheServerClientNotificationRequestTimer)
+        registerMeter(cacheServerClientNotificationProcessTimer)
+
+        registerMeter(cacheServerUpdateClientNotificationRequestMeter)
+        registerMeter(cacheServerUpdateClientNotificationRequestTimer)
+        registerMeter(cacheServerUpdateClientNotificationProcessTimer)
+
+        registerMeter(cacheServerClientReadyRequestMeter)
+        registerMeter(cacheServerClientReadyRequestTimer)
+        registerMeter(cacheServerClientReadyProcessTimer)
+        registerMeter(cacheServerClientReadyResponseWrittenMeter)
+        registerMeter(cacheServerClientReadyResponseWrittenTimer)
+
+        registerMeter(cacheServerClientCloseConnectionRequestMeter)
+        registerMeter(cacheServerClientCloseConnectionRequestTimer)
+        registerMeter(cacheServerClientCloseConnectionProcessTimer)
+
+
+        registerMeter(cacheServerFailedConnectionAttemptsMeter)
+        registerMeter(cacheServerCurrentConnectionMeter)
+        registerMeter(cacheServerCurrentQueueConnectionMeter)
+        registerMeter(cacheServerClientCountMeter)
+        registerMeter(cacheServerOutOfOrderGatewayBatchIdMeter)
+
+        registerMeter(cacheServerClientWriteRequestAbandondMeter)
+        registerMeter(cacheServerClientReadRequestAbandondMeter)
+        registerMeter(cacheServerReceivedBytesMeter)
+        registerMeter(cacheServerSentBytesMeter)
+        registerMeter(cacheServerMessagesReceivedMeter)
+        registerMeter(cacheServerMessagesReceivedBytesMeter)
+        registerMeter(cacheServerConnectionTimeoutMeter)
+        registerMeter(cacheServerThreadQueueSizeMeter)
+        registerMeter(cacheServerConnectionAcceptInProgressMeter)
+        registerMeter(cacheServerConnectionAcceptStartMeter)
+        registerMeter(cacheServerConnectionStartMeter)
+        registerMeter(cacheServerConnectionThreadsMeter)
+        registerMeter(cacheServerConnectionLoadMeter)
+        registerMeter(cacheServerLoadPerConnectionMeter)
+        registerMeter(cacheServerQueueLoadMeter)
+        registerMeter(cacheServerLoadPerQueueMeter)
+    }
+
+//    var alldescriptors = serverStatDescriptors
+//    if (descriptors != null)
+//    {
+//        alldescriptors = arrayOfNulls<StatisticDescriptor>(descriptors.size + serverStatDescriptors.size)
+//        System.arraycopy(descriptors, 0, alldescriptors, 0, descriptors.size)
+//        System.arraycopy(serverStatDescriptors, 0, alldescriptors, descriptors.size,
+//                serverStatDescriptors.size)
+//    }
+//    statType = factory.createType(typeName, typeName, alldescriptors)
+//    this.stats = factory.createAtomicStatistics(statType, ownerName)
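+    // (Above: the legacy StatisticsFactory/StatisticDescriptor wiring this meter group replaces,
+    // presumably kept as a comment for reference while the migration is WIP.)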
+
+    fun incAcceptThreadsCreated() {
+        cacheServerConnectionAcceptStartMeter.increment()
+    }
+
+    fun incConnectionThreadsCreated() {
+        cacheServerConnectionStartMeter.increment()
+    }
+
+    fun incAcceptsInProgress() {
+        cacheServerConnectionAcceptInProgressMeter.increment()
+    }
+
+    fun decAcceptsInProgress() {
+        cacheServerConnectionAcceptInProgressMeter.decrement()
+    }
+
+    fun incConnectionThreads() {
+        cacheServerConnectionThreadsMeter.increment()
+    }
+
+    fun decConnectionThreads() {
+        cacheServerConnectionThreadsMeter.decrement()
+    }
+
+    fun incAbandonedWriteRequests() {
+        cacheServerClientWriteRequestAbandondMeter.increment()
+    }
+
+    fun incAbandonedReadRequests() {
+        cacheServerClientReadRequestAbandondMeter.increment()
+    }
+
+    fun incFailedConnectionAttempts() {
+        cacheServerFailedConnectionAttemptsMeter.increment()
+    }
+
+    fun incConnectionsTimedOut() {
+        cacheServerConnectionTimeoutMeter.increment()
+    }
+
+    fun incCurrentClientConnections() {
+        cacheServerCurrentConnectionMeter.increment()
+    }
+
+    fun decCurrentClientConnections() {
+        cacheServerCurrentConnectionMeter.decrement()
+    }
+
+    fun incCurrentQueueConnections() {
+        cacheServerCurrentQueueConnectionMeter.increment()
+    }
+
+    fun decCurrentQueueConnections() {
+        cacheServerCurrentQueueConnectionMeter.decrement()
+    }
+
+    fun incCurrentClients() {
+        cacheServerClientCountMeter.increment()
+    }
+
+    fun decCurrentClients() {
+        cacheServerClientCountMeter.decrement()
+    }
+
+    fun incThreadQueueSize() {
+        cacheServerThreadQueueSizeMeter.increment()
+    }
+
+    fun decThreadQueueSize() {
+        cacheServerThreadQueueSizeMeter.decrement()
+    }
+
+    fun incReadGetRequestTime(delta: Long) {
+        cacheServerClientGetRequestTimer.recordValue(delta)
+        cacheServerClientGetRequestMeter.increment()
+    }
+
+    fun incProcessGetTime(delta: Long) {
+        cacheServerGetProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteGetResponseTime(delta: Long) {
+        cacheServerGetResponseWrittenTimer.recordValue(delta)
+        cacheServerGetResponseWrittenMeter.increment()
+    }
+
+    fun incReadPutAllRequestTime(delta: Long) {
+        cacheServerClientPutAllRequestTimer.recordValue(delta)
+        cacheServerClientPutAllRequestMeter.increment()
+    }
+
+    fun incProcessPutAllTime(delta: Long) {
+        cacheServerPutAllProcessTimer.recordValue(delta)
+    }
+
+    fun incWritePutAllResponseTime(delta: Long) {
+        cacheServerPutAllResponseWrittenTimer.recordValue(delta)
+        cacheServerPutAllResponseWrittenMeter.increment()
+    }
+
+    fun incReadRemoveAllRequestTime(delta: Long) {
+        cacheServerClientRemoveAllRequestTimer.recordValue(delta)
+        cacheServerClientRemoveAllRequestMeter.increment()
+    }
+
+    fun incProcessRemoveAllTime(delta: Long) {
+        cacheServerRemoveAllProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteRemoveAllResponseTime(delta: Long) {
+        cacheServerRemoveAllResponseWrittenTimer.recordValue(delta)
+        cacheServerRemoveAllResponseWrittenMeter.increment()
+    }
+
+    fun incReadGetAllRequestTime(delta: Long) {
+        cacheServerClientGetAllRequestTimer.recordValue(delta)
+        cacheServerClientGetAllRequestMeter.increment()
+    }
+
+    fun incProcessGetAllTime(delta: Long) {
+        cacheServerGetAllProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteGetAllResponseTime(delta: Long) {
+        cacheServerGetAllResponseWrittenTimer.recordValue(delta)
+        cacheServerGetAllResponseWrittenMeter.increment()
+    }
+
+    fun incReadPutRequestTime(delta: Long) {
+        cacheServerClientPutRequestTimer.recordValue(delta)
+        cacheServerClientPutRequestMeter.increment()
+    }
+
+    fun incProcessPutTime(delta: Long) {
+        cacheServerPutProcessTimer.recordValue(delta)
+    }
+
+    fun incWritePutResponseTime(delta: Long) {
+        cacheServerPutResponseWrittenTimer.recordValue(delta)
+        cacheServerPutResponseWrittenMeter.increment()
+    }
+
+    fun incReadDestroyRequestTime(delta: Long) {
+        cacheServerClientDestroyRequestTimer.recordValue(delta)
+        cacheServerClientDestroyRequestMeter.increment()
+    }
+
+    fun incProcessDestroyTime(delta: Long) {
+        cacheServerDestroyProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteDestroyResponseTime(delta: Long) {
+        cacheServerDestroyResponseWrittenTimer.recordValue(delta)
+        cacheServerDestroyResponseWrittenMeter.increment()
+    }
+
+
+    fun incReadInvalidateRequestTime(delta: Long) {
+        cacheServerClientInvaldiateRequestTimer.recordValue(delta)
+        cacheServerClientInvaldiateRequestMeter.increment()
+    }
+
+    fun incProcessInvalidateTime(delta: Long) {
+        cacheServerInvaldiateProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteInvalidateResponseTime(delta: Long) {
+        cacheServerInvaldiateResponseWrittenTimer.recordValue(delta)
+        cacheServerInvaldiateResponseWrittenMeter.increment()
+    }
+
+    fun incReadSizeRequestTime(delta: Long) {
+        cacheServerClientSizeRequestTimer.recordValue(delta)
+        cacheServerClientSizeRequestMeter.increment()
+    }
+
+    fun incProcessSizeTime(delta: Long) {
+        cacheServerSizeProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteSizeResponseTime(delta: Long) {
+        cacheServerSizeResponseWrittenTimer.recordValue(delta)
+        cacheServerSizeResponseWrittenMeter.increment()
+    }
+
+    fun incReadQueryRequestTime(delta: Long) {
+        cacheServerClientQueryRequestTimer.recordValue(delta)
+        cacheServerClientQueryRequestMeter.increment()
+    }
+
+    fun incProcessQueryTime(delta: Long) {
+        cacheServerQueryProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteQueryResponseTime(delta: Long) {
+        cacheServerQueryResponseWrittenTimer.recordValue(delta)
+        cacheServerQueryResponseWrittenMeter.increment()
+    }
+
+    fun incReadDestroyRegionRequestTime(delta: Long) {
+        cacheServerClientDestroyRegionRequestTimer.recordValue(delta)
+        cacheServerClientDestroyRegionRequestMeter.increment()
+    }
+
+    fun incProcessDestroyRegionTime(delta: Long) {
+        cacheServerClientDestroyRegionProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteDestroyRegionResponseTime(delta: Long) {
+        cacheServerClientDestroyRegionResponseWrittenTimer.recordValue(delta)
+        cacheServerClientDestroyRegionResponseWrittenMeter.increment()
+    }
+
+    fun incReadContainsKeyRequestTime(delta: Long) {
+        cacheServerClientContainsKeyRequestTimer.recordValue(delta)
+        cacheServerClientContainsKeyRequestMeter.increment()
+    }
+
+    fun incProcessContainsKeyTime(delta: Long) {
+        cacheServerClientContainsKeyProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteContainsKeyResponseTime(delta: Long) {
+        cacheServerClientContainsKeyResponseWrittenTimer.recordValue(delta)
+        cacheServerClientContainsKeyResponseWrittenMeter.increment()
+    }
+
+    fun incReadClearRegionRequestTime(delta: Long) {
+        cacheServerClientClearRegionRequestTimer.recordValue(delta)
+        cacheServerClientClearRegionRequestMeter.increment()
+    }
+
+    fun incProcessClearRegionTime(delta: Long) {
+        cacheServerClearRegionProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteClearRegionResponseTime(delta: Long) {
+        cacheServerClearRegionResponseWrittenTimer.recordValue(delta)
+        cacheServerClearRegionResponseWrittenMeter.increment()
+    }
+
+    fun incReadProcessBatchRequestTime(delta: Long) {
+        cacheServerProcessBatchRequestTimer.recordValue(delta)
+        cacheServerProcessBatchRequestMeter.increment()
+    }
+
+    fun incWriteProcessBatchResponseTime(delta: Long) {
+        cacheServerProcessBatchResponseWrittenTimer.recordValue(delta)
+        cacheServerProcessBatchResponseWrittenMeter.increment()
+    }
+
+    fun incProcessBatchTime(delta: Long) {
+        cacheServerProcessBatchProcessTimer.recordValue(delta)
+    }
+
+    fun incBatchSize(size: Long) {
+        cacheServerProcessBatchSizeMeter.increment(size)
+    }
+
+    fun incReadClientNotificationRequestTime(delta: Long) {
+        cacheServerClientNotificationRequestTimer.recordValue(delta)
+        cacheServerClientNotificationRequestMeter.increment()
+    }
+
+    fun incProcessClientNotificationTime(delta: Long) {
+        cacheServerClientNotificationProcessTimer.recordValue(delta)
+    }
+
+    fun incReadUpdateClientNotificationRequestTime(delta: Long) {
+        cacheServerUpdateClientNotificationRequestTimer.recordValue(delta)
+        cacheServerUpdateClientNotificationRequestMeter.increment()
+    }
+
+    fun incProcessUpdateClientNotificationTime(delta: Long) {
+        cacheServerUpdateClientNotificationProcessTimer.recordValue(delta)
+    }
+
+    fun incReadCloseConnectionRequestTime(delta: Long) {
+        cacheServerClientCloseConnectionRequestTimer.recordValue(delta)
+        cacheServerClientCloseConnectionRequestMeter.increment()
+    }
+
+    fun incProcessCloseConnectionTime(delta: Long) {
+        cacheServerClientCloseConnectionProcessTimer.recordValue(delta)
+    }
+
+    fun incOutOfOrderBatchIds() {
+        cacheServerOutOfOrderGatewayBatchIdMeter.increment()
+    }
+
+    override fun incReceivedBytes(bytes: Long) {
+        cacheServerReceivedBytesMeter.increment(bytes)
+    }
+
+    override fun incSentBytes(bytes: Long) {
+        cacheServerSentBytesMeter.increment(bytes)
+    }
+
+    override fun incMessagesBytesBeingReceived(bytes: Int) {
+        cacheServerMessagesReceivedMeter.increment()
+        cacheServerMessagesReceivedBytesMeter.increment(bytes)
+    }
+
+    override fun decMessagesBytesBeingReceived(bytes: Int) {
+        cacheServerMessagesReceivedMeter.decrement()
+        cacheServerMessagesReceivedBytesMeter.decrement(bytes)
+    }
+
+    fun incReadClientReadyRequestTime(delta: Long) {
+        cacheServerClientReadyRequestTimer.recordValue(delta)
+        cacheServerClientReadyRequestMeter.increment()
+    }
+
+    fun incProcessClientReadyTime(delta: Long) {
+        cacheServerClientReadyProcessTimer.recordValue(delta)
+    }
+
+    fun incWriteClientReadyResponseTime(delta: Long) {
+        cacheServerClientReadyResponseWrittenTimer.recordValue(delta)
+        cacheServerClientReadyResponseWrittenMeter.increment()
+    }
+
+    fun setLoad(connectionLoad: Int, loadPerConnection: Int, queueLoad: Int, loadPerQueue: Int) {
+        cacheServerConnectionLoadMeter.increment(connectionLoad)
+        cacheServerLoadPerConnectionMeter.increment(loadPerConnection)
+        cacheServerQueueLoadMeter.increment(queueLoad)
+        cacheServerLoadPerQueueMeter.increment(loadPerQueue)
+    }
+
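+    // The PoolStatHelper below lets a worker pool drive the connection-threads gauge via its
+    // job start/end callbacks. A hedged usage sketch (the pool wiring is illustrative only):
+    //   val helper = cacheServerStats.cnxPoolHelper
+    //   helper.startJob()                      // bumps cacheServerConnectionThreadsMeter
+    //   try { /* service the client request */ } finally { helper.endJob() }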
+    val cnxPoolHelper: PoolStatHelper
+        get() = object : PoolStatHelper {
+            override fun startJob() {
+                incConnectionThreads()
+            }
+
+            override fun endJob() {
+                decConnectionThreads()
+            }
+        }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/ClientHealthStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/ClientHealthStats.kt
new file mode 100644
index 0000000..db41400
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/ClientHealthStats.kt
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.statistics.client
+
+import java.io.DataInput
+import java.io.DataOutput
+import java.io.IOException
+import java.io.Serializable
+import java.util.Date
+import java.util.HashMap
+import kotlin.collections.Map.Entry
+
+import org.apache.geode.DataSerializer
+import org.apache.geode.internal.DataSerializableFixedID
+import org.apache.geode.internal.Version
+
+/**
+ * Bean class acting as a container for client statistics.
+ *
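+ *
+ * Hedged usage sketch (streams and field values are illustrative only):
+ *   val stats = ClientHealthStats()
+ *   stats.numOfGets = 10
+ *   stats.updateTime = Date()
+ *   stats.toData(dataOutput)                 // serialize
+ *   ClientHealthStats().fromData(dataInput)  // deserialize into a fresh instance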
+ */
+
+class ClientHealthStats : DataSerializableFixedID, Serializable {
+
+
+    var numOfGets: Int = 0
+
+    /**
+     * "numOfPuts", IntCounter, "The total number of times an entry is added or replaced in this cache
+     * as a result of a local operation (put(), create(), or get() which results in load, netsearch,
+     * or netloading a value). Note that this only counts puts done explicitly on this cache. It does
+     * not count updates pushed from other caches." Java: CachePerfStats.puts Native: Not yet Defined
+     */
+    /**
+     * This method returns the total number of successful put requests completed.
+     *
+     * @return Total number of put requests completed.
+     */
+    /**
+     * This method sets the total number of successful put requests completed.
+     *
+     * @param numOfPuts Total number of put requests to be set.
+     */
+    var numOfPuts: Int = 0
+
+    /**
+     * Represents number of cache misses in this client. IntCounter, "Total number of times a get on
+     * the cache did not find a value already in local memory." Java: CachePerfStats.misses
+     */
+    /**
+     * This method returns total number of cache misses in this client.
+     *
+     * @return total number of cache misses.
+     */
+    /**
+     * This method sets total number of cache misses in this client.
+     *
+     * @param numOfMisses total number of cache misses.
+     */
+    var numOfMisses: Int = 0
+
+    /**
+     * Represents number of cache listener calls completed. IntCounter, "Total number of times a cache
+     * listener call has completed." Java: CachePerfStats.cacheListenerCallsCompleted
+     */
+    /**
+     * This method returns total number of cache listener calls completed.
+     *
+     * @return total number of cache listener calls completed.
+     */
+    /**
+     * This method sets total number of cache listener calls completed.
+     *
+     * @param numOfCacheListenerCalls total number of cache listener calls completed.
+     */
+    var numOfCacheListenerCalls: Int = 0
+
+    /**
+     * Represents total number of active threads in the client VM. IntCounter, "Current number of live
+     * threads (both daemon and non-daemon) in this VM." Java: VMStats.threads
+     */
+    /**
+     * This method returns total number of threads in the client VM.
+     *
+     * @return total number of threads in the client VM
+     */
+    /**
+     * This method sets the total number of threads in the client VM.
+     *
+     * @param numOfThreads total number of threads in the client VM
+     */
+    var numOfThreads: Int = 0
+
+    /**
+     * Represents the CPU time used by the process (in nanoseconds). LongCounter, "CPU timed used by
+     * the process in nanoseconds." Java: VMStats.processCpuTime
+     */
+    /**
+     * This method returns the CPU time used by the process (in nanoseconds)
+     *
+     * @return CPU time used by the process (in nanoseconds)
+     */
+    /**
+     * This method sets the CPU time used by the process (in nanoseconds).
+     *
+     * @param processCpuTime CPU time used by the process (in nanoseconds)
+     */
+    var processCpuTime: Long = 0
+
+    /**
+     * Represents the number of cpus available to the java VM on its machine. IntCounter, "Number of
+     * cpus available to the java VM on its machine." Java: VMStats.cpus
+     */
+    var cpus: Int = 0
+
+
+    /**
+     * Represents time when this snapshot of the client statistics was taken.
+     */
+    var updateTime: Date = Date()
+
+    /**
+     * Represents stats for a poolName .
+     */
+    var poolStats = HashMap<String, String>()
+
+    override fun getDSFID(): Int = DataSerializableFixedID.CLIENT_HEALTH_STATS
+
+    override fun getSerializationVersions(): Array<Version> = dsfidVersions
+
+    @Throws(IOException::class)
+    override fun toData(out: DataOutput) {
+        DataSerializer.writePrimitiveInt(numOfGets, out)
+        DataSerializer.writePrimitiveInt(numOfPuts, out)
+        DataSerializer.writePrimitiveInt(numOfMisses, out)
+        DataSerializer.writePrimitiveInt(numOfCacheListenerCalls, out)
+        DataSerializer.writePrimitiveInt(numOfThreads, out)
+        DataSerializer.writePrimitiveInt(cpus, out)
+        DataSerializer.writePrimitiveLong(processCpuTime, out)
+        DataSerializer.writeDate(updateTime, out)
+        DataSerializer.writeHashMap(poolStats, out)
+    }
+
+    @Throws(IOException::class)
+    fun toDataPre_GFE_8_0_0_0(out: DataOutput) {
+        DataSerializer.writePrimitiveInt(numOfGets, out)
+        DataSerializer.writePrimitiveInt(numOfPuts, out)
+        DataSerializer.writePrimitiveInt(numOfMisses, out)
+        DataSerializer.writePrimitiveInt(numOfCacheListenerCalls, out)
+        DataSerializer.writePrimitiveInt(numOfThreads, out)
+        DataSerializer.writePrimitiveInt(cpus, out)
+        DataSerializer.writePrimitiveLong(processCpuTime, out)
+        DataSerializer.writeDate(updateTime, out)
+    }
+
+    @Throws(IOException::class, ClassNotFoundException::class)
+    override fun fromData(`in`: DataInput) {
+        this.numOfGets = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfPuts = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfMisses = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfCacheListenerCalls = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfThreads = DataSerializer.readPrimitiveInt(`in`)
+        this.cpus = DataSerializer.readPrimitiveInt(`in`)
+        this.processCpuTime = DataSerializer.readPrimitiveLong(`in`)
+        this.updateTime = DataSerializer.readDate(`in`)
+        this.poolStats = DataSerializer.readHashMap(`in`)
+    }
+
+    @Throws(IOException::class, ClassNotFoundException::class)
+    fun fromDataPre_GFE_8_0_0_0(`in`: DataInput) {
+        this.numOfGets = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfPuts = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfMisses = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfCacheListenerCalls = DataSerializer.readPrimitiveInt(`in`)
+        this.numOfThreads = DataSerializer.readPrimitiveInt(`in`)
+        this.cpus = DataSerializer.readPrimitiveInt(`in`)
+        this.processCpuTime = DataSerializer.readPrimitiveLong(`in`)
+        this.updateTime = DataSerializer.readDate(`in`)
+    }
+
+    override fun toString(): String {
+
+        val buf = StringBuffer()
+        buf.append("ClientHealthStats [")
+        buf.append("\n numOfGets=" + this.numOfGets)
+        buf.append("\n numOfPuts=" + this.numOfPuts)
+        buf.append("\n numOfMisses=" + this.numOfMisses)
+        buf.append("\n numOfCacheListenerCalls=" + this.numOfCacheListenerCalls)
+        buf.append("\n numOfThreads=" + this.numOfThreads)
+        buf.append("\n cpus=" + this.cpus)
+        buf.append("\n processCpuTime=" + this.processCpuTime)
+        buf.append("\n updateTime=" + this.updateTime)
+        val it = this.poolStats.entries.iterator()
+        val tempBuffer = StringBuffer()
+        while (it.hasNext()) {
+            val entry = it.next()
+            tempBuffer.append(entry.key + " = " + entry.value)
+        }
+        buf.append("\n poolStats $tempBuffer")
+        buf.append("\n]")
+
+        return buf.toString()
+    }
+
+    companion object {
+        private const val serialVersionUID = 4229401714870332766L
+
+        /** The versions in which this message was modified  */
+        private val dsfidVersions = arrayOf<Version>(Version.GFE_80)
+    }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/connection/ConnectionStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/connection/ConnectionStats.kt
index 19101da..e372238 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/connection/ConnectionStats.kt
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/client/connection/ConnectionStats.kt
@@ -6,7 +6,7 @@ import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
 import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
 
 class ConnectionStats : MicrometerMeterGroup, MessageStats {
-    override fun decMessagesBeingReceived(bytes: Int) {
+    override fun decMessagesBytesBeingReceived(bytes: Int) {
         TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
     }
 
@@ -445,7 +445,7 @@ class ConnectionStats : MicrometerMeterGroup, MessageStats {
         this.clientStats.sentBytesMeter.increment(value.toDouble())
     }
 
-    override fun incMessagesBeingReceived(bytes: Int) {
+    override fun incMessagesBytesBeingReceived(bytes: Int) {
         clientStats.messagesReceivedMeter.increment()
         if (bytes > 0) {
             clientStats.messagesReceivedBytesMeter.increment(bytes.toDouble())
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/offheap/OffHeapMemoryStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/offheap/OffHeapMemoryStats.kt
new file mode 100644
index 0000000..60b8938
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/offheap/OffHeapMemoryStats.kt
@@ -0,0 +1,82 @@
+package org.apache.geode.statistics.offheap
+
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
+import org.apache.geode.statistics.util.NOW_NANOS
+
+class OffHeapMemoryStats(regionName: String) : MicrometerMeterGroup("OffHeapMemoryStats-$regionName") {
+    override fun initializeStaticMeters() {
+        registerMeter(offheapMemoryUsedBytesMeter)
+        registerMeter(offheapMemoryMaxBytesMeter)
+        registerMeter(offheapMemoryDefragmentationMeter)
+        registerMeter(offheapMemoryDefragmentationInProgressMeter)
+        registerMeter(offheapMemoryDefragmentationTimer)
+        registerMeter(offheapMemoryFragmentationMeter)
+        registerMeter(offheapMemoryFragmentMeter)
+        registerMeter(offheapMemoryFreeBytesMeter)
+        registerMeter(offheapMemoryLargestFragmentBytesMeter)
+        registerMeter(offheapMemoryObjectCountMeter)
+        registerMeter(offheapMemoryReadCountMeter)
+    }
+
+    private val offheapMemoryUsedBytesMeter = GaugeStatisticMeter("offheap.memory.used.bytes", "The amount of off-heap memory, in bytes, that is being used to store data.", arrayOf("offheapRegion", regionName), unit = "bytes")
+    private val offheapMemoryMaxBytesMeter = GaugeStatisticMeter("offheap.memory.max.bytes", "The maximum amount of off-heap memory, in bytes. This is the amount of memory allocated at startup and does not change.", arrayOf("offheapRegion", regionName), unit = "bytes")
+    private val offheapMemoryDefragmentationMeter = CounterStatisticMeter("offheap.memory.defragmentations.count", "The total number of times off-heap memory has been defragmented.", arrayOf("offheapRegion", regionName))
+    private val offheapMemoryDefragmentationInProgressMeter = GaugeStatisticMeter("offheap.memory.defragmentations.inprogress.count", "Current number of defragmentation operations in progress.", arrayOf("offheapRegion", regionName))
+    private val offheapMemoryDefragmentationTimer = TimerStatisticMeter("offheap.memory.defragmentation.time", "The total time spent defragmenting off-heap memory.", arrayOf("offheapRegion", regionName), unit = "nanoseconds")
+    private val offheapMemoryFragmentationMeter = GaugeStatisticMeter("offheap.memory.fragmentation.percentage", "The percentage of off-heap free memory that is fragmented.  Updated every time a defragmentation is performed.", arrayOf("offheapRegion", regionName))
+    private val offheapMemoryFragmentMeter = GaugeStatisticMeter("offheap.memory.fragments.count", "The number of fragments of free off-heap memory. Updated every time a defragmentation is done.", arrayOf("offheapRegion", regionName))
+    private val offheapMemoryFreeBytesMeter = GaugeStatisticMeter("offheap.memory.free.bytes", "The amount of off-heap memory, in bytes, that is not being used.", arrayOf("offheapRegion", regionName), unit = "bytes")
+    private val offheapMemoryLargestFragmentBytesMeter = GaugeStatisticMeter("offheap.memory.largest.fragment.bytes", "The largest fragment of memory found by the last defragmentation of off heap memory. Updated every time a defragmentation is done.", arrayOf("offheapRegion", regionName), unit = "bytes")
+    private val offheapMemoryObjectCountMeter = GaugeStatisticMeter("offheap.memory.object.count", "The number of objects stored in off-heap memory.", arrayOf("offheapRegion", regionName))
+    private val offheapMemoryReadCountMeter = CounterStatisticMeter("offheap.memory.read.count", "The total number of reads of off-heap memory. Only reads of a full object increment this statistic. If only a part of the object is read this statistic is not incremented.", arrayOf("offheapRegion", regionName))
+
+    fun incFreeMemory(value: Long) {
+        offheapMemoryFreeBytesMeter.increment(value)
+    }
+
+    fun incMaxMemory(value: Long) {
+        offheapMemoryMaxBytesMeter.increment(value)
+    }
+
+    fun incUsedMemory(value: Long) {
+        offheapMemoryUsedBytesMeter.increment(value)
+    }
+
+    fun incObjects(value: Int) {
+        offheapMemoryObjectCountMeter.increment(value)
+    }
+
+    fun incReads() {
+        offheapMemoryReadCountMeter.increment()
+    }
+
+    private fun incDefragmentations() {
+        offheapMemoryDefragmentationMeter.increment()
+    }
+
+    fun setFragments(value: Long) {
+        offheapMemoryFragmentMeter.setValue(value)
+    }
+
+    fun setLargestFragment(value: Int) {
+        offheapMemoryLargestFragmentBytesMeter.setValue(value)
+    }
+
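+    // startDefragmentation()/endDefragmentation() form a paired timing API: the caller keeps the
+    // returned start timestamp and hands it back when the operation completes. Hedged caller
+    // sketch (the allocator call is illustrative only):
+    //   val start = offHeapStats.startDefragmentation()
+    //   try { allocator.defragment() } finally { offHeapStats.endDefragmentation(start) }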
+    fun startDefragmentation(): Long {
+        offheapMemoryDefragmentationInProgressMeter.increment()
+        return NOW_NANOS
+    }
+
+    fun endDefragmentation(start: Long) {
+        incDefragmentations()
+        offheapMemoryDefragmentationInProgressMeter.decrement()
+        offheapMemoryDefragmentationTimer.recordValue(NOW_NANOS - start)
+    }
+
+    fun setFragmentation(value: Int) {
+        offheapMemoryFragmentationMeter.setValue(value)
+    }
+}
\ No newline at end of file
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/HARegionQueueStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/HARegionQueueStats.kt
new file mode 100755
index 0000000..3bf2ed1
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/HARegionQueueStats.kt
@@ -0,0 +1,80 @@
+package org.apache.geode.statistics.region
+
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+
+class HARegionQueueStats(queueName: String) : MicrometerMeterGroup("ClientSubscriptionStats-$queueName") {
+    override fun initializeStaticMeters() {
+        registerMeter(haRegionQueueEventsQueuedMeter)
+        registerMeter(haRegionQueueEventsConflatedMeter)
+        registerMeter(haRegionQueueMarkerEventsConflatedMeter)
+        registerMeter(haRegionQueueEventsRemovedMeter)
+        registerMeter(haRegionQueueEventsTakenMeter)
+        registerMeter(haRegionQueueEventsExpiredMeter)
+        registerMeter(haRegionQueueEventsTakenQRMMeter)
+        registerMeter(haRegionThreadIdentifiersMeter)
+        registerMeter(haRegionQueueEventsDispatchedMeter)
+        registerMeter(haRegionQueueEventsVoidRemovalMeter)
+        registerMeter(haRegionQueueEventsViolationMeter)
+    }
+
+    private val haRegionQueueEventsQueuedMeter = CounterStatisticMeter("ha.region.queue.events.queued.count", "Number of events added to queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsConflatedMeter = CounterStatisticMeter("ha.region.queue.events.conflated.count", "Number of events conflated for the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueMarkerEventsConflatedMeter = CounterStatisticMeter("ha.region.queue.events.conflated.count", "Number of marker events conflated for the queue.", arrayOf("queueName", queueName, "eventType", "marker"))
+    private val haRegionQueueEventsRemovedMeter = CounterStatisticMeter("ha.region.queue.events.removed.count", "Number of events removed from the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsTakenMeter = CounterStatisticMeter("ha.region.queue.events.taken.count", "Number of events taken from the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsExpiredMeter = CounterStatisticMeter("ha.region.queue.events.expired.count", "Number of events expired from the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsTakenQRMMeter = CounterStatisticMeter("ha.region.queue.events.taken.count", "Number of events removed by QRM message.", arrayOf("queueName", queueName, "subscriber", "qrm"))
+    private val haRegionThreadIdentifiersMeter = CounterStatisticMeter("ha.region.thread.identifier.count", "Number of ThreadIdentifier objects for the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsDispatchedMeter = CounterStatisticMeter("ha.region.queue.events.dispatched.count", "Number of events that have been dispatched.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsVoidRemovalMeter = CounterStatisticMeter("ha.region.queue.events.void.removal.count", "Number of void removals from the queue.", arrayOf("queueName", queueName))
+    private val haRegionQueueEventsViolationMeter = CounterStatisticMeter("ha.region.queue.events.violation.count", "Number of events that have violated the sequence.", arrayOf("queueName", queueName))
+
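+    // Note: the marker-conflation and QRM-taken meters reuse the base meter names
+    // ("ha.region.queue.events.conflated.count" / "ha.region.queue.events.taken.count") and are
+    // distinguished only by their extra tags ("eventType=marker", "subscriber=qrm"), presumably so
+    // that dashboards can either aggregate them or split them by tag.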
+    fun incEventsEnqued() {
+        haRegionQueueEventsQueuedMeter.increment()
+    }
+
+    fun incEventsConflated() {
+        haRegionQueueEventsConflatedMeter.increment()
+    }
+
+    fun incMarkerEventsConflated() {
+        haRegionQueueMarkerEventsConflatedMeter.increment()
+    }
+
+    fun incEventsRemoved() {
+        haRegionQueueEventsRemovedMeter.increment()
+    }
+
+    fun incEventsTaken() {
+        haRegionQueueEventsTakenMeter.increment()
+    }
+
+    fun incEventsExpired() {
+        haRegionQueueEventsExpiredMeter.increment()
+    }
+
+    fun incEventsRemovedByQrm() {
+        haRegionQueueEventsTakenQRMMeter.increment()
+    }
+
+    fun incThreadIdentifiers() {
+        haRegionThreadIdentifiersMeter.increment()
+    }
+
+    fun decThreadIdentifiers() {
+        haRegionThreadIdentifiersMeter.decrement()
+    }
+
+    fun incEventsDispatched() {
+        haRegionQueueEventsDispatchedMeter.increment()
+    }
+
+    fun incNumVoidRemovals() {
+        haRegionQueueEventsVoidRemovalMeter.increment()
+    }
+
+    fun incNumSequenceViolated() {
+        haRegionQueueEventsViolationMeter.increment()
+    }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/PartitionedRegionStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/PartitionedRegionStats.kt
new file mode 100644
index 0000000..fb1f4b3
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/region/PartitionedRegionStats.kt
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.statistics.region
+
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+
+
+class PartitionedRegionStats(name: String) : MicrometerMeterGroup("PartitionRegionStats-$name") {
+    /**
+     * Utility map for temporarily holding stat start times.
+     *
+     *
+     * This was originally added to avoid having to add a long volunteeringStarted variable to every
+     * instance of BucketAdvisor. The majority of BucketAdvisors never volunteer, and an instance of
+     * BucketAdvisor exists for every bucket defined in a PartitionedRegion, which could result in a
+     * lot of unused longs. Volunteering is a rare event, so the performance impact of a HashMap
+     * lookup is small and preferable to so many longs. Key: BucketAdvisor, Value: Long
+     */
+    private val startTimeMap: MutableMap<Any, Long> = java.util.concurrent.ConcurrentHashMap()
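+    // Hedged usage sketch of the holding map described above (caller names are illustrative; the
+    // put/remove helpers are defined further down in this class):
+    //   prStats.putStartTime(bucketAdvisor, prStats.startVolunteering())
+    //   ...later, for the same advisor...
+    //   prStats.endVolunteeringBecamePrimary(prStats.removeStartTime(bucketAdvisor))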
+
+
+    var configuredRedundantCopies: Int = 0
+        set(value) {
+            field = value
+            this.stats.setInt(configuredRedundantCopiesId, value)
+        }
+
+    var actualRedundantCopies: Int = 0
+        set(value) {
+            field = value
+            this.stats.setInt(actualRedundantCopiesId, value)
+        }
+
+    private val temp = GaugeStatisticMeter("bucketCount", "Number of buckets in this node.")
+    private val temp = CounterStatisticMeter("putsCompleted", "Number of puts completed.")
+    private val temp = CounterStatisticMeter("putOpsRetried",
+            "Number of put operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("putRetries",
+            "Total number of times put operations had to be retried.")
+    private val temp = CounterStatisticMeter("createsCompleted", "Number of creates completed.")
+    private val temp = CounterStatisticMeter("createOpsRetried",
+            "Number of create operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("createRetries",
+            "Total number of times put operations had to be retried.")
+    private val temp = CounterStatisticMeter("preferredReadLocal", "Number of reads satisfied from local store")
+    private val temp = CounterStatisticMeter(PUTALLS_COMPLETED, "Number of putAlls completed.")
+    private val temp = CounterStatisticMeter(PUTALL_MSGS_RETRIED,
+            "Number of putAll messages which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter(PUTALL_RETRIES,
+            "Total number of times putAll messages had to be retried.")
+    private val temp = CounterStatisticMeter(PUTALL_TIME, "Total time spent doing putAlls.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter(REMOVE_ALLS_COMPLETED, "Number of removeAlls completed.")
+    private val temp = CounterStatisticMeter(REMOVE_ALL_MSGS_RETRIED,
+            "Number of removeAll messages which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter(REMOVE_ALL_RETRIES,
+            "Total number of times removeAll messages had to be retried.")
+    private val temp = CounterStatisticMeter(REMOVE_ALL_TIME, "Total time spent doing removeAlls.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("preferredReadRemote", "Number of reads satisfied from remote store")
+    private val temp = CounterStatisticMeter("getsCompleted", "Number of gets completed.")
+    private val temp = CounterStatisticMeter("getOpsRetried",
+            "Number of get operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("getRetries",
+            "Total number of times get operations had to be retried.")
+    private val temp = CounterStatisticMeter("destroysCompleted", "Number of destroys completed.")
+    private val temp = CounterStatisticMeter("destroyOpsRetried",
+            "Number of destroy operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("destroyRetries",
+            "Total number of times destroy operations had to be retried.")
+    private val temp = CounterStatisticMeter("invalidatesCompleted", "Number of invalidates completed.")
+
+    private val temp = CounterStatisticMeter("invalidateOpsRetried",
+            "Number of invalidate operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("invalidateRetries",
+            "Total number of times invalidate operations had to be retried.")
+    private val temp = CounterStatisticMeter("containsKeyCompleted", "Number of containsKeys completed.")
+
+    private val temp = CounterStatisticMeter("containsKeyOpsRetried",
+            "Number of containsKey or containsValueForKey operations which had to be retried due to failures.")
+    private val temp = CounterStatisticMeter("containsKeyRetries",
+            "Total number of times containsKey or containsValueForKey operations had to be retried.")
+    private val temp = CounterStatisticMeter("containsValueForKeyCompleted",
+            "Number of containsValueForKeys completed.")
+    private val temp = CounterStatisticMeter("PartitionMessagesSent", "Number of PartitionMessages Sent.")
+    private val temp = CounterStatisticMeter("PartitionMessagesReceived", "Number of PartitionMessages Received.")
+    private val temp = CounterStatisticMeter("PartitionMessagesProcessed",
+            "Number of PartitionMessages Processed.")
+    private val temp = CounterStatisticMeter("putTime", "Total time spent doing puts.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter("createTime", "Total time spent doing create operations.",
+            unit="nanoseconds", false)
+    private val temp = CounterStatisticMeter("getTime", "Total time spent performing get operations.",
+            unit="nanoseconds", false)
+    private val temp = CounterStatisticMeter("destroyTime", "Total time spent doing destroys.", unit="nanoseconds",)
+    private val temp = CounterStatisticMeter("invalidateTime", "Total time spent doing invalidates.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("containsKeyTime",
+            "Total time spent performing containsKey operations.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter("containsValueForKeyTime",
+            "Total time spent performing containsValueForKey operations.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter("partitionMessagesProcessingTime",
+            "Total time spent on PartitionMessages processing.", unit="nanoseconds"
+    private val temp = GaugeStatisticMeter("dataStoreEntryCount",
+            "The number of entries stored in this Cache for the named Partitioned Region. This does not include entries which are tombstones. See CachePerfStats.tombstoneCount.")
+    private val temp = GaugeStatisticMeter("dataStoreBytesInUse",
+            "The current number of bytes stored in this Cache for the named Partitioned Region")
+    private val temp = GaugeStatisticMeter("volunteeringInProgress",
+            "Current number of attempts to volunteer for primary of a bucket.")
+    private val temp = CounterStatisticMeter("volunteeringBecamePrimary",
+            "Total number of attempts to volunteer that ended when this member became primary.")
+    private val temp = CounterStatisticMeter("volunteeringBecamePrimaryTime",
+            "Total time spent volunteering that ended when this member became primary.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("volunteeringOtherPrimary",
+            "Total number of attempts to volunteer that ended when this member discovered other primary.")
+    private val temp = CounterStatisticMeter("volunteeringOtherPrimaryTime",
+            "Total time spent volunteering that ended when this member discovered other primary.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("volunteeringClosed",
+            "Total number of attempts to volunteer that ended when this member's bucket closed.")
+    private val temp = CounterStatisticMeter("volunteeringClosedTime",
+            "Total time spent volunteering that ended when this member's bucket closed.",
+            unit="nanoseconds")
+    private val temp = GaugeStatisticMeter("totalNumBuckets", "The total number of buckets.")
+    private val temp = GaugeStatisticMeter("primaryBucketCount",
+            "Current number of primary buckets hosted locally.")
+    private val temp = GaugeStatisticMeter("volunteeringThreads",
+            "Current number of threads volunteering for primary.")
+    private val temp = GaugeStatisticMeter("lowRedundancyBucketCount",
+            "Current number of buckets without full redundancy.")
+    private val temp = GaugeStatisticMeter("noCopiesBucketCount",
+            "Current number of buckets without any copies remaining.")
+    private val temp = GaugeStatisticMeter("configuredRedundantCopies",
+            "Configured number of redundant copies for this partitioned region.")
+    private val temp = GaugeStatisticMeter("actualRedundantCopies",
+            "Actual number of redundant copies for this partitioned region.")
+    private val temp = CounterStatisticMeter("getEntryCompleted", "Number of getEntry operations completed.")
+    private val temp = CounterStatisticMeter("getEntryTime", "Total time spent performing getEntry operations.",
+            unit="nanoseconds")
+
+    private val temp = GaugeStatisticMeter("recoveriesInProgress",
+            "Current number of redundancy recovery operations in progress for this region.")
+    private val temp = CounterStatisticMeter("recoveriesCompleted",
+            "Total number of redundancy recovery operations performed on this region.")
+    private val temp = CounterStatisticMeter("recoveryTime", "Total number time spent recovering redundancy.")
+    private val temp = GaugeStatisticMeter("bucketCreatesInProgress",
+            "Current number of bucket create operations being performed for rebalancing.")
+    private val temp = CounterStatisticMeter("bucketCreatesCompleted",
+            "Total number of bucket create operations performed for rebalancing.")
+    private val temp = CounterStatisticMeter("bucketCreatesFailed",
+            "Total number of bucket create operations performed for rebalancing that failed.")
+    private val temp = CounterStatisticMeter("bucketCreateTime",
+            "Total time spent performing bucket create operations for rebalancing.",
+            unit="nanoseconds")
+    private val temp = GaugeStatisticMeter("primaryTransfersInProgress",
+            "Current number of primary transfer operations being performed for rebalancing.")
+    private val temp = CounterStatisticMeter("primaryTransfersCompleted",
+            "Total number of primary transfer operations performed for rebalancing.")
+    private val temp = CounterStatisticMeter("primaryTransfersFailed",
+            "Total number of primary transfer operations performed for rebalancing that failed.")
+    private val temp = CounterStatisticMeter("primaryTransferTime",
+            "Total time spent performing primary transfer operations for rebalancing.",
+            unit="nanoseconds")
+
+    private val temp = CounterStatisticMeter("applyReplicationCompleted",
+            "Total number of replicated values sent from a primary to this redundant data store.")
+    private val temp = GaugeStatisticMeter("applyReplicationInProgress",
+            "Current number of replication operations in progress on this redundant data store.")
+    private val temp = CounterStatisticMeter("applyReplicationTime",
+            "Total time spent storing replicated values on this redundant data store.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("sendReplicationCompleted",
+            "Total number of replicated values sent from this primary to a redundant data store.")
+    private val temp = GaugeStatisticMeter("sendReplicationInProgress",
+            "Current number of replication operations in progress from this primary.")
+    private val temp = CounterStatisticMeter("sendReplicationTime",
+            "Total time spent replicating values from this primary to a redundant data store.",
+            unit="nanoseconds")
+    private val temp = CounterStatisticMeter("putRemoteCompleted",
+            "Total number of completed puts that did not originate in the primary. These puts require an extra network hop to the primary.")
+    private val temp = GaugeStatisticMeter("putRemoteInProgress",
+            "Current number of puts in progress that did not originate in the primary.")
+    private val temp = CounterStatisticMeter("putRemoteTime",
+            "Total time spent doing puts that did not originate in the primary.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter("putLocalCompleted",
+            "Total number of completed puts that did originate in the primary. These puts are optimal.")
+    private val temp = GaugeStatisticMeter("putLocalInProgress",
+            "Current number of puts in progress that did originate in the primary.")
+    private val temp = CounterStatisticMeter("putLocalTime",
+            "Total time spent doing puts that did originate in the primary.", unit="nanoseconds")
+
+    private val temp = GaugeStatisticMeter("rebalanceBucketCreatesInProgress",
+            "Current number of bucket create operations being performed for rebalancing.")
+    private val temp = CounterStatisticMeter("rebalanceBucketCreatesCompleted",
+            "Total number of bucket create operations performed for rebalancing.")
+    private val temp = CounterStatisticMeter("rebalanceBucketCreatesFailed",
+            "Total number of bucket create operations performed for rebalancing that failed.")
+    private val temp = CounterStatisticMeter("rebalanceBucketCreateTime",
+            "Total time spent performing bucket create operations for rebalancing.", unit="nanoseconds")
+    private val temp = GaugeStatisticMeter("rebalancePrimaryTransfersInProgress",
+            "Current number of primary transfer operations being performed for rebalancing.")
+    private val temp = CounterStatisticMeter("rebalancePrimaryTransfersCompleted",
+            "Total number of primary transfer operations performed for rebalancing.")
+    private val temp = CounterStatisticMeter("rebalancePrimaryTransfersFailed",
+            "Total number of primary transfer operations performed for rebalancing that failed.")
+    private val temp = CounterStatisticMeter("rebalancePrimaryTransferTime",
+            "Total time spent performing primary transfer operations for rebalancing.", unit="nanoseconds")
+    private val temp = CounterStatisticMeter("prMetaDataSentCount", "total number of times meta data refreshed sent on client's request.")
+
+    private val temp = GaugeStatisticMeter("localMaxMemory", "local max memory in bytes for this region on this member")
+
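+    // The end*() methods below share one clock-stats pattern: elapsed time is recorded only when
+    // CachePerfStats.enableClockStats is set, while the completion counter is always incremented.
+    // Hedged caller sketch (the region call is illustrative only):
+    //   val start = CachePerfStats.getStatTime()
+    //   region.put(key, value)
+    //   prStats.endPut(start)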
+    @JvmOverloads
+    fun endPut(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            val delta = CachePerfStats.getStatTime() - start
+            this.stats.incLong(putTimeId, delta)
+        }
+        this.stats.incInt(putsCompletedId, numInc)
+    }
+
+    /**
+     * This method sets the end time for putAll and updates the counters
+     *
+     */
+    @JvmOverloads
+    fun endPutAll(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            val delta = CachePerfStats.getStatTime() - start
+            this.stats.incLong(fieldId_PUTALL_TIME, delta)
+            // this.putStatsHistogram.endOp(delta);
+
+        }
+        this.stats.incInt(fieldId_PUTALLS_COMPLETED, numInc)
+    }
+
+    @JvmOverloads
+    fun endRemoveAll(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            val delta = CachePerfStats.getStatTime() - start
+            this.stats.incLong(fieldId_REMOVE_ALL_TIME, delta)
+        }
+        this.stats.incInt(fieldId_REMOVE_ALLS_COMPLETED, numInc)
+    }
+
+    @JvmOverloads
+    fun endCreate(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(createTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(createsCompletedId, numInc)
+    }
+
+    @JvmOverloads
+    fun endGet(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            val delta = CachePerfStats.getStatTime() - start
+            this.stats.incLong(getTimeId, delta)
+        }
+        this.stats.incInt(getsCompletedId, numInc)
+    }
+
+    fun endDestroy(start: Long) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(destroyTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(destroysCompletedId, 1)
+    }
+
+    fun endInvalidate(start: Long) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(invalidateTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(invalidatesCompletedId, 1)
+    }
+
+    @JvmOverloads
+    fun endContainsKey(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(containsKeyTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(containsKeyCompletedId, numInc)
+    }
+
+    @JvmOverloads
+    fun endContainsValueForKey(start: Long, numInc: Int = 1) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(containsValueForKeyTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(containsValueForKeyCompletedId, numInc)
+    }
+
+    fun incContainsKeyValueRetries() {
+        this.stats.incInt(containsKeyRetriesId, 1)
+    }
+
+    fun incContainsKeyValueOpsRetried() {
+        this.stats.incInt(containsKeyOpsRetriedId, 1)
+    }
+
+    fun incInvalidateRetries() {
+        this.stats.incInt(invalidateRetriesId, 1)
+    }
+
+    fun incInvalidateOpsRetried() {
+        this.stats.incInt(invalidateOpsRetriedId, 1)
+    }
+
+    fun incDestroyRetries() {
+        this.stats.incInt(destroyRetriesId, 1)
+    }
+
+    fun incDestroyOpsRetried() {
+        this.stats.incInt(destroyOpsRetriedId, 1)
+    }
+
+    fun incPutRetries() {
+        this.stats.incInt(putRetriesId, 1)
+    }
+
+    fun incPutOpsRetried() {
+        this.stats.incInt(putOpsRetriedId, 1)
+    }
+
+    fun incGetOpsRetried() {
+        this.stats.incInt(getOpsRetriedId, 1)
+    }
+
+    fun incGetRetries() {
+        this.stats.incInt(getRetriesId, 1)
+    }
+
+    fun incCreateOpsRetried() {
+        this.stats.incInt(createOpsRetriedId, 1)
+    }
+
+    fun incCreateRetries() {
+        this.stats.incInt(createRetriesId, 1)
+    }
+
+    // ------------------------------------------------------------------------
+    // preferred read stats
+    // ------------------------------------------------------------------------
+
+    fun incPreferredReadLocal() {
+        this.stats.incInt(preferredReadLocalId, 1)
+    }
+
+    fun incPreferredReadRemote() {
+        this.stats.incInt(preferredReadRemoteId, 1)
+    }
+
+    // ------------------------------------------------------------------------
+    // messaging stats
+    // ------------------------------------------------------------------------
+
+    fun startPartitionMessageProcessing(): Long {
+        this.stats.incInt(partitionMessagesReceivedId, 1)
+        return startTime()
+    }
+
+    fun endPartitionMessagesProcessing(start: Long) {
+        if (CachePerfStats.enableClockStats) {
+            val delta = CachePerfStats.getStatTime() - start
+            this.stats.incLong(partitionMessagesProcessingTimeId, delta)
+        }
+        this.stats.incInt(partitionMessagesProcessedId, 1)
+    }
+
+    fun incPartitionMessagesSent() {
+        this.stats.incInt(partitionMessagesSentId, 1)
+    }
+
+    // ------------------------------------------------------------------------
+    // datastore stats
+    // ------------------------------------------------------------------------
+
+    fun incBucketCount(delta: Int) {
+        this.stats.incInt(bucketCountId, delta)
+    }
+
+    fun setBucketCount(i: Int) {
+        this.stats.setInt(bucketCountId, i)
+    }
+
+    fun incDataStoreEntryCount(amt: Int) {
+        this.stats.incInt(dataStoreEntryCountId, amt)
+    }
+
+    fun incBytesInUse(delta: Long) {
+        this.stats.incLong(dataStoreBytesInUseId, delta)
+    }
+
+    fun incPutAllRetries() {
+        this.stats.incInt(fieldId_PUTALL_RETRIES, 1)
+    }
+
+    fun incPutAllMsgsRetried() {
+        this.stats.incInt(fieldId_PUTALL_MSGS_RETRIED, 1)
+    }
+
+    fun incRemoveAllRetries() {
+        this.stats.incInt(fieldId_REMOVE_ALL_RETRIES, 1)
+    }
+
+    fun incRemoveAllMsgsRetried() {
+        this.stats.incInt(fieldId_REMOVE_ALL_MSGS_RETRIED, 1)
+    }
+
+    fun startVolunteering(): Long {
+        this.stats.incInt(volunteeringInProgressId, 1)
+        return CachePerfStats.getStatTime()
+    }
+
+    fun endVolunteeringBecamePrimary(start: Long) {
+        val ts = CachePerfStats.getStatTime()
+        this.stats.incInt(volunteeringInProgressId, -1)
+        this.stats.incInt(volunteeringBecamePrimaryId, 1)
+        if (CachePerfStats.enableClockStats) {
+            val time = ts - start
+            this.stats.incLong(volunteeringBecamePrimaryTimeId, time)
+        }
+    }
+
+    fun endVolunteeringOtherPrimary(start: Long) {
+        val ts = CachePerfStats.getStatTime()
+        this.stats.incInt(volunteeringInProgressId, -1)
+        this.stats.incInt(volunteeringOtherPrimaryId, 1)
+        if (CachePerfStats.enableClockStats) {
+            val time = ts - start
+            this.stats.incLong(volunteeringOtherPrimaryTimeId, time)
+        }
+    }
+
+    fun endVolunteeringClosed(start: Long) {
+        val ts = CachePerfStats.getStatTime()
+        this.stats.incInt(volunteeringInProgressId, -1)
+        this.stats.incInt(volunteeringClosedId, 1)
+        if (CachePerfStats.enableClockStats) {
+            val time = ts - start
+            this.stats.incLong(volunteeringClosedTimeId, time)
+        }
+    }
+
+    fun incTotalNumBuckets(`val`: Int) {
+        this.stats.incInt(totalNumBucketsId, `val`)
+    }
+
+    fun incPrimaryBucketCount(`val`: Int) {
+        this.stats.incInt(primaryBucketCountId, `val`)
+    }
+
+    fun incVolunteeringThreads(`val`: Int) {
+        this.stats.incInt(volunteeringThreadsId, `val`)
+    }
+
+    fun incLowRedundancyBucketCount(`val`: Int) {
+        this.stats.incInt(lowRedundancyBucketCountId, `val`)
+    }
+
+    fun incNoCopiesBucketCount(`val`: Int) {
+        this.stats.incInt(noCopiesBucketCountId, `val`)
+    }
+
+    fun setLocalMaxMemory(l: Long) {
+        this.stats.setLong(localMaxMemoryId, l)
+    }
+
+    // ------------------------------------------------------------------------
+    // startTimeMap methods
+    // ------------------------------------------------------------------------
+
+    /** Put stat start time in holding map for later removal and use by caller  */
+    fun putStartTime(key: Any, startTime: Long) {
+        if (CachePerfStats.enableClockStats) {
+            this.startTimeMap[key] = startTime
+        }
+    }
+
+    /** Remove stat start time from holding map to complete a clock stat  */
+    fun removeStartTime(key: Any): Long {
+        return this.startTimeMap.remove(key) ?: 0
+    }
+
+    /**
+     * Statistic to track the [Region.getEntry] call
+     *
+     * @param startTime the time the getEntry operation started
+     */
+    fun endGetEntry(startTime: Long) {
+        endGetEntry(startTime, 1)
+    }
+
+    /**
+     * This method sets the end time for update and updates the counters
+     *
+     */
+    fun endGetEntry(start: Long, numInc: Int) {
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(getEntryTimeId, CachePerfStats.getStatTime() - start)
+        }
+        this.stats.incInt(getEntriesCompletedId, numInc)
+    }
+
+    // ------------------------------------------------------------------------
+    // bucket creation, primary transfer stats (see also rebalancing stats below)
+    // ------------------------------------------------------------------------
+    fun startRecovery(): Long {
+        this.stats.incInt(recoveriesInProgressId, 1)
+        return PartitionedRegionStats.statTime
+    }
+
+    fun endRecovery(start: Long) {
+        val ts = PartitionedRegionStats.statTime
+        this.stats.incInt(recoveriesInProgressId, -1)
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(recoveriesTimeId, ts - start)
+        }
+        this.stats.incInt(recoveriesCompletedId, 1)
+    }
+
+    fun startBucketCreate(isRebalance: Boolean): Long {
+        this.stats.incInt(bucketCreatesInProgressId, 1)
+        if (isRebalance) {
+            startRebalanceBucketCreate()
+        }
+        return PartitionedRegionStats.statTime
+    }
+
+    fun endBucketCreate(start: Long, success: Boolean, isRebalance: Boolean) {
+        val ts = PartitionedRegionStats.statTime
+        this.stats.incInt(bucketCreatesInProgressId, -1)
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(bucketCreateTimeId, ts - start)
+        }
+        if (success) {
+            this.stats.incInt(bucketCreatesCompletedId, 1)
+        } else {
+            this.stats.incInt(bucketCreatesFailedId, 1)
+        }
+        if (isRebalance) {
+            endRebalanceBucketCreate(start, ts, success)
+        }
+    }
+
+    fun startPrimaryTransfer(isRebalance: Boolean): Long {
+        this.stats.incInt(primaryTransfersInProgressId, 1)
+        if (isRebalance) {
+            startRebalancePrimaryTransfer()
+        }
+        return PartitionedRegionStats.statTime
+    }
+
+    fun endPrimaryTransfer(start: Long, success: Boolean, isRebalance: Boolean) {
+        val ts = PartitionedRegionStats.statTime
+        this.stats.incInt(primaryTransfersInProgressId, -1)
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(primaryTransferTimeId, ts - start)
+        }
+        if (success) {
+            this.stats.incInt(primaryTransfersCompletedId, 1)
+        } else {
+            this.stats.incInt(primaryTransfersFailedId, 1)
+        }
+        if (isRebalance) {
+            endRebalancePrimaryTransfer(start, ts, success)
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // rebalancing stats
+    // ------------------------------------------------------------------------
+
+    private fun startRebalanceBucketCreate() {
+        this.stats.incInt(rebalanceBucketCreatesInProgressId, 1)
+    }
+
+    private fun endRebalanceBucketCreate(start: Long, end: Long, success: Boolean) {
+        this.stats.incInt(rebalanceBucketCreatesInProgressId, -1)
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(rebalanceBucketCreateTimeId, end - start)
+        }
+        if (success) {
+            this.stats.incInt(rebalanceBucketCreatesCompletedId, 1)
+        } else {
+            this.stats.incInt(rebalanceBucketCreatesFailedId, 1)
+        }
+    }
+
+    private fun startRebalancePrimaryTransfer() {
+        this.stats.incInt(rebalancePrimaryTransfersInProgressId, 1)
+    }
+
+    private fun endRebalancePrimaryTransfer(start: Long, end: Long, success: Boolean) {
+        this.stats.incInt(rebalancePrimaryTransfersInProgressId, -1)
+        if (CachePerfStats.enableClockStats) {
+            this.stats.incLong(rebalancePrimaryTransferTimeId, end - start)
+        }
+        if (success) {
+            this.stats.incInt(rebalancePrimaryTransfersCompletedId, 1)
+        } else {
+            this.stats.incInt(rebalancePrimaryTransfersFailedId, 1)
+        }
+    }
+
+    fun startApplyReplication(): Long {
+        stats.incInt(applyReplicationInProgressId, 1)
+        return CachePerfStats.getStatTime()
+    }
+
+    fun endApplyReplication(start: Long) {
+        val delta = CachePerfStats.getStatTime() - start
+        stats.incInt(applyReplicationInProgressId, -1)
+        stats.incInt(applyReplicationCompletedId, 1)
+        stats.incLong(applyReplicationTimeId, delta)
+    }
+
+    fun startSendReplication(): Long {
+        stats.incInt(sendReplicationInProgressId, 1)
+        return CachePerfStats.getStatTime()
+    }
+
+    fun endSendReplication(start: Long) {
+        val delta = CachePerfStats.getStatTime() - start
+        stats.incInt(sendReplicationInProgressId, -1)
+        stats.incInt(sendReplicationCompletedId, 1)
+        stats.incLong(sendReplicationTimeId, delta)
+    }
+
+    fun startPutRemote(): Long {
+        stats.incInt(putRemoteInProgressId, 1)
+        return CachePerfStats.getStatTime()
+    }
+
+    fun endPutRemote(start: Long) {
+        val delta = CachePerfStats.getStatTime() - start
+        stats.incInt(putRemoteInProgressId, -1)
+        stats.incInt(putRemoteCompletedId, 1)
+        stats.incLong(putRemoteTimeId, delta)
+    }
+
+    fun startPutLocal(): Long {
+        stats.incInt(putLocalInProgressId, 1)
+        return CachePerfStats.getStatTime()
+    }
+
+    fun endPutLocal(start: Long) {
+        val delta = CachePerfStats.getStatTime() - start
+        stats.incInt(putLocalInProgressId, -1)
+        stats.incInt(putLocalCompletedId, 1)
+        stats.incLong(putLocalTimeId, delta)
+    }
+
+    fun incPRMetaDataSentCount() {
+        this.stats.incLong(prMetaDataSentCountId, 1)
+    }
+
+    companion object {
+
+        private const val PUTALLS_COMPLETED = "putAllsCompleted"
+        private const val PUTALL_MSGS_RETRIED = "putAllMsgsRetried"
+        private const val PUTALL_RETRIES = "putAllRetries"
+        private const val PUTALL_TIME = "putAllTime"
+
+        private const val REMOVE_ALLS_COMPLETED = "removeAllsCompleted"
+        private const val REMOVE_ALL_MSGS_RETRIED = "removeAllMsgsRetried"
+        private const val REMOVE_ALL_RETRIES = "removeAllRetries"
+        private const val REMOVE_ALL_TIME = "removeAllTime"
+
+        fun startTime(): Long {
+            return CachePerfStats.getStatTime()
+        }
+
+        val statTime: Long
+            get() = CachePerfStats.getStatTime()
+    }
+}
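For context on how this class is meant to be driven: each start* call bumps an in-progress counter and returns a timestamp, and the matching end* call decrements it, records success or failure, and adds the elapsed time only when CachePerfStats.enableClockStats is set. A minimal caller sketch, assuming a prStats instance of the PartitionedRegionStats class above (the surrounding function and its body are illustrative, not part of the commit):

    fun createBucketWithStats(prStats: PartitionedRegionStats, isRebalance: Boolean) {
        // increments bucketCreatesInProgress (plus the rebalance variant) and returns the start time
        val start = prStats.startBucketCreate(isRebalance)
        var success = false
        try {
            // ... the actual bucket creation work would go here ...
            success = true
        } finally {
            // decrements in-progress, bumps completed or failed, and records
            // the elapsed time only when clock stats are enabled
            prStats.endBucketCreate(start, success, isRebalance)
        }
    }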
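The volunteering counters only stay balanced if every startVolunteering is matched by exactly one of the three end calls. A hedged sketch of that contract; the Outcome enum and the tryBecomePrimary parameter are illustrative names, not part of the commit:

    enum class Outcome { BECAME_PRIMARY, OTHER_PRIMARY, CLOSED }

    fun volunteerForPrimary(prStats: PartitionedRegionStats, tryBecomePrimary: () -> Outcome) {
        val start = prStats.startVolunteering()      // volunteeringInProgress += 1
        when (tryBecomePrimary()) {
            // each branch decrements volunteeringInProgress and bumps its own outcome counter
            Outcome.BECAME_PRIMARY -> prStats.endVolunteeringBecamePrimary(start)
            Outcome.OTHER_PRIMARY -> prStats.endVolunteeringOtherPrimary(start)
            Outcome.CLOSED -> prStats.endVolunteeringClosed(start)
        }
    }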
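The startTimeMap helpers cover operations whose start and end are observed in different places: the start time is parked under a caller-chosen key and retrieved later, with 0 returned when clock stats are disabled or the key was never stored. Another small sketch with assumed names:

    fun trackGetEntry(prStats: PartitionedRegionStats, opKey: Any) {
        // stored only when CachePerfStats.enableClockStats is on
        prStats.putStartTime(opKey, PartitionedRegionStats.statTime)
        // ... the getEntry work may complete on another code path ...
        val start = prStats.removeStartTime(opKey)   // 0 if nothing was stored
        prStats.endGetEntry(start)                   // bumps getEntriesCompleted
    }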

