From commits-return-26789-archive-asf-public=cust-asf.ponee.io@geode.apache.org Wed Apr 25 00:54:53 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id CC8A5180679 for ; Wed, 25 Apr 2018 00:54:51 +0200 (CEST) Received: (qmail 42563 invoked by uid 500); 24 Apr 2018 22:54:50 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 42554 invoked by uid 99); 24 Apr 2018 22:54:50 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Apr 2018 22:54:50 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2582E80AA5; Tue, 24 Apr 2018 22:54:50 +0000 (UTC) Date: Tue, 24 Apr 2018 22:54:50 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/02: Revert "Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"" MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: udo@apache.org In-Reply-To: <152461048934.5845.2555596044854177830@gitbox.apache.org> References: <152461048934.5845.2555596044854177830@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Rev: e80d3f4038485b7d71265690376b8c480bd8a515 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180424225450.2582E80AA5@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git commit e80d3f4038485b7d71265690376b8c480bd8a515 Author: Udo Kohlmeyer AuthorDate: Tue Apr 24 15:52:47 2018 -0700 Revert "Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"" This reverts commit 7c4c4e57cefcf49e2ea2a69250176c63ffcffc44. --- .../dunit/QueryDataInconsistencyDUnitTest.java | 745 +++++++++------------ 1 file changed, 324 insertions(+), 421 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java index 20d3a78..3e6c192 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java @@ -14,10 +14,23 @@ */ package org.apache.geode.cache.query.dunit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.fail; +import java.io.Serializable; import java.util.Properties; - +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.Logger; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -39,18 +52,20 @@ import org.apache.geode.cache.query.data.Position; import org.apache.geode.cache.query.internal.QueryObserverHolder; import org.apache.geode.cache.query.internal.index.IndexManager; import org.apache.geode.cache30.CacheSerializableRunnable; +import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.cache.execute.PRClientServerTestBase; +import org.apache.geode.internal.logging.LogService; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.Invoke; -import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnable; -import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.CleanupDUnitVMsRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.categories.FlakyTest; import org.apache.geode.test.junit.categories.OQLIndexTest; @@ -59,530 +74,418 @@ import org.apache.geode.test.junit.categories.OQLIndexTest; * This tests the data inconsistency during update on an index and querying the same UNLOCKED index. */ @Category({DistributedTest.class, OQLIndexTest.class}) -public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase { +public class QueryDataInconsistencyDUnitTest implements Serializable { private static final int cnt = 0; - private static final int cntDest = 10; + private static VM server = null; + private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name + private static String repRegionName = "TestRepRegion"; // default name + private static volatile boolean hooked = false; - static VM server = null; - - static VM client = null; - - static Cache cache = null; - - static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default - // name - static String repRegionName = "TestRepRegion"; // default name - - static Integer serverPort1 = null; - - public static int numOfBuckets = 20; - - public static String[] queries = - new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",}; + private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName()); - public static String[] queriesForRR = - new String[] {" select * from /" + repRegionName + " where ID=1"}; + @ClassRule + public static DistributedTestRule distributedTestRule = new DistributedTestRule(1); - public static volatile boolean hooked = false; + @Rule + public CacheRule cacheRule = CacheRule.builder().createCacheInAll().disconnectAfter().build(); - public QueryDataInconsistencyDUnitTest() { - super(); + @Before + public void initialize() + { + server = Host.getHost(0).getVM(0); } - @Override - public final void postTearDownCacheTestCase() throws Exception { - Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS); + @After + public final void postTearDownCacheTestCase() { Invoke.invokeInEveryVM(QueryObserverHolder::reset); } - @Override - public final void postSetUp() throws Exception { - Host host = Host.getHost(0); - server = host.getVM(0); + @BeforeClass + public static final void postSetUp() { + Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS); + Awaitility.waitAtMost(30, TimeUnit.SECONDS); } @Test public void testCompactRangeIndex() { // Create caches - Properties props = new Properties(); - server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); + server.invoke("create indexes", () -> { + Cache cache = cacheRule.getCache(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - - // Create common Portflios and NewPortfolios - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + // Create common Portflios and NewPortfolios + for (int j = cnt; j < cntDest; j++) { + region.put(new Integer(j), new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName); - assertEquals(10, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService queryService = cache.getQueryService(); + try { + Index index = queryService.createIndex("idIndex", "ID", "/" + repRegionName); + assertEquals(10, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + logger.error(e); + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex - // first before updating the RegionEntry and second after updating + // firesultSett before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - repRegion.put(new Integer("1"), new Portfolio(cntDest + 1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - server.invoke(new CacheSerializableRunnable("query on server") { - - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - while (!hooked) { - Wait.pause(100); - } - Object rs = null; - try { - rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - IndexManager.testHook = null; - } - assertTrue(rs instanceof SelectResults); - assertEquals(1, ((SelectResults) rs).size()); - Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0); - if (p1.getID() != 1) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - hooked = false;// Let client put go further. + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = cacheRule.getCache().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + repRegion.put(new Integer("1"), new Portfolio(cntDest + 1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); + server.invoke("query on server", () -> { + QueryService queryService = cacheRule.getCache().getQueryService(); + Awaitility.await().until(() -> hooked); + Object resultSet = null; + try { + resultSet = + queryService.newQuery(" select * from /" + repRegionName + " where ID = 1") + .execute(); + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + IndexManager.testHook = null; } + assertTrue(resultSet instanceof SelectResults); + assertEquals(1, ((SelectResults) resultSet).size()); + Portfolio p1 = (Portfolio) ((SelectResults) resultSet).asList().get(0); + if (p1.getID() != 1) { + fail("Query thread did not verify index results even when RE is under update"); + IndexManager.testHook = null; + } + hooked = false;// Let client put go further. }); // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex. - server.invoke(new CacheSerializableRunnable("query on server") { - - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - while (!hooked) { - Wait.pause(100); - } - Object rs = null; - try { - rs = qs.newQuery(" select * from /" + repRegionName + " where ID = 1").execute(); - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server." + e.getMessage()); - } finally { + server.invoke("query on server", () -> { + QueryService queryService = cacheRule.getCache().getQueryService(); + Awaitility.await().until(() -> hooked); + Object resultSet = null; + try { + resultSet = + queryService.newQuery(" select * from /" + repRegionName + " where ID = 1") + .execute(); + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server." + e.getMessage()); + } finally { + IndexManager.testHook = null; + } + assertTrue(resultSet instanceof SelectResults); + if (((SelectResults) resultSet).size() > 0) { + Portfolio p1 = (Portfolio) ((SelectResults) resultSet).iterator().next(); + if (p1.getID() != 1) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); IndexManager.testHook = null; } - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next(); - if (p1.getID() != 1) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - IndexManager.testHook = null; - } - } - hooked = false;// Let client put go further. } + hooked = false;// Let client put go further. }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); } @Test public void testRangeIndex() { // Create caches - Properties props = new Properties(); - server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - Portfolio p = new Portfolio(j); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p); - region.put(new Integer(j), p); - } + server.invoke("create indexes", () -> { + Cache cache = cacheRule.getCache(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + Portfolio p = new Portfolio(j); + cache.getLogger().fine("Shobhit: portfolio " + j + " : " + p); + region.put(j, p); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", - "/" + repRegionName + " p, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService queryService = cache.getQueryService(); + try { + Index index = queryService.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + logger.error(e); + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex - // first before updating the RegionEntry and second after updating + // firesultSett before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - Portfolio newPort = new Portfolio(cntDest + 1); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort); - repRegion.put(new Integer("1"), newPort); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Cache cache = cacheRule.getCache(); + Region repRegion = cache.getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + Portfolio newPort = new Portfolio(cntDest + 1); + cache.getLogger().fine("Shobhit: New Portfolio" + newPort); + repRegion.put(new Integer("1"), newPort); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + Cache cache = cacheRule.getCache(); + QueryService queryService = cache.getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object resultSet = queryService.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + cache.getLogger().fine("Shobhit: " + resultSet); + assertTrue(resultSet instanceof SelectResults); + pos1 = (Position) ((SelectResults) resultSet).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. } - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server.", e); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + try { + Object resultSet = queryService.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + cache.getLogger().fine("Shobhit: " + resultSet); + assertTrue(resultSet instanceof SelectResults); + if (((SelectResults) resultSet).size() > 0) { + Position pos2 = (Position) ((SelectResults) resultSet).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - hooked = false;// Let client put go further. - IndexManager.testHook = null; } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + } finally { + hooked = false;// Let client put go further. + IndexManager.testHook = null; } }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); } @Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts @Test - public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling + public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() { // Create caches - Properties props = new Properties(); - server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + server.invoke("create indexes", () -> { + Cache cache = cacheRule.getCache(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + region.put(j, new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService queryService = cache.getQueryService(); + try { + Index index = queryService.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + logger.error(e); + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex - // first before updating the RegionEntry and second after updating + // firesultSett before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - // This portfolio with same ID must have different positions. - repRegion.put(new Integer("1"), new Portfolio(1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Region repRegion = cacheRule.getCache().getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + // This portfolio with same ID must have different positions. + repRegion.put(new Integer("1"), new Portfolio(1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs.newQuery(" select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + Cache cache = cacheRule.getCache(); + QueryService queryService = cache.getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object resultSet = queryService.newQuery(" select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + cache.getLogger().fine("Shobhit: " + resultSet); + assertTrue(resultSet instanceof SelectResults); + pos1 = (Position) ((SelectResults) resultSet).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. - } - while (!hooked) { - Wait.pause(100); } - try { - Object rs = qs.newQuery("select pos from /" + repRegionName - + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + try { + Object resultSet = queryService.newQuery("select pos from /" + repRegionName + + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute(); + assertTrue(resultSet instanceof SelectResults); + if (((SelectResults) resultSet).size() > 0) { + Position pos2 = (Position) ((SelectResults) resultSet).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - hooked = false;// Let client put go further. - IndexManager.testHook = null; } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + } finally { + hooked = false;// Let client put go further. + IndexManager.testHook = null; } }); - ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200 - // millis + Awaitility.await().until(joinThread(putThread)); + // ThreadUtils.join(putThread, 200); // GEODE-925 occuresultSet here and this is very short join 200 + // millis } @Test - public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() { + public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() { // Create caches - Properties props = new Properties(); - server.invoke(() -> PRClientServerTestBase.createCacheInVm(props)); - - server.invoke(new CacheSerializableRunnable("create indexes") { - - @Override - public void run2() throws CacheException { - cache = CacheFactory.getAnyInstance(); - Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); - IndexManager.testHook = null; - // Create common Portfolios and NewPortfolios - Position.cnt = 0; - for (int j = cnt; j < cntDest; j++) { - region.put(new Integer(j), new Portfolio(j)); - } + server.invoke("create indexes", () -> { + Cache cache = cacheRule.getCache(); + Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName); + IndexManager.testHook = null; + // Create common Portfolios and NewPortfolios + Position.cnt = 0; + for (int j = cnt; j < cntDest; j++) { + region.put(new Integer(j), new Portfolio(j)); + } - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - try { - Index index = qs.createIndex("posIndex", "pos.secId", - "/" + repRegionName + " p, p.positions.values pos"); - assertEquals(12, index.getStatistics().getNumberOfKeys()); - } catch (Exception e) { - fail("Index creation failed"); - } + QueryService queryService = cache.getQueryService(); + try { + Index index = queryService.createIndex("posIndex", "pos.secId", + "/" + repRegionName + " p, p.positions.values pos"); + assertEquals(12, index.getStatistics().getNumberOfKeys()); + } catch (Exception e) { + logger.error(e); + fail("Index creation failed"); } }); // Invoke update from client and stop in updateIndex - // first before updating the RegionEntry and second after updating + // firesultSett before updating the RegionEntry and second after updating // the RegionEntry. - AsyncInvocation putThread = - server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") { - - @Override - public void run2() throws CacheException { - Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName); - IndexManager.testHook = new IndexManagerTestHook(); - // This portfolio with same ID must have different positions. - repRegion.put(new Integer("1"), new Portfolio(1)); - // above call must be hooked in BEFORE_UPDATE_OP call. - } - }); - - server.invoke(new CacheSerializableRunnable("query on server") { + AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> { + Cache cache = cacheRule.getCache(); + Region repRegion = cache.getRegion(repRegionName); + IndexManager.testHook = new IndexManagerTestHook(); + // This portfolio with same ID must have different positions. + repRegion.put(new Integer("1"), new Portfolio(1)); + // above call must be hooked in BEFORE_UPDATE_OP call. + }); - @Override - public void run2() throws CacheException { - QueryService qs = CacheFactory.getAnyInstance().getQueryService(); - Position pos1 = null; - while (!hooked) { - Wait.pause(100); - } - try { - Object rs = qs - .newQuery(" select pos from /" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") - .execute(); - CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs); - assertTrue(rs instanceof SelectResults); - pos1 = (Position) ((SelectResults) rs).iterator().next(); - if (!pos1.secId.equals("APPL")) { - fail("Query thread did not verify index results even when RE is under update"); - IndexManager.testHook = null; - } - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Query execution failed on server.", e); + server.invoke("query on server", () -> { + Cache cache = cacheRule.getCache(); + QueryService queryService = cache.getQueryService(); + Position pos1 = null; + Awaitility.await().until(() -> hooked); + try { + Object resultSet = queryService + .newQuery(" select pos from /" + repRegionName + + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") + .execute(); + cache.getLogger().fine("Shobhit: " + resultSet); + assertTrue(resultSet instanceof SelectResults); + pos1 = (Position) ((SelectResults) resultSet).iterator().next(); + if (!pos1.secId.equals("APPL")) { + fail("Query thread did not verify index results even when RE is under update"); IndexManager.testHook = null; - } finally { - hooked = false;// Let client put go further. - } - while (!hooked) { - Wait.pause(100); } - try { - Object rs = qs - .newQuery("select pos from /" + repRegionName - + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") - .execute(); - assertTrue(rs instanceof SelectResults); - if (((SelectResults) rs).size() > 0) { - Position pos2 = (Position) ((SelectResults) rs).iterator().next(); - if (pos2.equals(pos1)) { - fail("Query thread did not verify index results even when RE is under update and " - + "RegionEntry value has been modified before releasing the lock"); - } + } catch (Exception e) { + logger.error(e); + Assertions.fail("Query execution failed on server.", e); + IndexManager.testHook = null; + } finally { + hooked = false;// Let client put go further. + } + Awaitility.await().until(() -> hooked); + + try { + Object resultSet = queryService + .newQuery("select pos from /" + repRegionName + + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1") + .execute(); + assertTrue(resultSet instanceof SelectResults); + if (((SelectResults) resultSet).size() > 0) { + Position pos2 = (Position) ((SelectResults) resultSet).iterator().next(); + if (pos2.equals(pos1)) { + fail("Query thread did not verify index results even when RE is under update and " + + "RegionEntry value has been modified before releasing the lock"); } - } catch (Exception e) { - e.printStackTrace(); - fail("Query execution failed on server."); - } finally { - IndexManager.testHook = null; - hooked = false;// Let client put go further. } + } catch (Exception e) { + logger.error(e); + fail("Query execution failed on server."); + } finally { + IndexManager.testHook = null; + hooked = false;// Let client put go further. } }); - ThreadUtils.join(putThread, 200); + Awaitility.await().until(joinThread(putThread)); } - public static void createProxyRegions() { - new QueryDataInconsistencyDUnitTest().createProxyRegs(); + private Callable joinThread(AsyncInvocation thread) { + return () -> { + try { + thread.join(100L); + } catch (InterruptedException e) { + return false; + } + if (thread.isAlive()) { + return false; + } + return true; + }; } private void createProxyRegs() { - ClientCache cache = (ClientCache) CacheFactory.getAnyInstance(); + ClientCache cache = (ClientCache) cacheRule.getCache(); cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName); - - /* - * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1); - */ - } - - public static void createNewPR() { - new QueryDataInconsistencyDUnitTest().createPR(); } public void createPR() { PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver(); - cache = CacheFactory.getAnyInstance(); + Cache cache = cacheRule.getCache(); + int numOfBuckets = 20; cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets) .setPartitionResolver(testKeyBasedResolver).create()) .create(PartitionedRegionName1); } - public static void createCacheClientWithoutRegion(String host, Integer port1) { - new QueryDataInconsistencyDUnitTest().createCacheClientWithoutReg(host, port1); - } - - private void createCacheClientWithoutReg(String host, Integer port1) { - disconnectFromDS(); - new ClientCacheFactory().addPoolServer(host, port1).create(); - } - - /** - * This function puts portfolio objects into the created Region (PR or Local) * - * - * @return cacheSerializable object - */ - public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(final String regionName, - final Object[] portfolio, final int from, final int to) { - SerializableRunnable puts = new CacheSerializableRunnable("Region Puts") { - @Override - public void run2() throws CacheException { - Cache cache = CacheFactory.getAnyInstance(); - Region region = cache.getRegion(repRegionName); - for (int j = from; j < to; j++) - region.put(new Integer(j), portfolio[j]); - LogWriterUtils.getLogWriter().info( - "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region " - + regionName); - } - }; - return (CacheSerializableRunnable) puts; - } - public class IndexManagerTestHook implements org.apache.geode.cache.query.internal.index.IndexManager.TestHook { public void hook(final int spot) throws RuntimeException { switch (spot) { case 9: // Before Index update and after region entry lock. hooked = true; - LogWriterUtils.getLogWriter() + logger .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry."); - while (hooked) { - Wait.pause(100); - } - assertEquals(hooked, false); + Awaitility.await().until(() -> !hooked); break; case 10: // Before Region update and after Index Remove call. hooked = true; - LogWriterUtils.getLogWriter() + logger .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry."); - while (hooked) { - Wait.pause(100); - } - assertEquals(hooked, false); + Awaitility.await().until(() -> !hooked); break; default: break; -- To stop receiving notification emails like this one, please contact udo@apache.org.