From commits-return-26558-archive-asf-public=cust-asf.ponee.io@geode.apache.org Fri Apr 13 03:30:28 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 55BFA18067B for ; Fri, 13 Apr 2018 03:30:28 +0200 (CEST) Received: (qmail 59477 invoked by uid 500); 13 Apr 2018 01:30:27 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 59468 invoked by uid 99); 13 Apr 2018 01:30:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Apr 2018 01:30:27 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 76ECF85131; Fri, 13 Apr 2018 01:30:26 +0000 (UTC) Date: Fri, 13 Apr 2018 01:30:26 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/01: GEODE-5056: when found the dropped events at primary sender, send QueueRemovalMessage for it MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zhouxj@apache.org In-Reply-To: <152358302560.17508.6337831456467209204@gitbox.apache.org> References: <152358302560.17508.6337831456467209204@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/feature/GEODE-5056 X-Git-Reftype: branch X-Git-Rev: 2b31ba8e3c1fe08d33e7fa2361dbe6ce2fedd687 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180413013026.76ECF85131@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-5056 in repository https://gitbox.apache.org/repos/asf/geode.git commit 2b31ba8e3c1fe08d33e7fa2361dbe6ce2fedd687 Author: zhouxh AuthorDate: Thu Apr 12 17:44:16 2018 -0700 GEODE-5056: when found the dropped events at primary sender, send QueueRemovalMessage for it --- .../cache/wan/AbstractGatewaySenderEventProcessor.java | 2 +- .../cache/wan/parallel/ParallelGatewaySenderQueue.java | 17 ++++++++++++++++- .../ParallelGatewaySenderOperationsDUnitTest.java | 2 -- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index eea7480..34d511c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -304,7 +304,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); if (isPrimary) { - pgsq.addRemovedEvent(prQ, bucketId, shadowKey); + pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey); this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender(); if (logger.isDebugEnabled()) { logger.debug("register dropped event for primary queue. BucketId is " + bucketId diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 89880fc..cdb33ab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // This method may need synchronization in case it is used by // ConcurrentParallelGatewaySender - public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { + protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { StoppableReentrantLock lock = buckToDispatchLock; if (lock != null) { lock.lock(); @@ -1133,6 +1133,21 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } + public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId, + Object key) { + final HashMap> temp = new HashMap>(); + Map bucketIdToDispatchedKeys = new ConcurrentHashMap(); + temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys); + addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key); + Set recipients = + removalThread.getAllRecipients(sender.getCache(), temp); + if (!recipients.isEmpty()) { + ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp); + pqrm.setRecipients(recipients); + sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm); + } + } + private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, Object key) { List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId); if (dispatchedKeys == null) { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index a1808f5..f5b98b7 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -37,7 +37,6 @@ import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; import org.apache.geode.test.junit.categories.WanTest; /** @@ -377,7 +376,6 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * case in the way that when the sender is starting from stopped state, puts are simultaneously * happening on the region by another thread. */ - @Category({FlakyTest.class, WanTest.class}) // GEODE-5056 @Test public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { addIgnoredException("Broken pipe"); -- To stop receiving notification emails like this one, please contact zhouxj@apache.org.