geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [geode] 01/01: GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up
Date Thu, 19 Apr 2018 18:01:31 GMT
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-925
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 89181954a39951bf29b5729535103a5fa4f1ea52
Author: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
AuthorDate: Thu Apr 19 10:55:46 2018 -0700

    GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with
an Awaitility. General code and lambda clean up
---
 .../dunit/QueryDataInconsistencyDUnitTest.java     | 648 +++++++++------------
 1 file changed, 286 insertions(+), 362 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
index 20d3a78..cc7c336 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
@@ -14,10 +14,16 @@
  */
 package org.apache.geode.cache.query.dunit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -40,15 +46,13 @@ import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
-import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -62,45 +66,34 @@ import org.apache.geode.test.junit.categories.OQLIndexTest;
 public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase {
 
   private static final int cnt = 0;
-
   private static final int cntDest = 10;
-
   static VM server = null;
-
   static VM client = null;
-
   static Cache cache = null;
-
-  static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default
-                                                                   // name
-  static String repRegionName = "TestRepRegion"; // default name
-
-  static Integer serverPort1 = null;
-
-  public static int numOfBuckets = 20;
-
+  private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
+  private static String repRegionName = "TestRepRegion"; // default name
   public static String[] queries =
       new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",};
+  private static volatile boolean hooked = false;
 
-  public static String[] queriesForRR =
-      new String[] {"<trace> select * from /" + repRegionName + " where ID=1"};
-
-  public static volatile boolean hooked = false;
+  private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName());
 
   public QueryDataInconsistencyDUnitTest() {
     super();
   }
 
   @Override
-  public final void postTearDownCacheTestCase() throws Exception {
+  public final void postTearDownCacheTestCase() {
     Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS);
     Invoke.invokeInEveryVM(QueryObserverHolder::reset);
   }
 
   @Override
-  public final void postSetUp() throws Exception {
+  public final void postSetUp() {
     Host host = Host.getHost(0);
     server = host.getVM(0);
+    Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS);
+    Awaitility.waitAtMost(30, TimeUnit.SECONDS);
   }
 
   @Test
@@ -109,99 +102,79 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase
{
     Properties props = new Properties();
     server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-    server.invoke(new CacheSerializableRunnable("create indexes") {
+    server.invoke("create indexes", () -> {
+      cache = CacheFactory.getAnyInstance();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-
-        // Create common Portflios and NewPortfolios
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+      // Create common Portflios and NewPortfolios
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(new Integer(j), new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
-          assertEquals(10, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      try {
+        Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
+        assertEquals(10, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
     // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-    server.invoke(new CacheSerializableRunnable("query on server") {
-
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        Object rs = null;
-        try {
-          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID =
1").execute();
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-          IndexManager.testHook = null;
-        }
-        assertTrue(rs instanceof SelectResults);
-        assertEquals(1, ((SelectResults) rs).size());
-        Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
-        if (p1.getID() != 1) {
-          fail("Query thread did not verify index results even when RE is under update");
-          IndexManager.testHook = null;
-        }
-        hooked = false;// Let client put go further.
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
+    server.invoke("query on server", () -> {
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      Awaitility.await().until(() -> hooked);
+      Object rs = null;
+      try {
+        rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("Query execution failed on server.");
+        IndexManager.testHook = null;
+      }
+      assertTrue(rs instanceof SelectResults);
+      assertEquals(1, ((SelectResults) rs).size());
+      Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
+      if (p1.getID() != 1) {
+        fail("Query thread did not verify index results even when RE is under update");
+        IndexManager.testHook = null;
       }
+      hooked = false;// Let client put go further.
     });
 
     // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
-    server.invoke(new CacheSerializableRunnable("query on server") {
-
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        Object rs = null;
-        try {
-          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID =
1").execute();
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server." + e.getMessage());
-        } finally {
+    server.invoke("query on server", () -> {
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      Awaitility.await().until(() -> hooked);
+      Object rs = null;
+      try {
+        rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("Query execution failed on server." + e.getMessage());
+      } finally {
+        IndexManager.testHook = null;
+      }
+      assertTrue(rs instanceof SelectResults);
+      if (((SelectResults) rs).size() > 0) {
+        Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
+        if (p1.getID() != 1) {
+          fail("Query thread did not verify index results even when RE is under update and
"
+              + "RegionEntry value has been modified before releasing the lock");
           IndexManager.testHook = null;
         }
-        assertTrue(rs instanceof SelectResults);
-        if (((SelectResults) rs).size() > 0) {
-          Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
-          if (p1.getID() != 1) {
-            fail("Query thread did not verify index results even when RE is under update
and "
-                + "RegionEntry value has been modified before releasing the lock");
-            IndexManager.testHook = null;
-          }
-        }
-        hooked = false;// Let client put go further.
       }
+      hooked = false;// Let client put go further.
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200);
   }
 
   @Test
@@ -210,321 +183,277 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase
{
     Properties props = new Properties();
     server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          Portfolio p = new Portfolio(j);
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " :
" + p);
-          region.put(new Integer(j), p);
-        }
+    server.invoke("create indexes", () -> {
+      cache = CacheFactory.getAnyInstance();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        Portfolio p = new Portfolio(j);
+        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : "
+ p);
+        region.put(j, p);
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId",
-              "/" + repRegionName + " p, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      try {
+        Index index = qs.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.positions.values pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
     // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            Portfolio newPort = new Portfolio(cntDest + 1);
-            CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort);
-            repRegion.put(new Integer("1"), newPort);
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      Portfolio newPort = new Portfolio(cntDest + 1);
+      CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort);
+      repRegion.put(new Integer("1"), newPort);
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+        assertTrue(rs instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) rs).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
-        }
-        while (!hooked) {
-          Wait.pause(100);
         }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update
and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail("Query execution failed on server.", e);
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+        assertTrue(rs instanceof SelectResults);
+        if (((SelectResults) rs).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update
and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          hooked = false;// Let client put go further.
-          IndexManager.testHook = null;
         }
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("Query execution failed on server.");
+      } finally {
+        hooked = false;// Let client put go further.
+        IndexManager.testHook = null;
       }
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200);
   }
 
   @Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts
   @Test
-  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling
+  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() { // TODO: fix misspelling
     // Create caches
     Properties props = new Properties();
     server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+    server.invoke("create indexes", () -> {
+      cache = CacheFactory.getAnyInstance();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(j, new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName
-              + " p, p.collectionHolderMap.values coll, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      try {
+        Index index = qs.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values
pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
     // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            // This portfolio with same ID must have different positions.
-            repRegion.put(new Integer("1"), new Portfolio(1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      // This portfolio with same ID must have different positions.
+      repRegion.put(new Integer("1"), new Portfolio(1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+        assertTrue(rs instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) rs).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
         }
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs.newQuery("select pos from /" + repRegionName
-              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update
and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object rs = qs.newQuery("select pos from /" + repRegionName
+            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+        assertTrue(rs instanceof SelectResults);
+        if (((SelectResults) rs).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update
and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          hooked = false;// Let client put go further.
-          IndexManager.testHook = null;
         }
+      } catch (Exception e) {
+        logger.error(e);
+        fail("Query execution failed on server.");
+      } finally {
+        hooked = false;// Let client put go further.
+        IndexManager.testHook = null;
       }
     });
-    ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join
200
-                                      // millis
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short
join 200
+    // millis
   }
 
   @Test
-  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() {
+  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
     // Create caches
     Properties props = new Properties();
     server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-    server.invoke(new CacheSerializableRunnable("create indexes") {
-
-      @Override
-      public void run2() throws CacheException {
-        cache = CacheFactory.getAnyInstance();
-        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-        IndexManager.testHook = null;
-        // Create common Portfolios and NewPortfolios
-        Position.cnt = 0;
-        for (int j = cnt; j < cntDest; j++) {
-          region.put(new Integer(j), new Portfolio(j));
-        }
+    server.invoke("create indexes", () -> {
+      cache = CacheFactory.getAnyInstance();
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+      IndexManager.testHook = null;
+      // Create common Portfolios and NewPortfolios
+      Position.cnt = 0;
+      for (int j = cnt; j < cntDest; j++) {
+        region.put(new Integer(j), new Portfolio(j));
+      }
 
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        try {
-          Index index = qs.createIndex("posIndex", "pos.secId",
-              "/" + repRegionName + " p, p.positions.values pos");
-          assertEquals(12, index.getStatistics().getNumberOfKeys());
-        } catch (Exception e) {
-          fail("Index creation failed");
-        }
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      try {
+        Index index = qs.createIndex("posIndex", "pos.secId",
+            "/" + repRegionName + " p, p.positions.values pos");
+        assertEquals(12, index.getStatistics().getNumberOfKeys());
+      } catch (Exception e) {
+        fail("Index creation failed");
       }
     });
     // Invoke update from client and stop in updateIndex
     // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread =
-        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
-
-          @Override
-          public void run2() throws CacheException {
-            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
-            IndexManager.testHook = new IndexManagerTestHook();
-            // This portfolio with same ID must have different positions.
-            repRegion.put(new Integer("1"), new Portfolio(1));
-            // above call must be hooked in BEFORE_UPDATE_OP call.
-          }
-        });
-
-    server.invoke(new CacheSerializableRunnable("query on server") {
+    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
+      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+      IndexManager.testHook = new IndexManagerTestHook();
+      // This portfolio with same ID must have different positions.
+      repRegion.put(new Integer("1"), new Portfolio(1));
+      // above call must be hooked in BEFORE_UPDATE_OP call.
+    });
 
-      @Override
-      public void run2() throws CacheException {
-        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
-        Position pos1 = null;
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs
-              .newQuery("<trace> select pos from /" + repRegionName
-                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where
pos.secId = 'APPL' AND p.ID = 1")
-              .execute();
-          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
-          assertTrue(rs instanceof SelectResults);
-          pos1 = (Position) ((SelectResults) rs).iterator().next();
-          if (!pos1.secId.equals("APPL")) {
-            fail("Query thread did not verify index results even when RE is under update");
-            IndexManager.testHook = null;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          Assert.fail("Query execution failed on server.", e);
+    server.invoke("query on server", () -> {
+      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+      Position pos1 = null;
+      Awaitility.await().until(() -> hooked);
+      try {
+        Object rs = qs
+            .newQuery("<trace> select pos from /" + repRegionName
+                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId
= 'APPL' AND p.ID = 1")
+            .execute();
+        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+        assertTrue(rs instanceof SelectResults);
+        pos1 = (Position) ((SelectResults) rs).iterator().next();
+        if (!pos1.secId.equals("APPL")) {
+          fail("Query thread did not verify index results even when RE is under update");
           IndexManager.testHook = null;
-        } finally {
-          hooked = false;// Let client put go further.
         }
-        while (!hooked) {
-          Wait.pause(100);
-        }
-        try {
-          Object rs = qs
-              .newQuery("select pos from /" + repRegionName
-                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where
pos.secId = 'APPL' AND p.ID = 1")
-              .execute();
-          assertTrue(rs instanceof SelectResults);
-          if (((SelectResults) rs).size() > 0) {
-            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
-            if (pos2.equals(pos1)) {
-              fail("Query thread did not verify index results even when RE is under update
and "
-                  + "RegionEntry value has been modified before releasing the lock");
-            }
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail("Query execution failed on server.", e);
+        IndexManager.testHook = null;
+      } finally {
+        hooked = false;// Let client put go further.
+      }
+      Awaitility.await().until(() -> hooked);
+
+      try {
+        Object rs = qs
+            .newQuery("select pos from /" + repRegionName
+                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId
= 'APPL' AND p.ID = 1")
+            .execute();
+        assertTrue(rs instanceof SelectResults);
+        if (((SelectResults) rs).size() > 0) {
+          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+          if (pos2.equals(pos1)) {
+            fail("Query thread did not verify index results even when RE is under update
and "
+                + "RegionEntry value has been modified before releasing the lock");
           }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Query execution failed on server.");
-        } finally {
-          IndexManager.testHook = null;
-          hooked = false;// Let client put go further.
         }
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("Query execution failed on server.");
+      } finally {
+        IndexManager.testHook = null;
+        hooked = false;// Let client put go further.
       }
     });
-    ThreadUtils.join(putThread, 200);
+    Awaitility.await().until(joinThread(putThread));
+    // ThreadUtils.join(putThread, 200);
   }
 
-  public static void createProxyRegions() {
-    new QueryDataInconsistencyDUnitTest().createProxyRegs();
+  private Callable<Boolean> joinThread(AsyncInvocation thread) {
+    return () -> {
+      try {
+        thread.join(100L);
+      } catch (InterruptedException e) {
+        return false;
+      }
+      if (thread.isAlive()) {
+        return false;
+      }
+      return true;
+    };
   }
 
   private void createProxyRegs() {
     ClientCache cache = (ClientCache) CacheFactory.getAnyInstance();
     cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
-
-    /*
-     * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1);
-     */
-  }
-
-  public static void createNewPR() {
-    new QueryDataInconsistencyDUnitTest().createPR();
   }
 
   public void createPR() {
     PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
     cache = CacheFactory.getAnyInstance();
+    int numOfBuckets = 20;
     cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
         .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets)
             .setPartitionResolver(testKeyBasedResolver).create())
@@ -552,9 +481,10 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase
{
       public void run2() throws CacheException {
         Cache cache = CacheFactory.getAnyInstance();
         Region region = cache.getRegion(repRegionName);
-        for (int j = from; j < to; j++)
+        for (int j = from; j < to; j++) {
           region.put(new Integer(j), portfolio[j]);
-        LogWriterUtils.getLogWriter().info(
+        }
+        logger.info(
             "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio
data on Region "
                 + regionName);
       }
@@ -568,21 +498,15 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase
{
       switch (spot) {
         case 9: // Before Index update and after region entry lock.
           hooked = true;
-          LogWriterUtils.getLogWriter()
+          logger
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index
Entry.");
-          while (hooked) {
-            Wait.pause(100);
-          }
-          assertEquals(hooked, false);
+          Awaitility.await().until(() -> !hooked);
           break;
         case 10: // Before Region update and after Index Remove call.
           hooked = true;
-          LogWriterUtils.getLogWriter()
+          logger
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index
Entry.");
-          while (hooked) {
-            Wait.pause(100);
-          }
-          assertEquals(hooked, false);
+          Awaitility.await().until(() -> !hooked);
           break;
         default:
           break;

-- 
To stop receiving notification emails like this one, please contact
udo@apache.org.

Mime
View raw message