geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [geode] 02/02: GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up Modify test to use test rules remove TODO from test
Date Tue, 24 Apr 2018 20:48:24 GMT
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-925
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 53327dcbf5cb61f29ff044e7d8c8219a86f5c0ac
Author: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
AuthorDate: Thu Apr 19 10:55:46 2018 -0700

    GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up
    Modify test to use test rules
    remove TODO from test
---
 .../dunit/QueryDataInconsistencyDUnitTest.java     | 745 +++++++++------------
 1 file changed, 324 insertions(+), 421 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
index 20d3a78..3e6c192 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
@@ -14,10 +14,23 @@
  */
 package org.apache.geode.cache.query.dunit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.fail;
 
+import java.io.Serializable;
 import java.util.Properties;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -39,18 +52,20 @@ import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
-import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.CleanupDUnitVMsRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.OQLIndexTest;
@@ -59,530 +74,418 @@ import org.apache.geode.test.junit.categories.OQLIndexTest;
  * This tests the data inconsistency during update on an index and querying the same UNLOCKED index.
  */
 @Category({DistributedTest.class, OQLIndexTest.class})
-public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase {
+public class QueryDataInconsistencyDUnitTest implements Serializable {
 
   private static final int cnt = 0;
-
   private static final int cntDest = 10;
+  private static VM server = null;
+  private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
+  private static String repRegionName = "TestRepRegion"; // default name
+  private static volatile boolean hooked = false;
 
-  static VM server = null;
-
-  static VM client = null;
-
-  static Cache cache = null;
-
-  static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default
-                                                                   // name
-  static String repRegionName = "TestRepRegion"; // default name
-
-  static Integer serverPort1 = null;
-
-  public static int numOfBuckets = 20;
-
-  public static String[] queries =
-      new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",};
+  private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName());
 
-  public static String[] queriesForRR =
-      new String[] {"<trace> select * from /" + repRegionName + " where ID=1"};
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule(1);
 
-  public static volatile boolean hooked = false;
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().disconnectAfter().build();
 
-  public QueryDataInconsistencyDUnitTest() {
-    super();
+  @Before
+  public void initialize()
+  {
+    server = Host.getHost(0).getVM(0);
   }
 
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS);
+  @After
+  public final void postTearDownCacheTestCase() {
     Invoke.invokeInEveryVM(QueryObserverHolder::reset);
   }
 
-  @Override
-  public final void postSetUp() throws Exception {
-    Host host = Host.getHost(0);
-    server = host.getVM(0);
+  @BeforeClass
+  public static final void postSetUp() {
+    Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS);
+    Awaitility.waitAtMost(30, TimeUnit.SECONDS);
   }
 
   @Test
   public void testCompactRangeIndex() {
     // Create caches
-    Properties props = new Properties();
-    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
+    server.invoke("create indexes", () -> {
+      Cache cache = cacheRule.getCache();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-
-        // Create common Portflios and NewPortfolios
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+      // Create common Portflios and NewPortfolios
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(new Integer(j), new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
-          assertEquals(10, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService queryService = cache.getQueryService();
+      try {
+        Index index = queryService.createIndex("idIndex", "ID", "/" + repRegionName);
+        assertEquals(10, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
-    // first before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-    server.invoke(new CacheSerializableRunnable("query on server") {
-
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        Object rs = null;
-        try {
-          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-          IndexManager.testHook = null;
-        }
-        assertTrue(rs instanceof SelectResults);
-        assertEquals(1, ((SelectResults) rs).size());
-        Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
-        if (p1.getID() != 1) {
-          fail("Query thread did not verify index results even when RE is under update");
-          IndexManager.testHook = null;
-        }
-        hooked = false;// Let client put go further.
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
+    server.invoke("query on server", () -> {
+      QueryService queryService = cacheRule.getCache().getQueryService();
+      Awaitility.await().until(() -> hooked);
+      Object resultSet = null;
+      try {
+        resultSet =
+            queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
+                .execute();
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+        IndexManager.testHook = null;
       }
+      assertTrue(resultSet instanceof SelectResults);
+      assertEquals(1, ((SelectResults) resultSet).size());
+      Portfolio p1 = (Portfolio) ((SelectResults) resultSet).asList().get(0);
+      if (p1.getID() != 1) {
+        fail("Query thread did not verify index results even when RE is under update");
+        IndexManager.testHook = null;
+      }
+      hooked = false;// Let client put go further.
     });
 
     // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
-    server.invoke(new CacheSerializableRunnable("query on server") {
-
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        Object rs = null;
-        try {
-          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server." + e.getMessage());
-        } finally {
+    server.invoke("query on server", () -> {
+      QueryService queryService = cacheRule.getCache().getQueryService();
+      Awaitility.await().until(() -> hooked);
+      Object resultSet = null;
+      try {
+        resultSet =
+            queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
+                .execute();
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server." + e.getMessage());
+      } finally {
+        IndexManager.testHook = null;
+      }
+      assertTrue(resultSet instanceof SelectResults);
+      if (((SelectResults) resultSet).size() > 0) {
+        Portfolio p1 = (Portfolio) ((SelectResults) resultSet).iterator().next();
+        if (p1.getID() != 1) {
+          fail("Query thread did not verify index results even when RE is under update and "
+              + "RegionEntry value has been modified before releasing the lock");
           IndexManager.testHook = null;
         }
-        assertTrue(rs instanceof SelectResults);
-        if (((SelectResults) rs).size() > 0) {
-          Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
-          if (p1.getID() != 1) {
-            fail("Query thread did not verify index results even when RE is under update and "
-                + "RegionEntry value has been modified before releasing the lock");
-            IndexManager.testHook = null;
-          }
-        }
-        hooked = false;// Let client put go further.
       }
+      hooked = false;// Let client put go further.
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200);
   }
 
   @Test
   public void testRangeIndex() {
     // Create caches
-    Properties props = new Properties();
-    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
-
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          Portfolio p = new Portfolio(j);
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p);
-          region.put(new Integer(j), p);
-        }
+    server.invoke("create indexes", () -> {
+      Cache cache = cacheRule.getCache();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        Portfolio p = new Portfolio(j);
+        cache.getLogger().fine("Shobhit: portfolio " + j + " : " + p);
+        region.put(j, p);
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId",
-              "/" + repRegionName + " p, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService queryService = cache.getQueryService();
+      try {
+        Index index = queryService.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.positions.values pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
-    // first before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            Portfolio newPort = new Portfolio(cntDest + 1);
-            CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort);
-            repRegion.put(new Integer("1"), newPort);
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Cache cache = cacheRule.getCache();
+      Region repRegion = cache.getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      Portfolio newPort = new Portfolio(cntDest + 1);
+      cache.getLogger().fine("Shobhit: New Portfolio" + newPort);
+      repRegion.put(new Integer("1"), newPort);
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      Cache cache = cacheRule.getCache();
+      QueryService queryService = cache.getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        cache.getLogger().fine("Shobhit: " + resultSet);
+        assertTrue(resultSet instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
         }
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.", e);
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        cache.getLogger().fine("Shobhit: " + resultSet);
+        assertTrue(resultSet instanceof SelectResults);
+        if (((SelectResults) resultSet).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          hooked = false;// Let client put go further.
-          IndexManager.testHook = null;
         }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+      } finally {
+        hooked = false;// Let client put go further.
+        IndexManager.testHook = null;
       }
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
   }
 
   @Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts
   @Test
-  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling
+  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
     // Create caches
-    Properties props = new Properties();
-    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
-
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+    server.invoke("create indexes", () -> {
+      Cache cache = cacheRule.getCache();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(j, new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName
-              + " p, p.collectionHolderMap.values coll, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService queryService = cache.getQueryService();
+      try {
+        Index index = queryService.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
-    // first before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            // This portfolio with same ID must have different positions.
-            repRegion.put(new Integer("1"), new Portfolio(1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      // This portfolio with same ID must have different positions.
+      repRegion.put(new Integer("1"), new Portfolio(1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      Cache cache = cacheRule.getCache();
+      QueryService queryService = cache.getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        cache.getLogger().fine("Shobhit: " + resultSet);
+        assertTrue(resultSet instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
-        }
-        while (!hooked) {
-          Wait.pause(100);
         }
-        try {
-          Object rs = qs.newQuery("select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object resultSet = queryService.newQuery("select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        assertTrue(resultSet instanceof SelectResults);
+        if (((SelectResults) resultSet).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          hooked = false;// Let client put go further.
-          IndexManager.testHook = null;
         }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+      } finally {
+        hooked = false;// Let client put go further.
+        IndexManager.testHook = null;
       }
     });
-    ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200
-                                      // millis
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200
+    // millis
   }
 
   @Test
-  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() {
+  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
     // Create caches
-    Properties props = new Properties();
-    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
-
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+    server.invoke("create indexes", () -> {
+      Cache cache = cacheRule.getCache();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(new Integer(j), new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId",
-              "/" + repRegionName + " p, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService queryService = cache.getQueryService();
+      try {
+        Index index = queryService.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.positions.values pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
-    // first before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            // This portfolio with same ID must have different positions.
-            repRegion.put(new Integer("1"), new Portfolio(1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Cache cache = cacheRule.getCache();
+      Region repRegion = cache.getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      // This portfolio with same ID must have different positions.
+      repRegion.put(new Integer("1"), new Portfolio(1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs
-              .newQuery("<trace> select pos from /" + repRegionName
-                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
-              .execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      Cache cache = cacheRule.getCache();
+      QueryService queryService = cache.getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object resultSet = queryService
+            .newQuery("<trace> select pos from /" + repRegionName
+                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+            .execute();
+        cache.getLogger().fine("Shobhit: " + resultSet);
+        assertTrue(resultSet instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
-        }
-        while (!hooked) {
-          Wait.pause(100);
         }
-        try {
-          Object rs = qs
-              .newQuery("select pos from /" + repRegionName
-                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
-              .execute();
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        logger.error(e);
+        Assertions.fail("Query execution failed on server.", e);
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+
+      try {
+        Object resultSet = queryService
+            .newQuery("select pos from /" + repRegionName
+                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+            .execute();
+        assertTrue(resultSet instanceof SelectResults);
+        if (((SelectResults) resultSet).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          IndexManager.testHook = null;
-          hooked = false;// Let client put go further.
         }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+      } finally {
+        IndexManager.testHook = null;
+        hooked = false;// Let client put go further.
       }
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
   }
 
-  public static void createProxyRegions() {
-    new QueryDataInconsistencyDUnitTest().createProxyRegs();
+  private Callable<Boolean> joinThread(AsyncInvocation thread) {
+    return () -> {
+      try {
+        thread.join(100L);
+      } catch (InterruptedException e) {
+        return false;
+      }
+      if (thread.isAlive()) {
+        return false;
+      }
+      return true;
+    };
   }
 
   private void createProxyRegs() {
-    ClientCache cache = (ClientCache) CacheFactory.getAnyInstance();
+    ClientCache cache = (ClientCache) cacheRule.getCache();
     cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
-
-    /*
-     * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1);
-     */
-  }
-
-  public static void createNewPR() {
-    new QueryDataInconsistencyDUnitTest().createPR();
   }
 
   public void createPR() {
     PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
-    cache = CacheFactory.getAnyInstance();
+    Cache cache = cacheRule.getCache();
+    int numOfBuckets = 20;
     cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
         .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets)
             .setPartitionResolver(testKeyBasedResolver).create())
         .create(PartitionedRegionName1);
   }
 
-  public static void createCacheClientWithoutRegion(String host, Integer port1) {
-    new QueryDataInconsistencyDUnitTest().createCacheClientWithoutReg(host, port1);
-  }
-
-  private void createCacheClientWithoutReg(String host, Integer port1) {
-    disconnectFromDS();
-    new ClientCacheFactory().addPoolServer(host, port1).create();
-  }
-
-  /**
-   * This function puts portfolio objects into the created Region (PR or Local) *
-   *
-   * @return cacheSerializable object
-   */
-  public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(final String regionName,
-      final Object[] portfolio, final int from, final int to) {
-    SerializableRunnable puts = new CacheSerializableRunnable("Region Puts") {
-      @Override
-      public void run2() throws CacheException {
-        Cache cache = CacheFactory.getAnyInstance();
-        Region region = cache.getRegion(repRegionName);
-        for (int j = from; j < to; j++)
-          region.put(new Integer(j), portfolio[j]);
-        LogWriterUtils.getLogWriter().info(
-            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region "
-                + regionName);
-      }
-    };
-    return (CacheSerializableRunnable) puts;
-  }
-
   public class IndexManagerTestHook
       implements org.apache.geode.cache.query.internal.index.IndexManager.TestHook {
     public void hook(final int spot) throws RuntimeException {
       switch (spot) {
         case 9: // Before Index update and after region entry lock.
           hooked = true;
-          LogWriterUtils.getLogWriter()
+          logger
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
-          while (hooked) {
-            Wait.pause(100);
-          }
-          assertEquals(hooked, false);
+          Awaitility.await().until(() -> !hooked);
           break;
         case 10: // Before Region update and after Index Remove call.
           hooked = true;
-          LogWriterUtils.getLogWriter()
+          logger
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
-          while (hooked) {
-            Wait.pause(100);
-          }
-          assertEquals(hooked, false);
+          Awaitility.await().until(() -> !hooked);
           break;
         default:
           break;

-- 
To stop receiving notification emails like this one, please contact
udo@apache.org.

Mime
View raw message