geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [02/24] incubator-geode git commit: GEODE-1781: refactor internal statistics classes
Date Thu, 18 Aug 2016 16:30:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDistributedTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDistributedTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDistributedTest.java
new file mode 100755
index 0000000..21a8ee0
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsDistributedTest.java
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.statistics;
+
+import static com.jayway.awaitility.Awaitility.*;
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static com.gemstone.gemfire.test.dunit.Host.*;
+import static com.gemstone.gemfire.test.dunit.Invoke.*;
+import static java.util.concurrent.TimeUnit.*;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionEvent;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.util.RegionMembershipListenerAdapter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.statistics.StatArchiveReader.ResourceInst;
+import com.gemstone.gemfire.internal.statistics.StatArchiveReader.StatSpec;
+import com.gemstone.gemfire.internal.statistics.StatArchiveReader.StatValue;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.gemstone.gemfire.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+/**
+ * Distributed tests for {@link Statistics}.
+ *
+ * <p>VM0 performs puts and VM1 receives updates. Both use custom statistics for start/end with increment to add up puts
+ * and updates. Then validation tests values in stat resource instances and uses StatArchiveReader. Both are tested
+ * against static counters in both VMs.
+ *
+ * <p>This test mimics hydratest/locators/cacheDS.conf in an attempt to reproduce bug #45478. So far this test passes
+ * consistently.
+ *
+ * @since GemFire 7.0
+ */
+@Category(DistributedTest.class)
+@SuppressWarnings({ "rawtypes", "serial", "unused" })
+public class StatisticsDistributedTest extends JUnit4CacheTestCase {
+
+  /** Number of puts performed by each publisher thread. */
+  private static final int MAX_PUTS = 1000;
+  /** Number of keys ("KEY-0" .. "KEY-99") created in the region and cycled through by the puts. */
+  private static final int NUM_KEYS = 100;
+  /** Number of concurrent publishing threads started in each publisher VM. */
+  private static final int NUM_PUB_THREADS = 2;
+  /** Number of publisher VMs; the subscriber runs in the VM with index NUM_PUBS. */
+  private static final int NUM_PUBS = 2;
+  /** When true the key for each put is chosen at random, otherwise keys are visited in order. */
+  private static final boolean RANDOMIZE_PUTS = true;
+  
+  // The statics below hold per-VM state that survives between the individual invoke() calls of the test.
+
+  /** Set in the sub VM to the subscriber's updateEvents stat once sampling has caught up. */
+  private static AtomicInteger updateEvents = new AtomicInteger();
+  /** Accumulated in each pub VM with the total puts counted during pub-validation. */
+  private static AtomicInteger puts = new AtomicInteger();
+  /** Custom stats created by the subscriber VM. */
+  private static AtomicReference<PubSubStats> subStatsRef = new AtomicReference<>();
+  /** Custom stats created by each publisher thread, indexed by thread number. */
+  private static AtomicReferenceArray<PubSubStats> pubStatsRef = new AtomicReferenceArray<>(NUM_PUB_THREADS);
+  /** Membership listener registered on the region in each publisher VM. */
+  private static AtomicReference<RegionMembershipListener> rmlRef = new AtomicReference<>();
+
+  private File directory;
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  /**
+   * Remembers the root of the temporary folder; the statistic archive files of all VMs are written beneath it.
+   */
+  @Override
+  public final void postSetUp() throws Exception {
+    this.directory = this.temporaryFolder.getRoot();
+  }
+
+  /**
+   * Runs {@code cleanup()} in every VM to reset the static test state, then disconnects all VMs from the
+   * distributed system.
+   */
+  @Override
+  public final void preTearDownCacheTestCase() throws Exception {
+    invokeInEveryVM(() -> cleanup());
+    disconnectAllFromDS(); // because this test enabled stat sampling!
+  }
+  
+  /**
+   * Publishes puts from NUM_PUBS publisher VMs (NUM_PUB_THREADS threads each) while one subscriber VM counts the
+   * resulting update events, then cross-checks three sources against each other: the live custom statistics, the
+   * static counters held in each VM, and the values read back from the statistic archive files.
+   */
+  @Test
+  public void testPubAndSubCustomStats() throws Exception {
+    String regionName = "region_" + getName();
+    // publishers use VM 0 .. NUM_PUBS-1, the subscriber uses VM NUM_PUBS
+    VM[] pubs = new VM[NUM_PUBS];
+    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
+      pubs[pubVM] = getHost(0).getVM(pubVM);
+    }
+    VM sub = getHost(0).getVM(NUM_PUBS);
+
+    // every VM gets its own archive file inside the temporary folder
+    String subArchive = this.directory.getAbsolutePath() + File.separator + getName() + "_sub" + ".gfs";
+    String[] pubArchives = new String[NUM_PUBS];
+    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
+      pubArchives[pubVM] = this.directory.getAbsolutePath() + File.separator + getName() + "_pub-" + pubVM + ".gfs";
+    }
+    
+    // phase 1: each publisher VM connects with stat sampling enabled and creates the region and its keys
+    for (int i = 0; i < NUM_PUBS; i++) {
+      final int pubVM = i;
+      pubs[pubVM].invoke("pub-connect-and-create-data-" + pubVM, () -> {
+        Properties props = new Properties();
+        props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
+        props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
+        props.setProperty(STATISTIC_ARCHIVE_FILE, pubArchives[pubVM]);
+
+        InternalDistributedSystem system = getSystem(props);
+
+        // assert that sampler is working as expected
+        GemFireStatSampler sampler = system.getStatSampler();
+        assertTrue(sampler.isSamplingEnabled());
+        assertTrue(sampler.isAlive());
+        assertEquals(new File(pubArchives[pubVM]), sampler.getArchiveFileName());
+
+        await("awaiting SampleCollector to exist").atMost(30, SECONDS).until(() -> sampler.getSampleCollector() != null);
+
+        SampleCollector sampleCollector = sampler.getSampleCollector();
+        assertNotNull(sampleCollector);
+
+        StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
+        assertNotNull(archiveHandler);
+        assertTrue(archiveHandler.isArchiving());
+
+        // create cache and region
+        Cache cache = getCache();
+        RegionFactory<String, Number> factory = cache.createRegionFactory();
+        factory.setScope(Scope.DISTRIBUTED_ACK);
+
+        // keep the listener in a static so the publishing threads started later can await membership on it
+        RegionMembershipListener rml = new RegionMembershipListener();
+        rmlRef.set(rml);
+        factory.addCacheListener(rml);
+        Region<String, Number> region = factory.create(regionName);
+
+        // create the keys
+        if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
+          for (int key = 0; key < NUM_KEYS; key++) {
+            region.create("KEY-"+key, null);
+          }
+        }
+      });
+    }
+    
+    // phase 2: the subscriber VM connects, creates its custom stats and registers an UpdateListener on the region
+    DistributedMember subMember = sub.invoke("sub-connect-and-create-keys", () -> {
+      Properties props = new Properties();
+      props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
+      props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
+      props.setProperty(STATISTIC_ARCHIVE_FILE, subArchive);
+
+      InternalDistributedSystem system = getSystem(props);
+
+      PubSubStats statistics = new PubSubStats(system, "sub-1", 1);
+      subStatsRef.set(statistics);
+
+      // assert that sampler is working as expected
+      GemFireStatSampler sampler = system.getStatSampler();
+      assertTrue(sampler.isSamplingEnabled());
+      assertTrue(sampler.isAlive());
+      assertEquals(new File(subArchive), sampler.getArchiveFileName());
+
+      await("awaiting SampleCollector to exist").atMost(30, SECONDS).until(() -> sampler.getSampleCollector() != null);
+
+      SampleCollector sampleCollector = sampler.getSampleCollector();
+      assertNotNull(sampleCollector);
+
+      StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
+      assertNotNull(archiveHandler);
+      assertTrue(archiveHandler.isArchiving());
+
+      // create cache and region with UpdateListener
+      Cache cache = getCache();
+      RegionFactory<String, Number> factory = cache.createRegionFactory();
+      factory.setScope(Scope.DISTRIBUTED_ACK);
+
+      CacheListener<String, Number> cl = new UpdateListener(statistics);
+      factory.addCacheListener(cl);
+      Region<String, Number> region = factory.create(regionName);
+
+      // create the keys
+      if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
+        for (int key = 0; key < NUM_KEYS; key++) {
+          region.create("KEY-"+key, null);
+        }
+      }
+
+      assertEquals(0, statistics.getUpdateEvents());
+      return system.getDistributedMember();
+    });
+    
+    // phase 3: publish. The threads of one VM run concurrently, but the VMs are processed one after another
+    // because the join below sits inside the outer loop.
+    for (int i = 0; i < NUM_PUBS; i++) {
+      final int pubVM = i;
+      AsyncInvocation[] publishers = new AsyncInvocation[NUM_PUB_THREADS];
+      for (int j = 0; j < NUM_PUB_THREADS; j++) {
+        final int pubThread = j;
+        publishers[pubThread] = pubs[pubVM].invokeAsync("pub-connect-and-put-data-" + pubVM + "-thread-" + pubThread, () -> {
+          PubSubStats statistics = new PubSubStats(basicGetSystem(), "pub-" + pubThread, pubVM);
+          pubStatsRef.set(pubThread, statistics);
+
+          RegionMembershipListener rml = rmlRef.get();
+          Region<String, Number> region = getCache().getRegion(regionName);
+
+          // assert that sub is in rml membership
+          assertNotNull(rml);
+
+          // expected remote members: the sub plus the other NUM_PUBS - 1 publishers, i.e. NUM_PUBS in total
+          await("awaiting Membership to contain subMember").atMost(30, SECONDS).until(() -> rml.contains(subMember) && rml.size() == NUM_PUBS);
+
+          // publish lots of puts cycling through the NUM_KEYS
+          assertEquals(0, statistics.getPuts());
+
+          // cycle through the keys randomly
+          if (RANDOMIZE_PUTS) {
+            Random randomGenerator = new Random();
+            int key = 0;
+            for (int idx = 0; idx < MAX_PUTS; idx++) {
+              long start = statistics.startPut();
+              key = randomGenerator.nextInt(NUM_KEYS);
+              region.put("KEY-"+key, idx);
+              statistics.endPut(start);
+            }
+
+          // cycle through the keys in order and wrapping back around
+          } else {
+            int key = 0;
+            for (int idx = 0; idx < MAX_PUTS; idx++) {
+              long start = statistics.startPut();
+              region.put("KEY-"+key, idx);
+              key++; // cycle through the keys...
+              if (key >= NUM_KEYS) {
+                key = 0;
+              }
+              statistics.endPut(start);
+            }
+          }
+          assertEquals(MAX_PUTS, statistics.getPuts());
+
+          // wait for 2 samples to ensure all stats have been archived
+          StatisticsType statSamplerType = getSystem().findType("StatSampler");
+          Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
+          assertEquals(1, statsArray.length);
+
+          Statistics statSamplerStats = statsArray[0];
+          int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);
+
+          await("awaiting sampleCount >= 2").atMost(30, SECONDS).until(() -> statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
+        });
+      }
+
+      // wait for all threads of this publisher VM and surface any failure from them
+      for (int pubThread = 0; pubThread < publishers.length; pubThread++) {
+        publishers[pubThread].join();
+        if (publishers[pubThread].exceptionOccurred()) {
+          fail("Test failed", publishers[pubThread].getException());
+        }
+      }
+    }
+    
+    // phase 4: let the subscriber's sampler catch up, then publish its updateEvents total to the static counter
+    sub.invoke("sub-wait-for-samples", () -> {
+      // wait for 2 samples to ensure all stats have been archived
+      StatisticsType statSamplerType = getSystem().findType("StatSampler");
+      Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
+      assertEquals(1, statsArray.length);
+
+      Statistics statSamplerStats = statsArray[0];
+      int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);
+
+      await("awaiting sampleCount >= 2").atMost(30, SECONDS).until(() -> statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
+
+      // now post total updateEvents to static
+      PubSubStats statistics = subStatsRef.get();
+      assertNotNull(statistics);
+      updateEvents.set(statistics.getUpdateEvents());
+    });
+    
+    // validate pub values against sub values
+    int totalUpdateEvents = sub.invoke(() -> getUpdateEvents());
+    
+    // validate pub values against pub statistics against pub archive
+    for (int i = 0; i < NUM_PUBS; i++) {
+      final int pubIdx = i;
+      pubs[pubIdx].invoke("pub-validation", () -> {
+        // add up all the puts
+        assertEquals(NUM_PUB_THREADS, pubStatsRef.length());
+        int totalPuts = 0;
+        for (int pubThreadIdx = 0; pubThreadIdx < NUM_PUB_THREADS; pubThreadIdx++) {
+          PubSubStats statistics = pubStatsRef.get(pubThreadIdx);
+          assertNotNull(statistics);
+          totalPuts += statistics.getPuts();
+        }
+
+        // assert that total puts adds up to max puts times num threads
+        assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts);
+
+        // assert that archive file contains same values as statistics
+        File archive = new File(pubArchives[pubIdx]);
+        assertTrue(archive.exists());
+
+        StatArchiveReader reader = new StatArchiveReader(new File[]{archive}, null, false);
+
+        double combinedPuts = 0;
+
+        List resources = reader.getResourceInstList();
+        assertNotNull(resources);
+        assertFalse(resources.isEmpty());
+
+        // only PubSubStats instances are of interest; within those only the "puts" stat
+        for (Iterator<ResourceInst> iter = resources.iterator(); iter.hasNext();) {
+          ResourceInst ri = iter.next();
+          if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
+            continue;
+          }
+
+          StatValue[] statValues = ri.getStatValues();
+          for (int idx = 0; idx < statValues.length; idx++) {
+            String statName = ri.getType().getStats()[idx].getName();
+            assertNotNull(statName);
+
+            if (statName.equals(PubSubStats.PUTS)) {
+              StatValue sv = statValues[idx];
+              sv.setFilter(StatValue.FILTER_NONE);
+
+              // min, maxMinusMin and stdDev are read but not asserted on
+              double mostRecent = sv.getSnapshotsMostRecent();
+              double min = sv.getSnapshotsMinimum();
+              double max = sv.getSnapshotsMaximum();
+              double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
+              double mean = sv.getSnapshotsAverage();
+              double stdDev = sv.getSnapshotsStandardDeviation();
+
+              // a counter never decreases, so its latest snapshot must also be its maximum
+              assertEquals(mostRecent, max, 0f);
+
+              double summation = 0;
+              double[] rawSnapshots = sv.getRawSnapshots();
+              for (int j = 0; j < rawSnapshots.length; j++) {
+                summation += rawSnapshots[j];
+              }
+              assertEquals(mean, summation / sv.getSnapshotsSize(), 0);
+
+              combinedPuts += mostRecent;
+            }
+          }
+        }
+
+        // assert that sum of mostRecent values for all puts equals totalPuts
+        assertEquals((double)totalPuts, combinedPuts, 0);
+        puts.getAndAdd(totalPuts);
+      });
+    }
+    
+    // validate pub values against sub values
+    int totalCombinedPuts = 0;
+    for (int i = 0; i < NUM_PUBS; i++) {
+      int pubIdx = i;
+      int totalPuts = pubs[pubIdx].invoke(() -> getPuts());
+      assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts);
+      totalCombinedPuts += totalPuts;
+    }
+    assertEquals(totalCombinedPuts, totalUpdateEvents);
+    assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, totalCombinedPuts);
+    
+    // validate sub values against sub statistics against sub archive
+    final int totalPuts = totalCombinedPuts;
+    sub.invoke("sub-validation", () -> {
+      PubSubStats statistics = subStatsRef.get();
+      assertNotNull(statistics);
+      int updateEvents = statistics.getUpdateEvents();
+      assertEquals(totalPuts, updateEvents);
+      assertEquals(totalUpdateEvents, updateEvents);
+      assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents);
+
+      // assert that archive file contains same values as statistics
+      File archive = new File(subArchive);
+      assertTrue(archive.exists());
+
+      StatArchiveReader reader = new StatArchiveReader(new File[]{archive}, null, false);
+
+      double combinedUpdateEvents = 0;
+
+      // same walk as in pub-validation, but looking at the "updateEvents" stat
+      List resources = reader.getResourceInstList();
+      for (Iterator<ResourceInst> iter = resources.iterator(); iter.hasNext();) {
+        ResourceInst ri = iter.next();
+        if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
+          continue;
+        }
+
+        StatValue[] statValues = ri.getStatValues();
+        for (int i = 0; i < statValues.length; i++) {
+          String statName = ri.getType().getStats()[i].getName();
+          assertNotNull(statName);
+
+          if (statName.equals(PubSubStats.UPDATE_EVENTS)) {
+            StatValue sv = statValues[i];
+            sv.setFilter(StatValue.FILTER_NONE);
+
+            // min, maxMinusMin and stdDev are read but not asserted on
+            double mostRecent = sv.getSnapshotsMostRecent();
+            double min = sv.getSnapshotsMinimum();
+            double max = sv.getSnapshotsMaximum();
+            double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
+            double mean = sv.getSnapshotsAverage();
+            double stdDev = sv.getSnapshotsStandardDeviation();
+
+            assertEquals(mostRecent, max,0);
+
+            double summation = 0;
+            double[] rawSnapshots = sv.getRawSnapshots();
+            for (int j = 0; j < rawSnapshots.length; j++) {
+              summation += rawSnapshots[j];
+            }
+            assertEquals(mean, summation / sv.getSnapshotsSize(),0);
+
+            combinedUpdateEvents += mostRecent;
+          }
+        }
+      }
+      assertEquals((double)totalUpdateEvents, combinedUpdateEvents,0);
+    });
+    
+    // read the same values once more through MultipleArchiveReader, one archive file at a time
+    int updateEvents = sub.invoke(() -> readIntStat(new File(subArchive), "PubSubStats", "updateEvents"));
+    assertTrue(updateEvents > 0);
+    assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents);
+    
+    int puts = 0;
+    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
+      int currentPubVM = pubVM;
+      int vmPuts = pubs[pubVM].invoke(() -> readIntStat(new File(pubArchives[currentPubVM]), "PubSubStats", "puts"));
+      assertTrue(vmPuts > 0);
+      assertEquals(MAX_PUTS * NUM_PUB_THREADS, vmPuts);
+      puts += vmPuts;
+    }
+    assertTrue(puts > 0);
+    assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, puts);
+    
+    // use regex "testPubAndSubCustomStats"
+    // (finally read every archive of this test at once, in the controller VM, by scanning the directory)
+    
+    MultipleArchiveReader reader = new MultipleArchiveReader(this.directory, ".*" + getTestMethodName() + ".*\\.gfs");
+
+    int combinedUpdateEvents = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.UPDATE_EVENTS);
+    assertTrue("Failed to read updateEvents stat values", combinedUpdateEvents > 0);
+    
+    int combinedPuts = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.PUTS);
+    assertTrue("Failed to read puts stat values", combinedPuts > 0);
+    
+    assertTrue("updateEvents is " + combinedUpdateEvents + " but puts is " + combinedPuts, 
+        combinedUpdateEvents == combinedPuts);
+  }
+  
+  /**
+   * Reads the named int statistic of the named statistics type from the given archive file (or directory of
+   * archives) by delegating to a fresh {@link MultipleArchiveReader}.
+   *
+   * @param archive the archive file or directory to read
+   * @param typeName name of the statistics type to match
+   * @param statName name of the statistic to match
+   * @return the value computed by {@link MultipleArchiveReader#readIntStat(String, String)}
+   * @throws IOException if reading the archive fails
+   */
+  static int readIntStat(final File archive, final String typeName, final String statName) throws IOException {
+    return new MultipleArchiveReader(archive).readIntStat(typeName, statName);
+  }
+
+  /**
+   * Resets every piece of static test state held by this VM. Invoked in each VM during tear down.
+   *
+   * <p>All statics are cleared, not just some of them: {@code puts} is accumulated with {@code getAndAdd}, so a
+   * value left over from a previous run in a reused VM would inflate the next run's total, and stale stats
+   * references would otherwise outlive the distributed system they were created for.
+   */
+  private static void cleanup() {
+    updateEvents.set(0);
+    puts.set(0);
+    subStatsRef.set(null);
+    for (int i = 0; i < pubStatsRef.length(); i++) {
+      pubStatsRef.set(i, null);
+    }
+    rmlRef.set(null);
+  }
+
+  /**
+   * Returns the value of the static {@code updateEvents} counter of the VM this is executed in. The test calls it
+   * remotely in the subscriber VM after that VM has stored its update event total there.
+   */
+  private static int getUpdateEvents() {
+    return updateEvents.get();
+  }
+
+  /**
+   * Returns the value of the static {@code puts} counter of the VM this is executed in. The test calls it remotely
+   * in each publisher VM after that VM's validation step has added its total puts to the counter.
+   */
+  private static int getPuts() {
+    return puts.get();
+  }
+
+  public static void main(final String[] args) throws Exception {
+    if (args.length == 2) {
+      final String statType = args[0];
+      final String statName = args[1];
+
+      MultipleArchiveReader reader = new MultipleArchiveReader(new File("."));
+      int value = reader.readIntStat(statType, statName);
+      System.out.println(statType + "#" + statName + "=" + value);
+
+    } else if (args.length == 3) {
+      final String archiveName = args[0];
+      final String statType = args[1];
+      final String statName = args[2];
+
+      File archive = new File(archiveName).getAbsoluteFile();
+      assertTrue("File " + archive + " does not exist!", archive.exists());
+      assertTrue(archive + " exists but is not a file!", archive.isFile());
+
+      MultipleArchiveReader reader = new MultipleArchiveReader(archive);
+      int value = reader.readIntStat(statType, statName);
+      System.out.println(archive + ": " + statType + "#" + statName + "=" + value);
+
+    } else if (args.length == 4) {
+      final String statType1 = args[0];
+      final String statName1 = args[1];
+      final String statType2 = args[2];
+      final String statName2 = args[3];
+
+      MultipleArchiveReader reader = new MultipleArchiveReader(new File("."));
+      int value1 = reader.readIntStat(statType1, statName1);
+      int value2 = reader.readIntStat(statType2, statName2);
+
+      assertTrue(statType1 + "#" + statName1 + "=" + value1 + " does not equal " + statType2 + "#" + statName2 + "=" + value2,
+          value1 == value2);
+    } else {
+      assertEquals("Minimum two args are required: statType statName", 2, args.length);
+    }
+  }
+  
+  /**
+   * Custom statistics used by the publishers and the subscriber of this test. Wraps one atomic {@link Statistics}
+   * instance of type "PubSubStats" that carries three stats: {@code puts} (int counter), {@code putTime} (long
+   * counter, nanoseconds) and {@code updateEvents} (int counter).
+   *
+   * @since GemFire 7.0
+   */
+  static class PubSubStats {
+
+    private static final String TYPE_NAME = "PubSubStats";
+    private static final String TYPE_DESCRIPTION = "Statistics for StatisticsDistributedTest with Pub/Sub.";
+
+    private static final String INSTANCE_PREFIX = "pubSubStats_";
+
+    private static final String PUTS = "puts";
+    private static final String PUT_TIME = "putTime";
+
+    private static final String UPDATE_EVENTS = "updateEvents";
+
+    private final Statistics statistics;
+
+    /**
+     * Creates the statistics instance on the given factory.
+     *
+     * @param f factory used to create the descriptors and the statistics instance
+     * @param name appended to the instance prefix to form the text id
+     * @param id numeric id of the instance
+     */
+    PubSubStats(final StatisticsFactory f, final String name, final int id) {
+      this.statistics = f.createAtomicStatistics(createType(f), INSTANCE_PREFIX + "_" + name, id);
+    }
+
+    /** Creates the "PubSubStats" type through the singleton type factory. */
+    private static StatisticsType createType(final StatisticsFactory f) {
+      return StatisticsTypeFactoryImpl.singleton().createType(TYPE_NAME, TYPE_DESCRIPTION, createDescriptors(f));
+    }
+
+    /** Creates the descriptors in the order puts, putTime, updateEvents. */
+    private static StatisticDescriptor[] createDescriptors(final StatisticsFactory f) {
+      final StatisticDescriptor putsDescriptor =
+          f.createIntCounter(PUTS, "Number of puts completed.", "operations", true);
+      final StatisticDescriptor putTimeDescriptor =
+          f.createLongCounter(PUT_TIME, "Total time spent doing puts.", "nanoseconds", false);
+      final StatisticDescriptor updateEventsDescriptor =
+          f.createIntCounter(UPDATE_EVENTS, "Number of update events.", "events", true);
+
+      return new StatisticDescriptor[] { putsDescriptor, putTimeDescriptor, updateEventsDescriptor };
+    }
+
+    /** Returns the wrapped statistics instance. */
+    Statistics statistics() {
+      return this.statistics;
+    }
+
+    /** Closes the wrapped statistics instance. */
+    void close() {
+      this.statistics.close();
+    }
+
+    // ----- updateEvents -----
+
+    int getUpdateEvents() {
+      return statistics().getInt(UPDATE_EVENTS);
+    }
+
+    void incUpdateEvents() {
+      incUpdateEvents(1);
+    }
+
+    void incUpdateEvents(final int amount) {
+      statistics().incInt(UPDATE_EVENTS, amount);
+    }
+
+    // ----- puts and putTime -----
+
+    int getPuts() {
+      return statistics().getInt(PUTS);
+    }
+
+    void incPuts() {
+      incPuts(1);
+    }
+
+    void incPuts(final int amount) {
+      statistics().incInt(PUTS, amount);
+    }
+
+    void incPutTime(final long amount) {
+      statistics().incLong(PUT_TIME, amount);
+    }
+
+    /** Returns the timestamp to hand to {@code endPut} once the put has completed. */
+    long startPut() {
+      return NanoTimer.getTime();
+    }
+
+    /** Records one completed put that was started at {@code start}. */
+    void endPut(final long start) {
+      endPut(start, 1);
+    }
+
+    /** Records {@code amount} completed puts and the time elapsed since {@code start}. */
+    void endPut(final long start, final int amount) {
+      final long elapsedNanos = NanoTimer.getTime() - start;
+      incPuts(amount);
+      incPutTime(elapsedNanos);
+    }
+  }
+  
+  /**
+   * Cache listener that counts every update event it receives in the {@code updateEvents} stat of the given
+   * {@link PubSubStats}.
+   *
+   * @since GemFire 7.0
+   */
+  static class UpdateListener extends CacheListenerAdapter<String, Number> {
+
+    private final PubSubStats statistics;
+
+    UpdateListener(final PubSubStats statistics) {
+      this.statistics = statistics;
+    }
+
+    /** Increments the update event count by one; the event itself is not inspected. */
+    @Override
+    public void afterUpdate(final EntryEvent<String, Number> event) {
+      this.statistics.incUpdateEvents();
+    }
+  }
+  
+  /**
+   * Tracks the remote members that host the region this listener is registered on.
+   *
+   * <p>The member list is written by the listener callbacks and read by the test's publishing threads while they
+   * poll {@link #contains(DistributedMember)} and {@link #size()}, so it is kept in a synchronized list.
+   *
+   * @since GemFire 7.0
+   */
+  static class RegionMembershipListener extends RegionMembershipListenerAdapter<String, Number> {
+
+    private final List<DistributedMember> members =
+        Collections.synchronizedList(new ArrayList<DistributedMember>());
+
+    /** Returns the number of members currently tracked. */
+    int size() {
+      return this.members.size();
+    }
+
+    /** Returns an unmodifiable snapshot of the tracked members. */
+    List<DistributedMember> getMembers() {
+      // the copy constructor takes the snapshot through the synchronized toArray(), so no external locking is needed
+      return Collections.unmodifiableList(new ArrayList<>(this.members));
+    }
+
+    /** Returns true if a tracked member has the same id as the given member. */
+    boolean containsId(final DistributedMember member) {
+      for (DistributedMember peer : getMembers()) {
+        if (peer.getId().equals(member.getId())) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Returns true if the given member is tracked, compared with {@code equals}. */
+    boolean contains(final DistributedMember member) {
+      return this.members.contains(member);
+    }
+
+    /**
+     * Builds a debug string with the ports of every tracked member that is not equal to the given member, each
+     * paired with the port of the given member.
+     */
+    String debugContains(final DistributedMember member) {
+      StringBuilder sb = new StringBuilder();
+      for (DistributedMember peer : getMembers()) {
+        if (!peer.equals(member)) {
+          InternalDistributedMember peerIDM = (InternalDistributedMember)peer;
+          InternalDistributedMember memberIDM = (InternalDistributedMember)member;
+          sb.append("peer port=").append(peerIDM.getPort()).append(" ");
+          sb.append("member port=").append(memberIDM.getPort()).append(" ");
+        }
+      }
+      return sb.toString();
+    }
+
+    @Override
+    public void initialMembers(final Region<String, Number> region, final DistributedMember[] initialMembers) {
+      // add in a single call so that readers never observe a partially added initial membership
+      this.members.addAll(Arrays.asList(initialMembers));
+    }
+
+    @Override
+    public void afterRemoteRegionCreate(final RegionEvent<String, Number> event) {
+      this.members.add(event.getDistributedMember());
+    }
+
+    @Override
+    public void afterRemoteRegionDeparture(final RegionEvent<String, Number> event) {
+      this.members.remove(event.getDistributedMember());
+    }
+
+    @Override
+    public void afterRemoteRegionCrash(final RegionEvent<String, Number> event) {
+      this.members.remove(event.getDistributedMember());
+    }
+  }
+
+  /**
+   * Reads an int statistic from a single statistic archive file or from all ".gfs" archives found (recursively)
+   * beneath a directory, optionally restricted to file names matching a regular expression.
+   */
+  static class MultipleArchiveReader {
+
+    private final File dir;
+    private final String regex;
+
+    /**
+     * @param dir an archive file, or a directory to search for archives
+     * @param regex file names must fully match this expression; null disables the check
+     */
+    MultipleArchiveReader(final File dir, final String regex) {
+      this.dir = dir;
+      this.regex = regex;
+    }
+
+    /**
+     * @param dir an archive file, or a directory to search for archives
+     */
+    MultipleArchiveReader(final File dir) {
+      this(dir, null);
+    }
+
+    /**
+     * Reads the named statistic of the named type and sums the per-match maximum snapshot values.
+     *
+     * @throws IllegalStateException if the configured path is neither an existing directory nor an existing file
+     * @throws IOException if reading an archive fails
+     */
+    int readIntStat(final String typeName, final String statName) throws IOException {
+      final List<File> archives;
+      if (this.dir.exists() && this.dir.isDirectory()) {
+        // directory (maybe directories) with one or more archives
+        archives = findFilesWithSuffix(this.dir, this.regex, ".gfs");
+      } else if (this.dir.exists() && this.dir.isFile()) {
+        // one archive file
+        archives = new ArrayList<File>();
+        archives.add(this.dir);
+      } else {
+        // failure
+        throw new IllegalStateException(this.dir + " does not exist!");
+      }
+      return readIntStatFromArchives(archives, typeName, statName);
+    }
+
+    /** Sums the maximum snapshot value (truncated to int) of every matching stat value, with filtering off. */
+    private int readIntStatFromArchives(final List<File> archives, final String typeName, final String statName) throws IOException {
+      final StatValue[] matched = readStatValues(archives, typeName, statName);
+      assertNotNull("statValues is null!", matched);
+      assertTrue("statValues is empty!", matched.length > 0);
+
+      int total = 0;
+      for (StatValue statValue : matched) {
+        statValue.setFilter(StatValue.FILTER_NONE);
+        total += (int)statValue.getSnapshotsMaximum();
+      }
+      return total;
+    }
+
+    /** Recursively finds files whose name matches the regex (if any) and ends with the suffix (if any). */
+    private static List<File> findFilesWithSuffix(final File dir, final String regex, final String suffix) {
+      final Pattern pattern = (regex != null) ? Pattern.compile(regex) : null;
+
+      final FileFilter nameFilter = (final File candidate) -> {
+        final String fileName = candidate.getName();
+        final boolean regexAccepts = pattern == null || pattern.matcher(fileName).matches();
+        return regexAccepts && (suffix == null || fileName.endsWith(suffix));
+      };
+
+      return findFiles(dir, nameFilter, true);
+    }
+
+    /** Lists the files in {@code dir} accepted by the filter, descending into subdirectories if requested. */
+    private static List<File> findFiles(final File dir, final FileFilter filter, final boolean recursive) {
+      final List<File> found = new ArrayList<>();
+
+      final File[] accepted = dir.listFiles(filter);
+      if (accepted != null) {
+        found.addAll(Arrays.asList(accepted));
+      }
+      if (!recursive) {
+        return found;
+      }
+
+      // subdirectories are visited regardless of whether the filter accepts their names
+      final File[] children = dir.listFiles();
+      if (children == null) {
+        return found;
+      }
+      for (File child : children) {
+        if (child.isDirectory()) {
+          found.addAll(findFiles(child, filter, recursive));
+        }
+      }
+      return found;
+    }
+
+    /** Opens the archives with a spec that matches the given type and stat names in any archive and instance. */
+    private static StatValue[] readStatValues(final List<File> archives, final String typeName, final String statName) throws IOException {
+      final StatSpec spec = new StatSpec() {
+        @Override
+        public boolean archiveMatches(File value) {
+          return true;
+        }
+        @Override
+        public boolean instanceMatches(String textId, long numericId) {
+          return true;
+        }
+        @Override
+        public boolean typeMatches(String value) {
+          return typeName.equals(value);
+        }
+        @Override
+        public boolean statMatches(String value) {
+          return statName.equals(value);
+        }
+        @Override
+        public int getCombineType() {
+          return StatSpec.FILE;
+        }
+      };
+
+      final File[] files = archives.toArray(new File[archives.size()]);
+      final StatArchiveReader archiveReader = new StatArchiveReader(files, new StatSpec[] { spec }, true);
+      return archiveReader.matchSpec(spec);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsImplTest.java
new file mode 100644
index 0000000..ab8d587
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsImplTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.statistics;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.function.DoubleSupplier;
+import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for the supplier support in {@link StatisticsImpl}, exercised through a {@link LocalStatisticsImpl} built on mocks.
+ */
+@Category(UnitTest.class)
+public class StatisticsImplTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private StatisticsImpl stats;
+
+  @Before
+  public void createStats() {
+    final StatisticsTypeImpl type = mock(StatisticsTypeImpl.class);
+    when(type.getIntStatCount()).thenReturn(5); // the tests use id 4 as a valid slot and id 23 as an invalid one
+    when(type.getDoubleStatCount()).thenReturn(5);
+    when(type.getLongStatCount()).thenReturn(5);
+    final String textId = "";
+    final long numericId = 0;
+    final long uniqueId = 0;
+    final int osStatFlags = 0;
+    final boolean atomicIncrements = false;
+    final StatisticsManager system = mock(StatisticsManager.class);
+    stats = new LocalStatisticsImpl(type, textId, numericId, uniqueId, atomicIncrements, osStatFlags, system);
+  }
+
+  @Test
+  public void invokeIntSuppliersShouldUpdateStats() {
+    IntSupplier supplier1 = mock(IntSupplier.class);
+    when(supplier1.getAsInt()).thenReturn(23);
+    stats.setIntSupplier(4, supplier1);
+    assertEquals(0, stats.invokeSuppliers()); // 0 is expected when no supplier throws
+
+    verify(supplier1).getAsInt();
+    assertEquals(23, stats.getInt(4)); // the supplied value must now be readable as stat 4
+  }
+
+  @Test
+  public void invokeLongSuppliersShouldUpdateStats() {
+    LongSupplier supplier1 = mock(LongSupplier.class);
+    when(supplier1.getAsLong()).thenReturn(23L);
+    stats.setLongSupplier(4, supplier1);
+    assertEquals(0, stats.invokeSuppliers()); // 0 is expected when no supplier throws
+
+    verify(supplier1).getAsLong();
+    assertEquals(23L, stats.getLong(4)); // the supplied value must now be readable as stat 4
+  }
+
+  @Test
+  public void invokeDoubleSuppliersShouldUpdateStats() {
+    DoubleSupplier supplier1 = mock(DoubleSupplier.class);
+    when(supplier1.getAsDouble()).thenReturn(23.3);
+    stats.setDoubleSupplier(4, supplier1);
+    assertEquals(0, stats.invokeSuppliers()); // 0 is expected when no supplier throws
+
+    verify(supplier1).getAsDouble();
+    assertEquals(23.3, stats.getDouble(4), 0.1); // delta is a double literal to match the double overload
+  }
+
+  @Test
+  public void getSupplierCountShouldReturnCorrectCount() {
+    IntSupplier supplier1 = mock(IntSupplier.class);
+    stats.setIntSupplier(4, supplier1);
+    assertEquals(1, stats.getSupplierCount()); // exactly the one supplier registered above
+  }
+
+  @Test
+  public void invokeSuppliersShouldCatchSupplierErrorsAndReturnCount() {
+    IntSupplier supplier1 = mock(IntSupplier.class);
+    when(supplier1.getAsInt()).thenThrow(NullPointerException.class);
+    stats.setIntSupplier(4, supplier1);
+    assertEquals(1, stats.invokeSuppliers()); // the failing supplier is counted; its exception must not escape
+
+    verify(supplier1).getAsInt();
+  }
+
+  @Test
+  public void invokeSuppliersShouldLogErrorOnlyOnce() {
+    final Logger originalLogger = StatisticsImpl.logger; // static field is swapped for a mock and restored in finally
+    try {
+      final Logger logger = mock(Logger.class);
+      StatisticsImpl.logger = logger;
+      IntSupplier supplier1 = mock(IntSupplier.class);
+      when(supplier1.getAsInt()).thenThrow(NullPointerException.class);
+      stats.setIntSupplier(4, supplier1);
+      assertEquals(1, stats.invokeSuppliers());
+      verify(logger, times(1)).warn(anyString(), anyString(), anyInt(), isA(NullPointerException.class));
+      assertEquals(1, stats.invokeSuppliers()); // second failure is still counted...
+      // ...but must not be logged again: the total number of warn() calls stays at one
+      verify(logger, times(1)).warn(anyString(), anyString(), anyInt(), isA(NullPointerException.class));
+    } finally {
+      StatisticsImpl.logger = originalLogger;
+    }
+  }
+
+  @Test
+  public void badSupplierParamShouldThrowError() {
+    IntSupplier supplier1 = mock(IntSupplier.class);
+    when(supplier1.getAsInt()).thenReturn(23);
+    thrown.expect(IllegalArgumentException.class); // must be armed before the call that is expected to throw
+    stats.setIntSupplier(23, supplier1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorJUnitTest.java
deleted file mode 100755
index b995451..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorJUnitTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.statistics;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.io.File;
-import java.util.List;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.NanoTimer;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-/**
- * Unit tests for the StatisticsMonitor class. No disk IO.
- *   
- * @since GemFire 7.0
- */
-@Category(UnitTest.class)
-public class StatisticsMonitorJUnitTest {
-  
-  private TestStatisticsManager manager;
-  private SampleCollector sampleCollector;
-
-  @Before
-  public void setUp() throws Exception {
-    final long startTime = System.currentTimeMillis();
-    this.manager = new TestStatisticsManager(1, getClass().getSimpleName(), startTime);
-    
-    final StatArchiveHandlerConfig mockStatArchiveHandlerConfig = mock(StatArchiveHandlerConfig.class, getClass().getSimpleName() + "$" + StatArchiveHandlerConfig.class.getSimpleName());
-    when(mockStatArchiveHandlerConfig.getArchiveFileName()).thenReturn(new File(""));
-    when(mockStatArchiveHandlerConfig.getArchiveFileSizeLimit()).thenReturn(0L);
-    when(mockStatArchiveHandlerConfig.getArchiveDiskSpaceLimit()).thenReturn(0L);
-    when(mockStatArchiveHandlerConfig.getSystemId()).thenReturn(0L);
-    when(mockStatArchiveHandlerConfig.getSystemStartTime()).thenReturn(0L);
-    when(mockStatArchiveHandlerConfig.getSystemDirectoryPath()).thenReturn("");
-    when(mockStatArchiveHandlerConfig.getProductDescription()).thenReturn(getClass().getSimpleName());
-
-    StatisticsSampler sampler = new TestStatisticsSampler(manager);
-    this.sampleCollector = new SampleCollector(sampler);
-    this.sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (this.sampleCollector != null) {
-      this.sampleCollector.close();
-      this.sampleCollector = null;
-    }
-    this.manager = null;
-  }
-  
-  @Test
-  public void testAddListener() {
-    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-      }
-    };
-    
-    assertNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
-        
-    monitor.addListener(listener);
-
-    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
-    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
-
-    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
-    assertFalse(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
-  }
-  
-  @Test
-  public void testAddExistingListener() {
-    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-      }
-    };
-    
-    monitor.addListener(listener);
-    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
-    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
-
-    monitor.addListener(listener);
-    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
-    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
-  }
-  
-  @Test
-  public void testRemoveListener() {
-    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-      }
-    };
-    
-    assertNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
-
-    monitor.addListener(listener);
-    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
-    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
-    
-    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
-    assertFalse(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
-    
-    monitor.removeListener(listener);
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
-
-    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
-    assertTrue(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
-  }
-  
-  @Test
-  public void testRemoveMissingListener() {
-    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-      }
-    };
-    
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
-
-    monitor.removeListener(listener);
-
-    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
-    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
-  }
-  
-  // TODO: test addStatistic
-  // TODO: test removeStatistic
-  // TODO: test monitor and/or monitorStatisticIds
-  // TODO: test notifyListeners
-  
-  /**
-   * @since GemFire 7.0
-   */
-  static class TestStatisticsMonitor extends StatisticsMonitor {
-    private volatile long timeStamp;
-    private volatile List<ResourceInstance> resourceInstances;
-    private volatile int notificationCount;
-    
-    public TestStatisticsMonitor() {
-      super();
-    }
-    
-    @Override
-    protected void monitor(long timeStamp, List<ResourceInstance> resourceInstances) {
-      this.timeStamp = timeStamp;
-      this.resourceInstances = resourceInstances;
-      this.notificationCount++;
-    }
-    
-    long getTimeStamp() {
-      return this.timeStamp;
-    }
-    
-    List<ResourceInstance> getResourceInstances() {
-      return this.resourceInstances;
-    }
-    
-    int getNotificationCount() {
-      return this.notificationCount;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorTest.java
new file mode 100755
index 0000000..bfd27c4
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/StatisticsMonitorTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.statistics;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for adding and removing listeners on the StatisticsMonitor class. No disk IO.
+ *   
+ * @since GemFire 7.0
+ */
+@Category(UnitTest.class)
+public class StatisticsMonitorTest {
+  
+  private TestStatisticsManager manager;
+  private SampleCollector sampleCollector;
+
+  @Before
+  public void setUp() throws Exception {
+    final long startTime = System.currentTimeMillis();
+    this.manager = new TestStatisticsManager(1, getClass().getSimpleName(), startTime);
+    // Named mock of the archive handler config; each getter below is stubbed with an empty or zero value.
+    final StatArchiveHandlerConfig mockStatArchiveHandlerConfig = mock(StatArchiveHandlerConfig.class, getClass().getSimpleName() + "$" + StatArchiveHandlerConfig.class.getSimpleName());
+    when(mockStatArchiveHandlerConfig.getArchiveFileName()).thenReturn(new File(""));
+    when(mockStatArchiveHandlerConfig.getArchiveFileSizeLimit()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getArchiveDiskSpaceLimit()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemId()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemStartTime()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemDirectoryPath()).thenReturn("");
+    when(mockStatArchiveHandlerConfig.getProductDescription()).thenReturn(getClass().getSimpleName());
+
+    StatisticsSampler sampler = new TestStatisticsSampler(manager);
+    this.sampleCollector = new SampleCollector(sampler);
+    this.sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (this.sampleCollector != null) { // guard: the field is still null if setUp failed before assigning it
+      this.sampleCollector.close();
+      this.sampleCollector = null;
+    }
+    this.manager = null;
+  }
+  
+  @Test
+  public void testAddListener() {
+    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    StatisticsListener listener = new StatisticsListener() {
+      @Override
+      public void handleNotification(StatisticsNotification notification) {
+      }
+    };
+    
+    assertNull(this.sampleCollector.getStatMonitorHandlerSnapshot()); // no handler before any listener is added
+        
+    monitor.addListener(listener);
+
+    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
+    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
+    // NOTE(review): the monitor is never handed the collector, so the checks below rely on a shared lookup not visible here - confirm
+    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
+    assertFalse(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
+  }
+  
+  @Test
+  public void testAddExistingListener() {
+    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    StatisticsListener listener = new StatisticsListener() {
+      @Override
+      public void handleNotification(StatisticsNotification notification) {
+      }
+    };
+    
+    monitor.addListener(listener);
+    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
+    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
+
+    monitor.addListener(listener); // adding the same instance again must not create a second entry
+    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
+    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
+  }
+  
+  @Test
+  public void testRemoveListener() {
+    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    StatisticsListener listener = new StatisticsListener() {
+      @Override
+      public void handleNotification(StatisticsNotification notification) {
+      }
+    };
+    
+    assertNull(this.sampleCollector.getStatMonitorHandlerSnapshot()); // no handler before any listener is added
+
+    monitor.addListener(listener);
+    assertFalse(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertTrue(monitor.getStatisticsListenersSnapshot().contains(listener));
+    assertEquals(1, monitor.getStatisticsListenersSnapshot().size());
+    
+    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
+    assertFalse(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
+    
+    monitor.removeListener(listener);
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
+    // After the last listener is removed the handler is expected to remain, but with no monitors registered.
+    assertNotNull(this.sampleCollector.getStatMonitorHandlerSnapshot());
+    assertTrue(this.sampleCollector.getStatMonitorHandlerSnapshot().getMonitorsSnapshot().isEmpty());
+  }
+  
+  @Test
+  public void testRemoveMissingListener() {
+    TestStatisticsMonitor monitor = new TestStatisticsMonitor();
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    StatisticsListener listener = new StatisticsListener() {
+      @Override
+      public void handleNotification(StatisticsNotification notification) {
+      }
+    };
+    
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
+
+    monitor.removeListener(listener); // removing a listener that was never added must leave the monitor unchanged
+
+    assertTrue(monitor.getStatisticsListenersSnapshot().isEmpty());
+    assertFalse(monitor.getStatisticsListenersSnapshot().contains(listener));
+  }
+  
+  // TODO: test addStatistic
+  // TODO: test removeStatistic
+  // TODO: test monitor and/or monitorStatisticIds
+  // TODO: test notifyListeners
+  
+  /**
+   * @since GemFire 7.0
+   */
+  static class TestStatisticsMonitor extends StatisticsMonitor { // records the arguments of the latest monitor() call and counts calls
+    private volatile long timeStamp;
+    private volatile List<ResourceInstance> resourceInstances;
+    private volatile int notificationCount;
+    
+    public TestStatisticsMonitor() {
+      super();
+    }
+    
+    @Override
+    protected void monitor(long timeStamp, List<ResourceInstance> resourceInstances) {
+      this.timeStamp = timeStamp;
+      this.resourceInstances = resourceInstances;
+      this.notificationCount++; // NOTE(review): ++ on a volatile field is not atomic; safe only with a single calling thread - confirm
+    }
+    
+    long getTimeStamp() {
+      return this.timeStamp;
+    }
+    
+    List<ResourceInstance> getResourceInstances() {
+      return this.resourceInstances;
+    }
+    
+    int getNotificationCount() {
+      return this.notificationCount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatArchiveWriter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatArchiveWriter.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatArchiveWriter.java
index 81d963e..bc3be89 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatArchiveWriter.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatArchiveWriter.java
@@ -18,10 +18,6 @@ package com.gemstone.gemfire.internal.statistics;
 
 import java.util.TimeZone;
 
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.NanoTimer;
-import com.gemstone.gemfire.internal.StatArchiveWriter;
-
 /**
  * @since GemFire 7.0
  */
@@ -32,7 +28,7 @@ public class TestStatArchiveWriter extends StatArchiveWriter {
   public static final String WRITER_OS_INFO = "Linux 2.6.18-262.el5";
   public static final String WRITER_MACHINE_INFO = "i386 kuwait";
   
-  public TestStatArchiveWriter(StatArchiveDescriptor archiveDescriptor, LogWriterI18n logger) {
+  public TestStatArchiveWriter(final StatArchiveDescriptor archiveDescriptor) {
     super(archiveDescriptor);
     initialize(WRITER_PREVIOUS_TIMESTAMP_NANOS);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsManager.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsManager.java
index fbb3dce..01c59f1 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsManager.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsManager.java
@@ -18,24 +18,19 @@ package com.gemstone.gemfire.internal.statistics;
 
 import com.gemstone.gemfire.Statistics;
 import com.gemstone.gemfire.StatisticsType;
-import com.gemstone.gemfire.internal.AbstractStatisticsFactory;
-import com.gemstone.gemfire.internal.OsStatisticsFactory;
-import com.gemstone.gemfire.internal.StatisticsManager;
+import com.gemstone.gemfire.internal.statistics.platform.OsStatisticsFactory;
 
 /**
  * @since GemFire 7.0
  */
-public class TestStatisticsManager extends AbstractStatisticsFactory 
-    implements StatisticsManager, OsStatisticsFactory {
+public class TestStatisticsManager extends AbstractStatisticsFactory  implements StatisticsManager, OsStatisticsFactory {
 
-  public TestStatisticsManager(long id, String name, long startTime) {
+  public TestStatisticsManager(final long id, final String name, final long startTime) {
     super(id, name, startTime);
   }
 
   @Override
-  public Statistics createOsStatistics(StatisticsType type, String textId,
-      long numericId, int osStatFlags) {
-    // TODO ?
+  public Statistics createOsStatistics(final StatisticsType type, final String textId, final long numericId, final int osStatFlags) {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsSampler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsSampler.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsSampler.java
index 0905167..b69d480 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsSampler.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/TestStatisticsSampler.java
@@ -19,16 +19,16 @@ package com.gemstone.gemfire.internal.statistics;
 import java.util.List;
 
 import com.gemstone.gemfire.Statistics;
-import com.gemstone.gemfire.internal.StatisticsManager;
 
 /**
  * @since GemFire 7.0
  */
+@SuppressWarnings("unchecked")
 public class TestStatisticsSampler implements StatisticsSampler {
   
   private final StatisticsManager manager;
   
-  public TestStatisticsSampler(StatisticsManager manager) {
+  public TestStatisticsSampler(final StatisticsManager manager) {
     this.manager = manager;
   }
   
@@ -39,20 +39,19 @@ public class TestStatisticsSampler implements StatisticsSampler {
   
   @Override
   public Statistics[] getStatistics() {
-    @SuppressWarnings("unchecked")
-    List<Statistics> statsList = (List<Statistics>)this.manager.getStatsList();
+    List<Statistics> statsList = this.manager.getStatsList();
     synchronized (statsList) {
-      return (Statistics[])statsList.toArray(new Statistics[statsList.size()]);
+      return statsList.toArray(new Statistics[statsList.size()]);
     }
   }
 
   @Override
-  public boolean waitForSample(long timeout) throws InterruptedException {
+  public boolean waitForSample(final long timeout) throws InterruptedException {
     return false;
   }
   
   @Override
-  public SampleCollector waitForSampleCollector(long timeout) throws InterruptedException {
+  public SampleCollector waitForSampleCollector(final long timeout) throws InterruptedException {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorIntegrationTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorIntegrationTest.java
new file mode 100755
index 0000000..9afbe22
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorIntegrationTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.statistics;
+
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.statistics.StatisticsNotification.Type;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * Integration test for the ValueMonitor class.
+ *   
+ * @since GemFire 7.0
+ */
+@Category(IntegrationTest.class)
+public class ValueMonitorIntegrationTest {
+
+  private Mockery mockContext;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    this.mockContext = new Mockery() {{
+      setImposteriser(ClassImposteriser.INSTANCE);
+    }};
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    this.mockContext.assertIsSatisfied();
+    this.mockContext = null;
+  }
+  
+  @Test
+  public void testAddRemoveListener() throws Exception {
+    long startTime = System.currentTimeMillis();
+    List<Statistics> statsList = new ArrayList<Statistics>();
+    StatisticsManager mockStatisticsManager = this.mockContext.mock(StatisticsManager.class, testName.getMethodName() + "$StatisticsManager");
+    this.mockContext.checking(new Expectations() {{
+      allowing(mockStatisticsManager).getName();
+      will(returnValue("mockStatisticsManager"));
+      allowing(mockStatisticsManager).getId();
+      will(returnValue(1));
+      allowing(mockStatisticsManager).getStartTime();
+      will(returnValue(startTime));
+      allowing(mockStatisticsManager).getStatListModCount();
+      will(returnValue(0));
+      allowing(mockStatisticsManager).getStatsList();
+      will(returnValue(statsList));
+    }});
+    
+    StatisticsSampler mockStatisticsSampler = this.mockContext.mock(StatisticsSampler.class, testName.getMethodName() + "$StatisticsSampler");
+    this.mockContext.checking(new Expectations() {{
+      allowing(mockStatisticsSampler).getStatisticsModCount();
+      will(returnValue(0));
+      allowing(mockStatisticsSampler).getStatistics();
+      will(returnValue(new Statistics[]{}));
+    }});
+    
+    StatArchiveHandlerConfig mockStatArchiveHandlerConfig = this.mockContext.mock(StatArchiveHandlerConfig.class, testName.getMethodName() + "$StatArchiveHandlerConfig");
+    this.mockContext.checking(new Expectations() {{
+      allowing(mockStatArchiveHandlerConfig).getArchiveFileName();
+      will(returnValue(new File("")));
+      allowing(mockStatArchiveHandlerConfig).getArchiveFileSizeLimit();
+      will(returnValue(0));
+      allowing(mockStatArchiveHandlerConfig).getArchiveDiskSpaceLimit();
+      will(returnValue(0));
+      allowing(mockStatArchiveHandlerConfig).getSystemId();
+      will(returnValue(1));
+      allowing(mockStatArchiveHandlerConfig).getSystemStartTime();
+      will(returnValue(startTime));
+      allowing(mockStatArchiveHandlerConfig).getSystemDirectoryPath();
+      will(returnValue(""));
+      allowing(mockStatArchiveHandlerConfig).getProductDescription();
+      will(returnValue("testAddRemoveListener"));
+    }});
+    
+    // need a real SampleCollector for this test or the monitor can't get the handler
+    SampleCollector sampleCollector = new SampleCollector(mockStatisticsSampler);
+    sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
+    
+    List<StatisticsNotification> notifications = new ArrayList<>();
+    StatisticsListener listener = (final StatisticsNotification notification) -> {
+      notifications.add(notification);
+    };
+    ValueMonitor monitor = new ValueMonitor();
+    
+    long timeStamp = System.currentTimeMillis();
+    Type type = Type.VALUE_CHANGED;
+    Number value = 43;
+    
+    StatisticsNotification notification = createStatisticsNotification(timeStamp, type, value);
+    monitor.notifyListeners(notification);
+    
+    assertTrue(notifications.isEmpty());
+    
+    monitor.addListener(listener);
+    monitor.notifyListeners(notification);
+    
+    assertEquals(1, notifications.size());
+    notification = notifications.remove(0);
+    assertNotNull(notification);
+
+    assertEquals(timeStamp, notification.getTimeStamp());
+    assertEquals(type, notification.getType());
+    StatisticId statId = createStatisticId(null, null);
+    assertEquals(value, notification.getValue(statId));
+
+    monitor.removeListener(listener);
+    monitor.notifyListeners(notification);
+    
+    assertTrue(notifications.isEmpty());
+  }
+  
+  @Test
+  public void testValueMonitorListener() throws Exception {
+    long startTime = System.currentTimeMillis();
+    TestStatisticsManager manager = new TestStatisticsManager(1, "ValueMonitorIntegrationTest", startTime);
+    StatisticsSampler sampler = new TestStatisticsSampler(manager);
+    
+    StatArchiveHandlerConfig mockStatArchiveHandlerConfig = this.mockContext.mock(StatArchiveHandlerConfig.class, testName.getMethodName() + "$StatArchiveHandlerConfig");
+    this.mockContext.checking(new Expectations() {{
+      allowing(mockStatArchiveHandlerConfig).getArchiveFileName();
+      will(returnValue(new File("")));
+      allowing(mockStatArchiveHandlerConfig).getArchiveFileSizeLimit();
+      will(returnValue(0));
+      allowing(mockStatArchiveHandlerConfig).getArchiveDiskSpaceLimit();
+      will(returnValue(0));
+      allowing(mockStatArchiveHandlerConfig).getSystemId();
+      will(returnValue(1));
+      allowing(mockStatArchiveHandlerConfig).getSystemStartTime();
+      will(returnValue(startTime));
+      allowing(mockStatArchiveHandlerConfig).getSystemDirectoryPath();
+      will(returnValue(""));
+      allowing(mockStatArchiveHandlerConfig).getProductDescription();
+      will(returnValue("testFoo"));
+    }});
+    
+    SampleCollector sampleCollector = new SampleCollector(sampler);
+    sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
+
+    StatisticDescriptor[] statsST1 = new StatisticDescriptor[] {
+        manager.createDoubleCounter("double_counter_1", "double_counter_1_desc", "double_counter_1_units"),
+        manager.createIntCounter(   "int_counter_2",    "int_counter_2_desc",    "int_counter_2_units"),
+        manager.createLongCounter(  "long_counter_3",   "long_counter_3_desc",   "long_counter_3_units")
+    };
+    StatisticsType ST1 = manager.createType("ST1_name", "ST1_desc", statsST1);
+    Statistics st1_1 = manager.createAtomicStatistics(ST1, "st1_1_text", 1);
+    Statistics st1_2 = manager.createAtomicStatistics(ST1, "st1_2_text", 2);
+    
+    st1_1.incDouble("double_counter_1", 1000.0001);
+    st1_1.incInt("int_counter_2", 2);
+    st1_1.incLong("long_counter_3", 3333333333L);
+    
+    st1_2.incDouble("double_counter_1", 2000.0002);
+    st1_2.incInt("int_counter_2", 3);
+    st1_2.incLong("long_counter_3", 4444444444L);
+    
+    List<StatisticsNotification> notifications = new ArrayList<>();
+    StatisticsListener listener = (final StatisticsNotification notification) -> {
+      notifications.add(notification);
+    };
+    ValueMonitor monitor = new ValueMonitor().addStatistics(st1_1);
+    monitor.addListener(listener);
+    
+    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
+
+    long timeStamp = NanoTimer.getTime();
+    sampleCollector.sample(timeStamp);
+
+    awaitAtLeastTimeoutOrUntilNotifications(notifications, 2 * 1000);
+    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
+    
+    StatisticsNotification notification = notifications.remove(0);
+    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
+    
+    // validate 1 notification occurs with all 3 stats of st1_1
+    
+    st1_1.incDouble("double_counter_1", 1.1);
+    st1_1.incInt("int_counter_2", 2);
+    st1_1.incLong("long_counter_3", 3);
+    
+    timeStamp += NanoTimer.millisToNanos(1000);
+    sampleCollector.sample(timeStamp);
+
+    awaitAtLeastTimeoutOrUntilNotifications(notifications, 2 * 1000);
+    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
+    
+    notification = notifications.remove(0);
+    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
+    
+    int statCount = 0;
+    Map<String, Number> expectedValues = new HashMap<>();
+    expectedValues.put("double_counter_1", 1001.1001);
+    expectedValues.put("int_counter_2", 4);
+    expectedValues.put("long_counter_3", 3333333336L);
+    
+    for (StatisticId statId : notification) {
+      Number value = expectedValues.remove(statId.getStatisticDescriptor().getName());
+      assertNotNull(value);
+      assertEquals(value, notification.getValue(statId));
+      statCount++;
+    }
+    assertEquals(3, statCount);
+    
+    // validate no notification occurs when no stats are updated
+    
+    timeStamp += NanoTimer.millisToNanos(1000);
+    sampleCollector.sample(timeStamp);
+
+    awaitAtLeastTimeoutOrUntilNotifications(notifications, 2 * 1000);
+    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
+    
+    // validate no notification occurs when only other stats are updated
+    
+    st1_2.incDouble("double_counter_1", 3.3);
+    st1_2.incInt("int_counter_2", 1);
+    st1_2.incLong("long_counter_3", 2);
+    
+    timeStamp += NanoTimer.millisToNanos(1000);
+    sampleCollector.sample(timeStamp);
+
+    awaitAtLeastTimeoutOrUntilNotifications(notifications, 2 * 1000);
+    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
+    
+    // validate notification only contains stats added to monitor
+    
+    st1_1.incInt("int_counter_2", 100);
+    st1_2.incInt("int_counter_2", 200);
+
+    assertEquals(2, sampleCollector.currentHandlersForTesting().size());
+    
+    timeStamp += NanoTimer.millisToNanos(1000);
+    sampleCollector.sample(timeStamp);
+
+    awaitAtLeastTimeoutOrUntilNotifications(notifications, 2 * 1000);
+    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
+    
+    notification = notifications.remove(0);
+    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
+    
+    statCount = 0;
+    expectedValues = new HashMap<>();
+    expectedValues.put("int_counter_2", 104);
+
+    for (StatisticId statId : notification) {
+      Number value = expectedValues.remove(statId.getStatisticDescriptor().getName());
+      assertNotNull(value);
+      assertEquals(value, notification.getValue(statId));
+      statCount++;
+    }
+    assertEquals(1, statCount);
+  }
+  
+  private StatisticId createStatisticId(final StatisticDescriptor descriptor, final Statistics stats) {
+    return new StatisticId() {
+
+      @Override
+      public StatisticDescriptor getStatisticDescriptor() {
+        return descriptor;
+      }
+
+      @Override
+      public Statistics getStatistics() {
+        return stats;
+      }
+    };
+  }
+  
+  protected StatisticsNotification createStatisticsNotification(final long timeStamp, final Type type, final Number value) {
+    return new StatisticsNotification() {
+
+      @Override
+      public long getTimeStamp() {
+        return timeStamp;
+      }
+
+      @Override
+      public Type getType() {
+        return type;
+      }
+
+      @Override
+      public Iterator<StatisticId> iterator() {
+        return null;
+      }
+
+      @Override
+      public Iterator<StatisticId> iterator(final StatisticDescriptor statDesc) {
+        return null;
+      }
+
+      @Override
+      public Iterator<StatisticId> iterator(final Statistics statistics) {
+        return null;
+      }
+
+      @Override
+      public Iterator<StatisticId> iterator(final StatisticsType statisticsType) {
+        return null;
+      }
+
+      @Override
+      public Number getValue(final StatisticId statId) throws StatisticNotFoundException {
+        return value;
+      }
+    };
+  }
+
+  /**
+   * Polls every 10 ms until notifications is non-empty or timeoutMillis is reached; a timeout is not treated as a failure.
+   */
+  private static void awaitAtLeastTimeoutOrUntilNotifications(final List<StatisticsNotification> notifications, final long timeoutMillis) {
+    long pollingIntervalMillis = 10;
+    boolean throwOnTimeout = false;
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return notifications.size() > 0;
+      }
+      @Override
+      public String description() {
+        return "waiting for notification";
+      }
+    };
+    waitForCriterion(wc, timeoutMillis, pollingIntervalMillis, throwOnTimeout);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/750996a0/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorJUnitTest.java
deleted file mode 100755
index b671250..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/statistics/ValueMonitorJUnitTest.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.statistics;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.jmock.Expectations;
-import org.jmock.Mockery;
-import org.jmock.lib.legacy.ClassImposteriser;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.StatisticDescriptor;
-import com.gemstone.gemfire.Statistics;
-import com.gemstone.gemfire.StatisticsType;
-import com.gemstone.gemfire.internal.NanoTimer;
-import com.gemstone.gemfire.internal.StatisticsManager;
-import com.gemstone.gemfire.internal.statistics.StatisticsNotification.Type;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * Integration test for the SampleCollector class.
- *   
- * @since GemFire 7.0
- */
-@Category(IntegrationTest.class)
-public class ValueMonitorJUnitTest {
-
-  private Mockery mockContext;
-  
-  @Before
-  public void setUp() throws Exception {
-    this.mockContext = new Mockery() {{
-      setImposteriser(ClassImposteriser.INSTANCE);
-    }};
-  }
-  
-  @After
-  public void tearDown() throws Exception {
-    this.mockContext.assertIsSatisfied();
-    this.mockContext = null;
-  }
-  
-  @Test
-  public void testAddRemoveListener() throws Exception {
-    final long startTime = System.currentTimeMillis();
-    final List<Statistics> statsList = new ArrayList<Statistics>();
-    final StatisticsManager mockStatisticsManager = this.mockContext.mock(StatisticsManager.class, "testAddRemoveListener$StatisticsManager");
-    this.mockContext.checking(new Expectations() {{
-      allowing(mockStatisticsManager).getName();
-      will(returnValue("mockStatisticsManager"));
-      allowing(mockStatisticsManager).getId();
-      will(returnValue(1));
-      allowing(mockStatisticsManager).getStartTime();
-      will(returnValue(startTime));
-      allowing(mockStatisticsManager).getStatListModCount();
-      will(returnValue(0));
-      allowing(mockStatisticsManager).getStatsList();
-      will(returnValue(statsList));
-    }});
-    
-    final StatisticsSampler mockStatisticsSampler = this.mockContext.mock(StatisticsSampler.class, "testAddRemoveListener$StatisticsSampler");
-    this.mockContext.checking(new Expectations() {{
-      allowing(mockStatisticsSampler).getStatisticsModCount();
-      will(returnValue(0));
-      allowing(mockStatisticsSampler).getStatistics();
-      will(returnValue(new Statistics[]{}));
-    }});
-    
-    final StatArchiveHandlerConfig mockStatArchiveHandlerConfig = this.mockContext.mock(StatArchiveHandlerConfig.class, "testAddRemoveListener$StatArchiveHandlerConfig");
-    this.mockContext.checking(new Expectations() {{
-      allowing(mockStatArchiveHandlerConfig).getArchiveFileName();
-      will(returnValue(new File("")));
-      allowing(mockStatArchiveHandlerConfig).getArchiveFileSizeLimit();
-      will(returnValue(0));
-      allowing(mockStatArchiveHandlerConfig).getArchiveDiskSpaceLimit();
-      will(returnValue(0));
-      allowing(mockStatArchiveHandlerConfig).getSystemId();
-      will(returnValue(1));
-      allowing(mockStatArchiveHandlerConfig).getSystemStartTime();
-      will(returnValue(startTime));
-      allowing(mockStatArchiveHandlerConfig).getSystemDirectoryPath();
-      will(returnValue(""));
-      allowing(mockStatArchiveHandlerConfig).getProductDescription();
-      will(returnValue("testAddRemoveListener"));
-    }});
-    
-    // need a real SampleCollector for this test or the monitor can't get the handler
-    SampleCollector sampleCollector = new SampleCollector(mockStatisticsSampler);
-    sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
-    
-    final List<StatisticsNotification> notifications = new ArrayList<StatisticsNotification>();
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-        notifications.add(notification);
-      }
-    };
-    ValueMonitor monitor = new ValueMonitor();
-    
-    long timeStamp = System.currentTimeMillis();
-    Type type = Type.VALUE_CHANGED;
-    Number value = 43;
-    
-    StatisticsNotification notification = createStatisticsNotification(timeStamp, type, value);
-    monitor.notifyListeners(notification);
-    
-    assertTrue(notifications.isEmpty());
-    
-    monitor.addListener(listener);
-    monitor.notifyListeners(notification);
-    
-    assertEquals(1, notifications.size());
-    notification = notifications.remove(0);
-    assertNotNull(notification);
-
-    assertEquals(timeStamp, notification.getTimeStamp());
-    assertEquals(type, notification.getType());
-    StatisticId statId = createStatisticId(null, null);
-    assertEquals(value, notification.getValue(statId));
-
-    monitor.removeListener(listener);
-    monitor.notifyListeners(notification);
-    
-    assertTrue(notifications.isEmpty());
-  }
-  
-  @Test
-  public void testValueMonitorListener() throws Exception {
-    final long startTime = System.currentTimeMillis();
-    TestStatisticsManager manager = new TestStatisticsManager(
-        1, 
-        "ValueMonitorJUnitTest", 
-        startTime);
-    StatisticsSampler sampler = new TestStatisticsSampler(manager);
-    
-    final StatArchiveHandlerConfig mockStatArchiveHandlerConfig = 
-        this.mockContext.mock(StatArchiveHandlerConfig.class, "testFoo$StatArchiveHandlerConfig");
-    this.mockContext.checking(new Expectations() {{
-      allowing(mockStatArchiveHandlerConfig).getArchiveFileName();
-      will(returnValue(new File("")));
-      allowing(mockStatArchiveHandlerConfig).getArchiveFileSizeLimit();
-      will(returnValue(0));
-      allowing(mockStatArchiveHandlerConfig).getArchiveDiskSpaceLimit();
-      will(returnValue(0));
-      allowing(mockStatArchiveHandlerConfig).getSystemId();
-      will(returnValue(1));
-      allowing(mockStatArchiveHandlerConfig).getSystemStartTime();
-      will(returnValue(startTime));
-      allowing(mockStatArchiveHandlerConfig).getSystemDirectoryPath();
-      will(returnValue(""));
-      allowing(mockStatArchiveHandlerConfig).getProductDescription();
-      will(returnValue("testFoo"));
-    }});
-    
-    SampleCollector sampleCollector = new SampleCollector(sampler);
-    sampleCollector.initialize(mockStatArchiveHandlerConfig, NanoTimer.getTime());
-
-    StatisticDescriptor[] statsST1 = new StatisticDescriptor[] {
-        manager.createDoubleCounter("double_counter_1", "double_counter_1_desc", "double_counter_1_units"),
-        manager.createIntCounter(   "int_counter_2",    "int_counter_2_desc",    "int_counter_2_units"),
-        manager.createLongCounter(  "long_counter_3",   "long_counter_3_desc",   "long_counter_3_units")
-    };
-    StatisticsType ST1 = manager.createType("ST1_name", "ST1_desc", statsST1);
-    Statistics st1_1 = manager.createAtomicStatistics(ST1, "st1_1_text", 1);
-    Statistics st1_2 = manager.createAtomicStatistics(ST1, "st1_2_text", 2);
-    
-    st1_1.incDouble("double_counter_1", 1000.0001);
-    st1_1.incInt("int_counter_2", 2);
-    st1_1.incLong("long_counter_3", 3333333333L);
-    
-    st1_2.incDouble("double_counter_1", 2000.0002);
-    st1_2.incInt("int_counter_2", 3);
-    st1_2.incLong("long_counter_3", 4444444444L);
-    
-    final List<StatisticsNotification> notifications = new ArrayList<StatisticsNotification>();
-    StatisticsListener listener = new StatisticsListener() {
-      @Override
-      public void handleNotification(StatisticsNotification notification) {
-        notifications.add(notification);
-      }
-    };
-    ValueMonitor monitor = new ValueMonitor().addStatistics(st1_1);
-    monitor.addListener(listener);
-    
-    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
-
-    long timeStamp = NanoTimer.getTime();
-    sampleCollector.sample(timeStamp);
-    
-    waitForNotification(notifications, 2*1000, 10, false);
-    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
-    
-    StatisticsNotification notification = notifications.remove(0);
-    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
-    
-    // validate 1 notification occurs with all 3 stats of st1_1
-    
-    st1_1.incDouble("double_counter_1", 1.1);
-    st1_1.incInt("int_counter_2", 2);
-    st1_1.incLong("long_counter_3", 3);
-    
-    timeStamp += NanoTimer.millisToNanos(1000);
-    sampleCollector.sample(timeStamp);
-    
-    waitForNotification(notifications, 2*1000, 10, false);
-    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
-    
-    notification = notifications.remove(0);
-    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
-    
-    int statCount = 0;
-    Map<String, Number> expectedValues = new HashMap<String, Number>();
-    expectedValues.put("double_counter_1", 1001.1001);
-    expectedValues.put("int_counter_2", 4);
-    expectedValues.put("long_counter_3", 3333333336L);
-    
-    for (StatisticId statId : notification) {
-      Number value = expectedValues.remove(statId.getStatisticDescriptor().getName());
-      assertNotNull(value);
-      assertEquals(value, notification.getValue(statId));
-      statCount++;
-    }
-    assertEquals(3, statCount);
-    
-    // validate no notification occurs when no stats are updated
-    
-    timeStamp += NanoTimer.millisToNanos(1000);
-    sampleCollector.sample(timeStamp);
-    
-    waitForNotification(notifications, 2*1000, 10, false);
-    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
-    
-    // validate no notification occurs when only other stats are updated
-    
-    st1_2.incDouble("double_counter_1", 3.3);
-    st1_2.incInt("int_counter_2", 1);
-    st1_2.incLong("long_counter_3", 2);
-    
-    timeStamp += NanoTimer.millisToNanos(1000);
-    sampleCollector.sample(timeStamp);
-    
-    waitForNotification(notifications, 2*1000, 10, false);
-    assertTrue("Unexpected notifications: " + notifications, notifications.isEmpty());
-    
-    // validate notification only contains stats added to monitor
-    
-    st1_1.incInt("int_counter_2", 100);
-    st1_2.incInt("int_counter_2", 200);
-
-    assertEquals(2, sampleCollector.currentHandlersForTesting().size());
-    
-    timeStamp += NanoTimer.millisToNanos(1000);
-    sampleCollector.sample(timeStamp);
-    
-    waitForNotification(notifications, 2*1000, 10, false);
-    assertEquals("Unexpected notifications: " + notifications, 1, notifications.size());
-    
-    notification = notifications.remove(0);
-    assertEquals(StatisticsNotification.Type.VALUE_CHANGED, notification.getType());
-    
-    statCount = 0;
-    expectedValues = new HashMap<String, Number>();
-    expectedValues.put("int_counter_2", 104);
-
-    for (StatisticId statId : notification) {
-      Number value = expectedValues.remove(statId.getStatisticDescriptor().getName());
-      assertNotNull(value);
-      assertEquals(value, notification.getValue(statId));
-      statCount++;
-    }
-    assertEquals(1, statCount);
-  }
-  
-  private StatisticId createStatisticId(final StatisticDescriptor descriptor, final Statistics stats) {
-    return new StatisticId() {
-
-      @Override
-      public StatisticDescriptor getStatisticDescriptor() {
-        return descriptor;
-      }
-
-      @Override
-      public Statistics getStatistics() {
-        return stats;
-      }
-      
-    };
-  }
-  
-  protected StatisticsNotification createStatisticsNotification(final long timeStamp, final Type type, final Number value) {
-    return new StatisticsNotification() {
-
-      @Override
-      public long getTimeStamp() {
-        return timeStamp;
-      }
-
-      @Override
-      public Type getType() {
-        return type;
-      }
-
-      @Override
-      public Iterator<StatisticId> iterator() {
-        return null;
-      }
-
-      @Override
-      public Iterator<StatisticId> iterator(StatisticDescriptor statDesc) {
-        return null;
-      }
-
-      @Override
-      public Iterator<StatisticId> iterator(Statistics statistics) {
-        return null;
-      }
-
-      @Override
-      public Iterator<StatisticId> iterator(StatisticsType statisticsType) {
-        return null;
-      }
-
-      @Override
-      public Number getValue(StatisticId statId) throws StatisticNotFoundException {
-        return value;
-      }
-      
-    };
-  }
-  
-  private static void waitForNotification(final List<StatisticsNotification> notifications, long ms, long interval, boolean throwOnTimeout) {
-    WaitCriterion wc = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return notifications.size() > 0;
-      }
-      @Override
-      public String description() {
-        return "waiting for notification";
-      }
-    };
-    Wait.waitForCriterion(wc, ms, interval, throwOnTimeout);
-  }
-}



Mime
View raw message