Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 38266200B2F for ; Sun, 3 Jul 2016 19:52:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 36ACB160A6B; Sun, 3 Jul 2016 17:52:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5875F160A50 for ; Sun, 3 Jul 2016 19:52:53 +0200 (CEST) Received: (qmail 90667 invoked by uid 500); 3 Jul 2016 17:52:52 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 90658 invoked by uid 99); 3 Jul 2016 17:52:52 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jul 2016 17:52:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 230591A56D2 for ; Sun, 3 Jul 2016 17:52:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 5nmjFwdYj4AC for ; Sun, 3 Jul 2016 17:52:50 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 73A115F572 for ; Sun, 3 Jul 2016 17:52:49 +0000 (UTC) Received: (qmail 90582 invoked by uid 99); 3 Jul 2016 17:52:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jul 2016 17:52:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 393B1E0278; Sun, 3 Jul 2016 17:52:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jasonhuynh@apache.org To: commits@geode.incubator.apache.org Message-Id: <5d3535f2259a4c47b6c782aee11accdd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: Revert "GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages" Date: Sun, 3 Jul 2016 17:52:48 +0000 (UTC) archived-at: Sun, 03 Jul 2016 17:52:54 -0000 Repository: incubator-geode Updated Branches: refs/heads/develop ec3555f16 -> 96f3af9de Revert "GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages" This reverts commit 8899fc8d744bfd3060bafd17b1b33e02c7db9e5f. This checkin causes some instability with the wan dunit tests Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/96f3af9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/96f3af9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/96f3af9d Branch: refs/heads/develop Commit: 96f3af9dec1993cdd2d2109d6a711a74a56afcc3 Parents: ec3555f Author: Jason Huynh Authored: Sun Jul 3 10:52:28 2016 -0700 Committer: Jason Huynh Committed: Sun Jul 3 10:52:28 2016 -0700 ---------------------------------------------------------------------- .../AbstractGatewaySenderEventProcessor.java | 5 +-- ...rentParallelGatewaySenderEventProcessor.java | 8 ++--- ...urrentSerialGatewaySenderEventProcessor.java | 5 ++- .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++---------------- 4 files changed, 15 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index e3e1a9e..ce08e8d 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -1150,9 +1150,10 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } } } - - setIsStopped(true); + dispatcher.stop(); + //set isStopped to true + setIsStopped(true); if (this.isAlive()) { this.interrupt(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 82a53d3..07a3be5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -237,9 +237,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew if (!this.isAlive()) { return; } - - setIsStopped(true); - final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger); @@ -251,12 +248,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew return thread; } }; - + List stopperCallables = new ArrayList(); for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) { stopperCallables.add(new SenderStopperCallable(parallelProcessor)); } - + ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory); try { List> futures = stopperService.invokeAll(stopperCallables); @@ -278,6 +275,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew throw rejectedExecutionEx; } + setIsStopped(true); stopperService.shutdown(); closeProcessor(); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index a557ce1..ff810ec 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -268,8 +268,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends return; } - setIsStopped(true); - final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup .createThreadGroup( "ConcurrentSerialGatewaySenderEventProcessor Logger Group", @@ -314,7 +312,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends } //shutdown the stopperService. This will release all the stopper threads stopperService.shutdown(); - + setIsStopped(true); + closeProcessor(); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 3948484..b178192 100644 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -301,9 +301,6 @@ public class GatewaySenderEventRemoteDispatcher implements * @throws GatewaySenderException */ public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{ - if (this.processor.isStopped()) { - return null; - } // IF the connection is null // OR the connection's ServerLocation doesn't match with the one stored in sender // THEN initialize the connection @@ -346,7 +343,7 @@ public class GatewaySenderEventRemoteDispatcher implements if (con != null) { if (!con.isDestroyed()) { con.destroy(); - this.sender.getProxy().returnConnection(con); + this.sender.getProxy().returnConnection(con); } // Reset the connection so the next time through a new one will be @@ -367,9 +364,6 @@ public class GatewaySenderEventRemoteDispatcher implements */ private void initializeConnection() throws GatewaySenderException, GemFireSecurityException { - if (this.processor.isStopped()) { - return; - } this.connectionLifeCycleLock.writeLock().lock(); try { // Attempt to acquire a connection @@ -631,9 +625,9 @@ public class GatewaySenderEventRemoteDispatcher implements } } else { // If we have received IOException. - if (logger.isDebugEnabled()) { + // if (logger.isDebugEnabled()) { logger.debug("{}: Received null ack from remote site.", processor.getSender()); - } + //} processor.handleException(); try { // This wait is before trying to getting new connection to // receive ack. Without this there will be continuous call to @@ -729,11 +723,9 @@ public class GatewaySenderEventRemoteDispatcher implements // not. No need to take lock as the reader thread may be blocked and we might not // get chance to destroy unless that returns. if (connection != null) { - Connection conn = connection; - shutDownAckReaderConnection(); - if (!conn.isDestroyed()) { - conn.destroy(); - sender.getProxy().returnConnection(conn); + if (!connection.isDestroyed()) { + connection.destroy(); + sender.getProxy().returnConnection(connection); } } this.shutdown = true; @@ -751,24 +743,12 @@ public class GatewaySenderEventRemoteDispatcher implements logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION)); } } - - private void shutDownAckReaderConnection() { - Connection conn = connection; - //attempt to unblock the ackreader thread by shutting down the inputStream, if it was stuck on a read - try { - if (conn != null && conn.getSocket() != null) { - conn.getSocket().shutdownInput(); - } - } catch (IOException e) { - logger.warn("Unable to shutdown AckReaderThread Connection"); - } - } } public void stopAckReaderThread() { if (this.ackReaderThread != null) { this.ackReaderThread.shutdown(); - } + } } @Override