From commits-return-36278-archive-asf-public=cust-asf.ponee.io@geode.apache.org Mon Aug 3 18:08:15 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mailroute1-lw-us.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 883F0180642 for ; Mon, 3 Aug 2020 20:08:15 +0200 (CEST) Received: from mail.apache.org (localhost [127.0.0.1]) by mailroute1-lw-us.apache.org (ASF Mail Server at mailroute1-lw-us.apache.org) with SMTP id BE8FE123F20 for ; Mon, 3 Aug 2020 18:08:14 +0000 (UTC) Received: (qmail 55548 invoked by uid 500); 3 Aug 2020 18:08:14 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 55539 invoked by uid 99); 3 Aug 2020 18:08:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Aug 2020 18:08:14 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 541478242E; Mon, 3 Aug 2020 18:08:14 +0000 (UTC) Date: Mon, 03 Aug 2020 18:08:06 +0000 To: "commits@geode.apache.org" Subject: [geode] 04/17: GEODE-7912: cacheWriter should be triggered when PR.clear (#4882) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: jinmeiliao@apache.org In-Reply-To: <159647807941.26412.11938742912925088547@gitbox.apache.org> References: <159647807941.26412.11938742912925088547@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/feature/GEODE-7665 X-Git-Reftype: branch X-Git-Rev: 560e3a7950ceb413e3e9f894b7cf8d039c0bd66c X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20200803180814.541478242E@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git commit 560e3a7950ceb413e3e9f894b7cf8d039c0bd66c Author: Xiaojian Zhou AuthorDate: Mon Mar 30 19:34:35 2020 -0700 GEODE-7912: cacheWriter should be triggered when PR.clear (#4882) Co-authored-by: Anil Co-authored-by: Xiaojian Zhou --- .../cache/PartitionedRegionClearDUnitTest.java | 228 +++++++++++++++++---- .../apache/geode/internal/cache/LocalRegion.java | 4 +- .../geode/internal/cache/PartitionedRegion.java | 56 +++-- 3 files changed, 223 insertions(+), 65 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java index fb2a81b..a5a22b9 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java @@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCach import static org.assertj.core.api.Assertions.assertThat; import java.io.Serializable; +import java.util.HashMap; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -30,13 +31,15 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.InterestResultPolicy; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.cache.util.CacheWriterAdapter; import org.apache.geode.test.dunit.SerializableCallableIF; import org.apache.geode.test.dunit.rules.ClientVM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -68,12 +71,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable { c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); client2 = cluster.startClientVM(6, c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); - dataStore1.invoke(this::initDataStore); - dataStore2.invoke(this::initDataStore); - dataStore3.invoke(this::initDataStore); - accessor.invoke(this::initAccessor); - client1.invoke(this::initClientCache); - client2.invoke(this::initClientCache); } protected RegionShortcut getRegionShortCut() { @@ -104,14 +101,18 @@ public class PartitionedRegionClearDUnitTest implements Serializable { region.registerInterestForAllKeys(InterestResultPolicy.KEYS); } - private void initDataStore() { - getCache().createRegionFactory(getRegionShortCut()) - .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) - .addCacheListener(new CountingCacheListener()) - .create(REGION_NAME); + private void initDataStore(boolean withWriter) { + RegionFactory factory = getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()); + if (withWriter) { + factory.setCacheWriter(new CountingCacheWriter()); + } + factory.create(REGION_NAME); + clearsByRegion = new HashMap<>(); + destroysByRegion = new HashMap<>(); } - private void initAccessor() { + private void initAccessor(boolean withWriter) { RegionShortcut shortcut = getRegionShortCut(); if (shortcut.isPersistent()) { if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { @@ -126,12 +127,16 @@ public class PartitionedRegionClearDUnitTest implements Serializable { fail("Wrong region type:" + shortcut); } } - getCache().createRegionFactory(shortcut) + RegionFactory factory = getCache().createRegionFactory(shortcut) .setPartitionAttributes( new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) - .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) - .addCacheListener(new CountingCacheListener()) - .create(REGION_NAME); + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()); + if (withWriter) { + factory.setCacheWriter(new CountingCacheWriter()); + } + factory.create(REGION_NAME); + clearsByRegion = new HashMap<>(); + destroysByRegion = new HashMap<>(); } private void feed(boolean isClient) { @@ -152,45 +157,148 @@ public class PartitionedRegionClearDUnitTest implements Serializable { // client2.invoke(()->verifyRegionSize(true, expectedNum)); } - private void verifyCacheListenerTriggerCount(MemberVM serverVM) { - SerializableCallableIF getListenerTriggerCount = () -> { - CountingCacheListener countingCacheListener = - (CountingCacheListener) getRegion(false).getAttributes() - .getCacheListeners()[0]; - return countingCacheListener.getClears(); - }; + SerializableCallableIF getWriterClears = () -> { + int clears = + clearsByRegion.get(REGION_NAME) == null ? 0 : clearsByRegion.get(REGION_NAME).get(); + return clears; + }; - int count = accessor.invoke(getListenerTriggerCount) - + dataStore1.invoke(getListenerTriggerCount) - + dataStore2.invoke(getListenerTriggerCount) - + dataStore3.invoke(getListenerTriggerCount); - assertThat(count).isEqualTo(1); + SerializableCallableIF getWriterDestroys = () -> { + int destroys = + destroysByRegion.get(REGION_NAME) == null ? 0 : destroysByRegion.get(REGION_NAME).get(); + return destroys; + }; - if (serverVM != null) { - assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); - } + void configureServers(boolean dataStoreWithWriter, boolean accessorWithWriter) { + dataStore1.invoke(() -> initDataStore(dataStoreWithWriter)); + dataStore2.invoke(() -> initDataStore(dataStoreWithWriter)); + dataStore3.invoke(() -> initDataStore(dataStoreWithWriter)); + accessor.invoke(() -> initAccessor(accessorWithWriter)); + // make sure only datastore3 has cacheWriter + dataStore1.invoke(() -> { + Region region = getRegion(false); + region.getAttributesMutator().setCacheWriter(null); + }); + dataStore2.invoke(() -> { + Region region = getRegion(false); + region.getAttributesMutator().setCacheWriter(null); + }); + } + + @Test + public void normalClearFromDataStoreWithWriterOnDataStore() { + configureServers(true, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore3.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + + // do the region destroy to compare that the same callbacks will be triggered + dataStore3.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } @Test - public void normalClearFromDataStore() { + public void normalClearFromDataStoreWithoutWriterOnDataStore() { + configureServers(false, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + accessor.invoke(() -> feed(false)); verifyServerRegionSize(NUM_ENTRIES); dataStore1.invoke(() -> getRegion(false).clear()); verifyServerRegionSize(0); - verifyCacheListenerTriggerCount(dataStore1); + + // do the region destroy to compare that the same callbacks will be triggered + dataStore1.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(1); } @Test - public void normalClearFromAccessor() { + public void normalClearFromAccessorWithWriterOnDataStore() { + configureServers(true, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + accessor.invoke(() -> feed(false)); verifyServerRegionSize(NUM_ENTRIES); accessor.invoke(() -> getRegion(false).clear()); verifyServerRegionSize(0); - verifyCacheListenerTriggerCount(accessor); + + // do the region destroy to compare that the same callbacks will be triggered + accessor.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(1); + } + + @Test + public void normalClearFromAccessorWithoutWriterButWithWriterOnDataStore() { + configureServers(true, false); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + + // do the region destroy to compare that the same callbacks will be triggered + accessor.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } @Test public void normalClearFromClient() { + configureServers(true, false); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + client1.invoke(() -> feed(true)); verifyClientRegionSize(NUM_ENTRIES); verifyServerRegionSize(NUM_ENTRIES); @@ -198,21 +306,53 @@ public class PartitionedRegionClearDUnitTest implements Serializable { client1.invoke(() -> getRegion(true).clear()); verifyServerRegionSize(0); verifyClientRegionSize(0); - verifyCacheListenerTriggerCount(null); + + // do the region destroy to compare that the same callbacks will be triggered + client1.invoke(() -> { + Region region = getRegion(true); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } - private static class CountingCacheListener extends CacheListenerAdapter { - private final AtomicInteger clears = new AtomicInteger(); + public static HashMap clearsByRegion = new HashMap<>(); + public static HashMap destroysByRegion = new HashMap<>(); + private static class CountingCacheWriter extends CacheWriterAdapter { @Override - public void afterRegionClear(RegionEvent event) { + public void beforeRegionClear(RegionEvent event) throws CacheWriterException { Region region = event.getRegion(); - logger.info("Region " + region.getFullPath() + " is cleared."); - clears.incrementAndGet(); + AtomicInteger clears = clearsByRegion.get(region.getName()); + if (clears == null) { + clears = new AtomicInteger(1); + clearsByRegion.put(region.getName(), clears); + } else { + clears.incrementAndGet(); + } + logger + .info("Region " + region.getName() + " will be cleared, clear count is:" + clears.get()); } - int getClears() { - return clears.get(); + @Override + public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException { + Region region = event.getRegion(); + AtomicInteger destroys = destroysByRegion.get(region.getName()); + if (destroys == null) { + destroys = new AtomicInteger(1); + destroysByRegion.put(region.getName(), destroys); + } else { + destroys.incrementAndGet(); + } + logger.info( + "Region " + region.getName() + " will be destroyed, destroy count is:" + destroys.get()); } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index a27e058..3580f72 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -3000,7 +3000,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, /** * @since GemFire 5.7 */ - private void serverRegionClear(RegionEventImpl regionEvent) { + protected void serverRegionClear(RegionEventImpl regionEvent) { if (regionEvent.getOperation().isDistributed()) { ServerRegionProxy mySRP = getServerProxy(); if (mySRP != null) { @@ -3121,7 +3121,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return result; } - private void cacheWriteBeforeRegionClear(RegionEventImpl event) + void cacheWriteBeforeRegionClear(RegionEventImpl event) throws CacheWriterException, TimeoutException { // copy into local var to prevent race condition CacheWriter writer = basicGetWriter(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index fb312a2..2e17e12 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -2188,6 +2188,9 @@ public class PartitionedRegion extends LocalRegion throw cache.getCacheClosedException("Cache is shutting down"); } + // do cacheWrite + cacheWriteBeforeRegionClear(regionEvent); + // create ClearPRMessage per bucket List clearMsgList = createClearPRMessages(regionEvent.getEventId()); for (ClearPRMessage clearPRMessage : clearMsgList) { @@ -4455,6 +4458,26 @@ public class PartitionedRegion extends LocalRegion return null; } + boolean triggerWriter(RegionEventImpl event, SearchLoadAndWriteProcessor processor, int paction, + String theKey) { + CacheWriter localWriter = basicGetWriter(); + Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null; + + if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) { + return false; + } + + final long start = getCachePerfStats().startCacheWriterCall(); + try { + processor.initialize(this, theKey, null); + processor.doNetWrite(event, netWriteRecipients, localWriter, paction); + processor.release(); + } finally { + getCachePerfStats().endCacheWriterCall(start); + } + return true; + } + /** * This invokes a cache writer before a destroy operation. Although it has the same method * signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion @@ -4464,31 +4487,26 @@ public class PartitionedRegion extends LocalRegion @Override boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event) throws CacheWriterException, TimeoutException { - if (event.getOperation().isDistributed()) { serverRegionDestroy(event); - CacheWriter localWriter = basicGetWriter(); - Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null; - - if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) { - return false; - } - - final long start = getCachePerfStats().startCacheWriterCall(); - try { - SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); - processor.initialize(this, "preDestroyRegion", null); - processor.doNetWrite(event, netWriteRecipients, localWriter, - SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY); - processor.release(); - } finally { - getCachePerfStats().endCacheWriterCall(start); - } - return true; + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + return triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY, + "preDestroyRegion"); } return false; } + @Override + void cacheWriteBeforeRegionClear(RegionEventImpl event) + throws CacheWriterException, TimeoutException { + if (event.getOperation().isDistributed()) { + serverRegionClear(event); + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONCLEAR, + "preClearRegion"); + } + } + /** * Test Method: Get the DistributedMember identifier for the vm containing a key *