geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [geode] 12/13: GEODE-1279: Rename QueueRemovalMessageProcessingDistributedTest
Date Mon, 23 Apr 2018 05:46:50 GMT
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 130a13d803d0dc5bf46bba8e1f9f4f408b952135
Author: Kirk Lund <klund@apache.org>
AuthorDate: Fri Apr 20 16:14:36 2018 -0700

    GEODE-1279: Rename QueueRemovalMessageProcessingDistributedTest
    
    * Bug47388DUnitTest -> QueueRemovalMessageProcessingDistributedTest
---
 .../cache/partitioned/Bug47388DUnitTest.java       | 265 --------------------
 ...eueRemovalMessageProcessingDistributedTest.java | 269 +++++++++++++++++++++
 2 files changed, 269 insertions(+), 265 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug47388DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug47388DUnitTest.java
deleted file mode 100644
index 11e0ab1..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug47388DUnitTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-
-import java.util.Properties;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.ExpirationAction;
-import org.apache.geode.cache.ExpirationAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.AvailablePort;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.ha.HARegionQueueStats;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-/**
- * The test creates two datastores with a partitioned region, and also running a cache server each.
- * A publisher client is connected to one server while a subscriber client is connected to both the
- * servers. The partitioned region has entry expiry set with ttl of 3 seconds and action as DESTROY.
- * The test ensures that the EXPIRE_DESTROY events are propagated to the subscriber client and the
- * secondary server does process the QRMs for the EXPIRE_DESTROY events.
- */
-@Category(DistributedTest.class)
-@SuppressWarnings("serial")
-public class Bug47388DUnitTest extends JUnit4DistributedTestCase {
-
-  private static VM vm0 = null;
-  private static VM vm1 = null;
-  private static VM vm2 = null;
-  private static VM vm3 = null;
-
-  private static GemFireCacheImpl cache;
-
-  private static volatile boolean lastKeyDestroyed = false;
-
-  public static final String REGION_NAME = Bug47388DUnitTest.class.getSimpleName() + "_region";
-
-  @Override
-  public final void postSetUp() throws Exception {
-    disconnectFromDS();
-    Host host = Host.getHost(0);
-    vm0 = host.getVM(0); // datastore and server
-    vm1 = host.getVM(1); // datastore and server
-    vm2 = host.getVM(2); // durable client with subscription
-    vm3 = host.getVM(3); // durable client without subscription
-
-    int port0 = (Integer) vm0.invoke(() -> Bug47388DUnitTest.createCacheServerWithPRDatastore());
-    int port1 = (Integer) vm1.invoke(() -> Bug47388DUnitTest.createCacheServerWithPRDatastore());
-
-    vm2.invoke(Bug47388DUnitTest.class, "createClientCache",
-        new Object[] {vm2.getHost(), new Integer[] {port0, port1}, Boolean.TRUE});
-    vm3.invoke(Bug47388DUnitTest.class, "createClientCache",
-        new Object[] {vm3.getHost(), new Integer[] {port0}, Boolean.FALSE});
-
-  }
-
-  @Override
-  public final void preTearDown() throws Exception {
-    closeCache();
-
-    vm2.invoke(() -> Bug47388DUnitTest.closeCache());
-    vm3.invoke(() -> Bug47388DUnitTest.closeCache());
-
-    vm0.invoke(() -> Bug47388DUnitTest.closeCache());
-    vm1.invoke(() -> Bug47388DUnitTest.closeCache());
-  }
-
-  public static void closeCache() throws Exception {
-    if (cache != null) {
-      cache.close();
-    }
-    lastKeyDestroyed = false;
-  }
-
-  @SuppressWarnings("deprecation")
-  public static Integer createCacheServerWithPRDatastore() throws Exception {
-    Properties props = new Properties();
-    Bug47388DUnitTest test = new Bug47388DUnitTest();
-    DistributedSystem ds = test.getSystem(props);
-    ds.disconnect();
-    cache = (GemFireCacheImpl) CacheFactory.create(test.getSystem());
-
-    RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION);
-
-    rf.setEntryTimeToLive(new ExpirationAttributes(3, ExpirationAction.DESTROY))
-        .setPartitionAttributes(new PartitionAttributesFactory<String, String>()
-            .setRedundantCopies(1).setTotalNumBuckets(4).create())
-        .setConcurrencyChecksEnabled(false);
-
-    rf.create(REGION_NAME);
-
-    CacheServer server = cache.addCacheServer();
-    server.setPort(AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
-    server.start();
-    return server.getPort();
-  }
-
-  @SuppressWarnings("deprecation")
-  public static void createClientCache(Host host, Integer[] ports, Boolean doRI) throws Exception {
-    Properties props = new Properties();
-    props.setProperty(DURABLE_CLIENT_ID, "my-durable-client-" + ports.length);
-    props.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
-
-    DistributedSystem ds = new Bug47388DUnitTest().getSystem(props);
-    ds.disconnect();
-    ClientCacheFactory ccf = new ClientCacheFactory(props);
-    ccf.setPoolSubscriptionEnabled(doRI);
-    ccf.setPoolSubscriptionAckInterval(50);
-    ccf.setPoolSubscriptionRedundancy(1);
-    for (int port : ports) {
-      ccf.addPoolServer(host.getHostName(), port);
-    }
-    cache = (GemFireCacheImpl) ccf.create();
-
-    ClientRegionFactory<String, String> crf =
-        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-
-    if (doRI) {
-      crf.addCacheListener(new CacheListenerAdapter<String, String>() {
-        @Override
-        public void afterDestroy(EntryEvent<String, String> event) {
-          if (event.getKey().equalsIgnoreCase("LAST_KEY")) {
-            lastKeyDestroyed = true;
-          }
-        }
-      });
-    }
-
-    Region<String, String> region = crf.create(REGION_NAME);
-
-    if (doRI) {
-      region.registerInterest("ALL_KEYS", true);
-      cache.readyForEvents();
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public static void doPuts(Integer numOfSets, Integer numOfPuts) throws Exception {
-    Region<String, String> region = cache.getRegion(REGION_NAME);
-
-    for (int i = 0; i < numOfSets; i++) {
-      for (int j = 0; j < numOfPuts; j++) {
-        region.put("KEY_" + i + "_" + j, "VALUE_" + j);
-      }
-    }
-    region.put("LAST_KEY", "LAST_KEY");
-  }
-
-  public static Boolean isPrimaryServer() {
-    return ((CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0])
-        .isPrimary();
-  }
-
-  public static void verifyClientSubscriptionStats(final Boolean isPrimary, final Integer events)
-      throws Exception {
-
-    WaitCriterion wc = new WaitCriterion() {
-      private long dispatched;
-      private long qrmed;
-
-      @Override
-      public boolean done() {
-        HARegionQueueStats stats =
-            ((CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0])
-                .getHARegionQueue().getStatistics();
-
-        final int numOfEvents;
-        if (!isPrimary) {
-          numOfEvents = events - 1; // No marker
-        } else {
-          numOfEvents = events;
-        }
-
-        if (isPrimary) {
-          this.dispatched = stats.getEventsDispatched();
-          return numOfEvents == this.dispatched;
-        } else {
-          this.qrmed = stats.getEventsRemovedByQrm();
-          return this.qrmed == numOfEvents || (this.qrmed + 1) == numOfEvents;
-          // Why +1 above? Because sometimes(TODO: explain further) there may
-          // not be any QRM sent to the secondary for the last event dispatched
-          // at primary.
-        }
-      }
-
-      @Override
-      public String description() {
-        return "Expected events: " + events + " but actual eventsDispatched: " + this.dispatched
-            + " and actual eventsRemovedByQrm: " + this.qrmed;
-      }
-    };
-
-    Wait.waitForCriterion(wc, 60 * 1000, 500, true);
-  }
-
-  public static void waitForLastKeyDestroyed() throws Exception {
-    WaitCriterion wc = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return lastKeyDestroyed;
-      }
-
-      @Override
-      public String description() {
-        return "Last key's destroy not received";
-      }
-
-    };
-
-    Wait.waitForCriterion(wc, 60 * 1000, 500, true);
-  }
-
-  @Ignore("TODO: test is disabled due to bug51931")
-  @Test
-  public void testQRMOfExpiredEventsProcessedSuccessfully() throws Exception {
-    int numOfSets = 2, numOfPuts = 5;
-    int totalEvents = 23; // = (numOfSets * numOfPuts) * 2 [eviction-destroys] +
-                          // 2 [last key's put and eviction-destroy] + 1 [marker
-                          // message]
-    vm3.invoke(() -> Bug47388DUnitTest.doPuts(numOfSets, numOfPuts));
-
-    boolean isvm0Primary = (Boolean) vm0.invoke(() -> Bug47388DUnitTest.isPrimaryServer());
-
-    vm2.invoke(() -> Bug47388DUnitTest.waitForLastKeyDestroyed());
-
-    vm0.invoke(() -> Bug47388DUnitTest.verifyClientSubscriptionStats(isvm0Primary, totalEvents));
-    vm1.invoke(() -> Bug47388DUnitTest.verifyClientSubscriptionStats(!isvm0Primary, totalEvents));
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/QueueRemovalMessageProcessingDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/QueueRemovalMessageProcessingDistributedTest.java
new file mode 100644
index 0000000..52e6eb0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/QueueRemovalMessageProcessingDistributedTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.ha.HARegionQueueStats;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * The test creates two datastores with a partitioned region, and also running a cache server each.
+ * A publisher client is connected to one server while a subscriber client is connected to both the
+ * servers. The partitioned region has entry expiry set with ttl of 3 seconds and action as DESTROY.
+ * The test ensures that the EXPIRE_DESTROY events are propagated to the subscriber client and the
+ * secondary server does process the QRMs for the EXPIRE_DESTROY events.
+ */
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class QueueRemovalMessageProcessingDistributedTest implements Serializable {
+
+  private static final int TOTAL_NUM_BUCKETS = 4;
+
+  private static final AtomicInteger destroyedCount = new AtomicInteger();
+
+  private String regionName;
+  private String hostName;
+
+  private VM primary;
+  private VM secondary;
+
+  private VM server1;
+  private VM server2;
+  private VM client1;
+  private VM client2;
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() throws Exception {
+    server1 = getVM(0); // datastore and server
+    server2 = getVM(1); // datastore and server
+    client1 = getVM(2); // durable client with subscription
+    client2 = getVM(3); // durable client without subscription
+
+    regionName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    hostName = getHostName();
+
+    int port1 = server1.invoke(() -> createCacheServerWithPRDatastore());
+    int port2 = server2.invoke(() -> createCacheServerWithPRDatastore());
+
+    client1.invoke(() -> createClientCacheWithRI(port1, port2, "client1"));
+    client2.invoke(() -> createClientCache(port1, "client2"));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    destroyedCount.set(0);
+    invokeInEveryVM(() -> {
+      destroyedCount.set(0);
+    });
+  }
+
+  @Test
+  public void testQRMOfExpiredEventsProcessedSuccessfully() throws Exception {
+    int putCount = 10;
+
+    // totalEvents = putCount * 2 [eviction-destroys] + 1 [marker message]
+    int totalEvents = putCount * 2 + 1;
+
+    client2.invoke(() -> doPuts(putCount));
+
+    identifyPrimaryServer();
+
+    client1.invoke(() -> await().atMost(1, MINUTES).until(() -> destroyedCount.get() == 10));
+
+    primary.invoke(() -> verifyClientSubscriptionStatsOnPrimary(totalEvents));
+    secondary.invoke(() -> verifyClientSubscriptionStatsOnSecondary(totalEvents));
+  }
+
+  private int createCacheServerWithPRDatastore() throws IOException {
+    cacheRule.createCache();
+
+    PartitionAttributesFactory<String, String> paf = new PartitionAttributesFactory<>();
+    paf.setRedundantCopies(1);
+    paf.setTotalNumBuckets(TOTAL_NUM_BUCKETS);
+
+    RegionFactory<String, String> rf =
+        cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
+    rf.setConcurrencyChecksEnabled(false);
+    rf.setEntryTimeToLive(new ExpirationAttributes(3, ExpirationAction.DESTROY));
+    rf.setPartitionAttributes(paf.create());
+
+    rf.create(regionName);
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientCacheWithRI(int port1, int port2, String durableClientId) {
+    Properties config = new Properties();
+    config.setProperty(DURABLE_CLIENT_ID, durableClientId);
+    config.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
+
+    ClientCacheFactory ccf = new ClientCacheFactory(config);
+    ccf.addPoolServer(hostName, port1);
+    ccf.addPoolServer(hostName, port2);
+    ccf.setPoolSubscriptionAckInterval(50);
+    ccf.setPoolSubscriptionEnabled(true);
+    ccf.setPoolSubscriptionRedundancy(1);
+
+    clientCacheRule.createClientCache(ccf);
+
+    ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+        .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+    crf.addCacheListener(new CacheListenerAdapter<String, String>() {
+      @Override
+      public void afterDestroy(EntryEvent<String, String> event) {
+        destroyedCount.incrementAndGet();
+      }
+    });
+
+    Region<String, String> region = crf.create(regionName);
+
+    region.registerInterest("ALL_KEYS", true);
+    clientCacheRule.getClientCache().readyForEvents();
+  }
+
+  private void createClientCache(int port, String durableClientId) {
+    Properties config = new Properties();
+    config.setProperty(DURABLE_CLIENT_ID, durableClientId);
+    config.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
+
+    ClientCacheFactory ccf = new ClientCacheFactory(config);
+    ccf.setPoolSubscriptionAckInterval(50);
+    ccf.setPoolSubscriptionRedundancy(1);
+    ccf.addPoolServer(hostName, port);
+
+    clientCacheRule.createClientCache(ccf);
+
+    ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+        .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+    crf.create(regionName);
+  }
+
+  private void doPuts(int putCount) {
+    Region<String, String> region = clientCacheRule.getClientCache().getRegion(regionName);
+
+    for (int i = 1; i <= putCount; i++) {
+      region.put("KEY-" + i, "VALUE-" + i);
+    }
+  }
+
+  private void verifyClientSubscriptionStatsOnPrimary(final int eventsCount) {
+    await().atMost(1, MINUTES).until(() -> allEventsHaveBeenDispatched(eventsCount));
+  }
+
+  private boolean allEventsHaveBeenDispatched(final int eventsCount) {
+    HARegionQueueStats stats = getCacheClientProxy().getHARegionQueue().getStatistics();
+
+    int numOfEvents = eventsCount;
+    long dispatched = stats.getEventsDispatched();
+
+    return numOfEvents == dispatched;
+  }
+
+  private void verifyClientSubscriptionStatsOnSecondary(final int eventsCount) {
+    await().atMost(1, MINUTES).until(() -> {
+      HARegionQueueStats stats = getCacheClientProxy().getHARegionQueue().getStatistics();
+
+      int numOfEvents = eventsCount - 1; // No marker
+
+      long qrmed = stats.getEventsRemovedByQrm();
+
+      return qrmed == numOfEvents || (qrmed + 1) == numOfEvents;
+      // Why +1 above? Because sometimes(TODO: explain further) there may
+      // not be any QRM sent to the secondary for the last event dispatched
+      // at primary.
+    });
+  }
+
+  private void identifyPrimaryServer() {
+    boolean primaryIsServer1 = server1.invoke(() -> isPrimaryServerForClient());
+
+    assertThat(primaryIsServer1).isNotEqualTo(server2.invoke(() -> isPrimaryServerForClient()));
+
+    if (primaryIsServer1) {
+      primary = server1;
+      secondary = server2;
+    } else {
+      primary = server2;
+      secondary = server1;
+    }
+
+    assertThat(primary).isNotNull();
+    assertThat(secondary).isNotNull();
+    assertThat(primary).isNotEqualTo(secondary);
+  }
+
+  private static CacheClientProxy getCacheClientProxy() {
+    return (CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0];
+  }
+
+  private static boolean isPrimaryServerForClient() {
+    return getCacheClientProxy().isPrimary();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.

Mime
View raw message