Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8170318D06 for ; Wed, 19 Aug 2015 07:40:55 +0000 (UTC) Received: (qmail 69847 invoked by uid 500); 19 Aug 2015 07:40:21 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 69817 invoked by uid 500); 19 Aug 2015 07:40:21 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 69808 invoked by uid 99); 19 Aug 2015 07:40:21 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 07:40:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id BCFD7DF0E7 for ; Wed, 19 Aug 2015 07:40:20 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.419 X-Spam-Level: * X-Spam-Status: No, score=1.419 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.381] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Q_J3bOdXnd0P for ; Wed, 19 Aug 2015 07:40:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 70B0F215D9 for ; Wed, 19 Aug 2015 07:39:56 +0000 (UTC) Received: (qmail 67545 invoked by uid 99); 19 Aug 2015 07:39:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 07:39:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 18121E0F7D; Wed, 19 Aug 2015 07:39:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 19 Aug 2015 07:40:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/50] incubator-ignite git commit: Fixed threads cleanup in continuous processor Fixed threads cleanup in continuous processor Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ba3abcec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ba3abcec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ba3abcec Branch: refs/heads/ignite-gg-9615-1 Commit: ba3abceca10a1745253a3c28e7a6fe6f5833d266 Parents: 6697b0c Author: Valentin Kulichenko Authored: Thu Aug 13 15:50:39 2015 -0700 Committer: Valentin Kulichenko Committed: Thu Aug 13 15:50:39 2015 -0700 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 16 +++++++++++----- .../GridCacheContinuousQueryAbstractSelfTest.java | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba3abcec/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 5f1c4bb..a360e35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.timeout.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -72,7 +71,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private final ConcurrentMap stopFuts = new ConcurrentHashMap8<>(); /** Threads started by this processor. */ - private final Collection threads = new GridConcurrentHashSet<>(); + private final Map bufCheckThreads = new ConcurrentHashMap8<>(); /** */ private final ConcurrentMap syncMsgFuts = new ConcurrentHashMap8<>(); @@ -311,8 +310,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.io().removeMessageListener(TOPIC_CONTINUOUS); - U.interrupt(threads); - U.joinThreads(threads, log); + for (IgniteThread thread : bufCheckThreads.values()) { + U.interrupt(thread); + U.join(thread); + } if (log.isDebugEnabled()) log.debug("Continuous processor stopped."); @@ -915,7 +916,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }); - threads.add(checker); + bufCheckThreads.put(routineId, checker); checker.start(); } @@ -947,6 +948,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.io().removeMessageListener(hnd.orderedTopic()); hnd.unregister(routineId, ctx); + + IgniteThread checker = bufCheckThreads.remove(routineId); + + if (checker != null) + checker.interrupt(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba3abcec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 4681071..7b628b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -177,6 +176,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size()); + assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "bufCheckThreads")).size()); CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries();