From commits-return-26679-archive-asf-public=cust-asf.ponee.io@geode.apache.org Thu Apr 19 20:01:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id DDB8818076D for ; Thu, 19 Apr 2018 20:01:32 +0200 (CEST) Received: (qmail 41917 invoked by uid 500); 19 Apr 2018 18:01:32 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 41887 invoked by uid 99); 19 Apr 2018 18:01:31 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Apr 2018 18:01:31 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 342CA82FA0; Thu, 19 Apr 2018 18:01:31 +0000 (UTC) Date: Thu, 19 Apr 2018 18:01:31 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/01: GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: udo@apache.org In-Reply-To: <152416089026.11698.7861357638675342594@gitbox.apache.org> References: <152416089026.11698.7861357638675342594@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/feature/GEODE-925 X-Git-Reftype: branch X-Git-Rev: 89181954a39951bf29b5729535103a5fa4f1ea52 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180419180131.342CA82FA0@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch feature/GEODE-925 in repository https://gitbox.apache.org/repos/asf/geode.git commit 89181954a39951bf29b5729535103a5fa4f1ea52 Author: Udo Kohlmeyer AuthorDate: Thu Apr 19 10:55:46 2018 -0700 GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up --- .../dunit/QueryDataInconsistencyDUnitTest.java | 648 +++++++++------------ 1 file changed, 286 insertions(+), 362 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java index 20d3a78..cc7c336 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java @@ -14,10 +14,16 @@ */ package org.apache.geode.cache.query.dunit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.Logger; +import org.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -40,15 +46,13 @@ import org.apache.geode.cache.query.internal.QueryObserverHolder; import org.apache.geode.cache.query.internal.index.IndexManager; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.internal.cache.execute.PRClientServerTestBase; +import org.apache.geode.internal.logging.LogService; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.Invoke; -import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; @@ -62,45 +66,34 @@ import org.apache.geode.test.junit.categories.OQLIndexTest; public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { private static final int cnt = 0; - private static final int cntDest = 10; - static VM server = null; - static VM client = null; - static Cache cache = null; - - static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default - // name - static String repRegionName = "TestRepRegion"; // default name - - static Integer serverPort1 = null; - - public static int numOfBuckets = 20; - + private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name + private static String repRegionName = "TestRepRegion"; // default name public static String[] queries = new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",}; + private static volatile boolean hooked = false; - public static String[] queriesForRR = - new String[] {" select * from /" + repRegionName + " where ID=1"}; - - public static volatile boolean hooked = false; + private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName()); public QueryDataInconsistencyDUnitTest() { super(); } @Override - public final void postTearDownCacheTestCase() throws Exception { + public final void postTearDownCacheTestCase() { Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS); Invoke.invokeInEveryVM(QueryObserverHolder::reset); } @Override - public final void postSetUp() throws Exception { + public final void postSetUp() { Host host = Host.getHost(0); server = host.getVM(0); + Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS); + Awaitility.waitAtMost(30, TimeUnit.SECONDS); } @Test @@ -109,99 +102,79 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { Properties props = new Properties(); server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - server.invoke(new CacheSerializableRunnable("create indexes") { + server.invoke("create indexes", () -> { + cache = CacheFactory.getAnyInstance(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - - // Create common Portflios and NewPortfolios - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + // Create common Portflios and NewPortfolios + for (int j = cnt; j < cntDest; j++) { + region.put(new Integer(j), new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName); - assertEquals(10, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + try { + Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName); + assertEquals(10, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex // first before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - repRegion.put(new Integer("1"), new Portfolio(cntDest + 1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - server.invoke(new CacheSerializableRunnable("query on server") { - - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - while (!hooked) { - Wait.pause(100); - } - Object rs = null; - try { - rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - IndexManager.testHook = null; - } - assertTrue(rs instanceof SelectResults); - assertEquals(1, ((SelectResults) rs).size()); - Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0); - if (p1.getID() != 1) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - hooked = false;// Let client put go further. + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + repRegion.put(new Integer("1"), new Portfolio(cntDest + 1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); + server.invoke("query on server", () -> { + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + Awaitility.await().until(() -> hooked); + Object rs = null; + try { + rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); + } catch (Exception e) { + e.printStackTrace(); + fail("Query execution failed on server."); + IndexManager.testHook = null; + } + assertTrue(rs instanceof SelectResults); + assertEquals(1, ((SelectResults) rs).size()); + Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0); + if (p1.getID() != 1) { + fail("Query thread did not verify index results even when RE is under update"); + IndexManager.testHook = null; } + hooked = false;// Let client put go further. }); // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex. - server.invoke(new CacheSerializableRunnable("query on server") { - - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - while (!hooked) { - Wait.pause(100); - } - Object rs = null; - try { - rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server." + e.getMessage()); - } finally { + server.invoke("query on server", () -> { + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + Awaitility.await().until(() -> hooked); + Object rs = null; + try { + rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); + } catch (Exception e) { + e.printStackTrace(); + fail("Query execution failed on server." + e.getMessage()); + } finally { + IndexManager.testHook = null; + } + assertTrue(rs instanceof SelectResults); + if (((SelectResults) rs).size() > 0) { + Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next(); + if (p1.getID() != 1) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); IndexManager.testHook = null; } - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next(); - if (p1.getID() != 1) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - IndexManager.testHook = null; - } - } - hooked = false;// Let client put go further. } + hooked = false;// Let client put go further. }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); } @Test @@ -210,321 +183,277 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { Properties props = new Properties(); server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - Portfolio p = new Portfolio(j); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p); - region.put(new Integer(j), p); - } + server.invoke("create indexes", () -> { + cache = CacheFactory.getAnyInstance(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + Portfolio p = new Portfolio(j); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p); + region.put(j, p); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", - "/" + repRegionName + " p, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + try { + Index index = qs.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex // first before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - Portfolio newPort = new Portfolio(cntDest + 1); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort); - repRegion.put(new Integer("1"), newPort); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + Portfolio newPort = new Portfolio(cntDest + 1); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort); + repRegion.put(new Integer("1"), newPort); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object rs = qs.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); + assertTrue(rs instanceof SelectResults); + pos1 = (Position) ((SelectResults) rs).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. - } - while (!hooked) { - Wait.pause(100); } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Query execution failed on server.", e); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + try { + Object rs = qs.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); + assertTrue(rs instanceof SelectResults); + if (((SelectResults) rs).size() > 0) { + Position pos2 = (Position) ((SelectResults) rs).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - hooked = false;// Let client put go further. - IndexManager.testHook = null; } + } catch (Exception e) { + e.printStackTrace(); + fail("Query execution failed on server."); + } finally { + hooked = false;// Let client put go further. + IndexManager.testHook = null; } }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); } @Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts @Test - public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling + public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() { // TODO: fix misspelling // Create caches Properties props = new Properties(); server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + server.invoke("create indexes", () -> { + cache = CacheFactory.getAnyInstance(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + region.put(j, new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + try { + Index index = qs.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex // first before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - // This portfolio with same ID must have different positions. - repRegion.put(new Integer("1"), new Portfolio(1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + // This portfolio with same ID must have different positions. + repRegion.put(new Integer("1"), new Portfolio(1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object rs = qs.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); + assertTrue(rs instanceof SelectResults); + pos1 = (Position) ((SelectResults) rs).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. } - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery("select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + try { + Object rs = qs.newQuery("select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + assertTrue(rs instanceof SelectResults); + if (((SelectResults) rs).size() > 0) { + Position pos2 = (Position) ((SelectResults) rs).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - hooked = false;// Let client put go further. - IndexManager.testHook = null; } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + } finally { + hooked = false;// Let client put go further. + IndexManager.testHook = null; } }); - ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200 - // millis + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200 + // millis } @Test - public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() { + public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() { // Create caches Properties props = new Properties(); server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + server.invoke("create indexes", () -> { + cache = CacheFactory.getAnyInstance(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + region.put(new Integer(j), new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", - "/" + repRegionName + " p, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + try { + Index index = qs.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex // first before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - // This portfolio with same ID must have different positions. - repRegion.put(new Integer("1"), new Portfolio(1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + // This portfolio with same ID must have different positions. + repRegion.put(new Integer("1"), new Portfolio(1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs - .newQuery(" select pos from /" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") - .execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + QueryService qs = CacheFactory.getAnyInstance().getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object rs = qs + .newQuery(" select pos from /" + repRegionName + + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") + .execute(); + CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); + assertTrue(rs instanceof SelectResults); + pos1 = (Position) ((SelectResults) rs).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. } - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs - .newQuery("select pos from /" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") - .execute(); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Query execution failed on server.", e); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + + try { + Object rs = qs + .newQuery("select pos from /" + repRegionName + + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") + .execute(); + assertTrue(rs instanceof SelectResults); + if (((SelectResults) rs).size() > 0) { + Position pos2 = (Position) ((SelectResults) rs).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - IndexManager.testHook = null; - hooked = false;// Let client put go further. } + } catch (Exception e) { + e.printStackTrace(); + fail("Query execution failed on server."); + } finally { + IndexManager.testHook = null; + hooked = false;// Let client put go further. } }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); } - public static void createProxyRegions() { - new QueryDataInconsistencyDUnitTest().createProxyRegs(); + private Callable joinThread(AsyncInvocation thread) { + return () -> { + try { + thread.join(100L); + } catch (InterruptedException e) { + return false; + } + if (thread.isAlive()) { + return false; + } + return true; + }; } private void createProxyRegs() { ClientCache cache = (ClientCache) CacheFactory.getAnyInstance(); cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName); - - /* - * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1); - */ - } - - public static void createNewPR() { - new QueryDataInconsistencyDUnitTest().createPR(); } public void createPR() { PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver(); cache = CacheFactory.getAnyInstance(); + int numOfBuckets = 20; cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets) .setPartitionResolver(testKeyBasedResolver).create()) @@ -552,9 +481,10 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { public void run2() throws CacheException { Cache cache = CacheFactory.getAnyInstance(); Region region = cache.getRegion(repRegionName); - for (int j = from; j < to; j++) + for (int j = from; j < to; j++) { region.put(new Integer(j), portfolio[j]); - LogWriterUtils.getLogWriter().info( + } + logger.info( "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region " + regionName); } @@ -568,21 +498,15 @@ public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { switch (spot) { case 9: // Before Index update and after region entry lock. hooked = true; - LogWriterUtils.getLogWriter() + logger .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry."); - while (hooked) { - Wait.pause(100); - } - assertEquals(hooked, false); + Awaitility.await().until(() -> !hooked); break; case 10: // Before Region update and after Index Remove call. hooked = true; - LogWriterUtils.getLogWriter() + logger .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry."); - while (hooked) { - Wait.pause(100); - } - assertEquals(hooked, false); + Awaitility.await().until(() -> !hooked); break; default: break; -- To stop receiving notification emails like this one, please contact udo@apache.org.