Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 712BC19283 for ; Wed, 20 Apr 2016 10:46:29 +0000 (UTC) Received: (qmail 25910 invoked by uid 500); 20 Apr 2016 10:46:29 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 25874 invoked by uid 500); 20 Apr 2016 10:46:29 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 25799 invoked by uid 99); 20 Apr 2016 10:46:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Apr 2016 10:46:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B3CF5DFE16; Wed, 20 Apr 2016 10:46:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Wed, 20 Apr 2016 10:46:29 -0000 Message-Id: <12aebdc979fb4bf481ff58d3bdb9c08d@git.apache.org> In-Reply-To: <75055a0c1e9e4740b3a8e274d3957a1a@git.apache.org> References: <75055a0c1e9e4740b3a8e274d3957a1a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] ignite git commit: IGNITE-2004 Fixed review notes. IGNITE-2004 Fixed review notes. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a456e43 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a456e43 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a456e43 Branch: refs/heads/ignite-2004 Commit: 9a456e4385eeecf9134ceb1d99322ead2f1ab9ae Parents: 89a84f1 Author: nikolay_tikhonov Authored: Wed Apr 20 13:14:14 2016 +0300 Committer: nikolay_tikhonov Committed: Wed Apr 20 13:14:14 2016 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 87 ++++++++------------ .../thread/IgniteStripedThreadPoolExecutor.java | 22 ++--- 3 files changed, 42 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 7b9f0ff..d04a760 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -43,6 +43,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lifecycle.LifecycleBean; @@ -726,11 +727,12 @@ public class IgniteConfiguration { } /** - * Size of thread 
pool that is in charge of processing continuous query events. + * Size of thread pool that is in charge of processing asynchronous callbacks. * <p>
* If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}. * * @return Thread pool size to be used + * @see IgniteAsyncCallback */ public int getAsyncCallbackPoolSize() { return callbackPoolSize; http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 8ef3d43..a0fe850 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -582,60 +582,47 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler final List entries = (List)objs; - if (!entries.isEmpty()) { - if (asyncCallback) { - int partId = entries.get(0).partition(); + if (entries.isEmpty()) + return; - if (entries.size() != 1) { - Map> entriesByPart = null; + if (asyncCallback) { + int partId = entries.get(0).partition(); - for (int i = 0; i < entries.size(); i++) { - int curPart = entries.get(i).partition(); + int startIdx = 0; - // If all entries from one partition avoid creation new collections. - if (curPart == partId && entriesByPart == null) - continue; + if (entries.size() != 1) { + for (int i = 1; i < entries.size(); i++) { + int curPart = entries.get(i).partition(); - if (entriesByPart == null) { - entriesByPart = new HashMap<>(); + // If all entries from one partition avoid creation new collections. 
+ if (curPart == partId) + continue; - entriesByPart.put(partId, new ArrayList<>(entries.subList(0, i))); - } - - Collection entries0 = entriesByPart.get(curPart); - - if (entries0 == null) { - entries0 = new ArrayList<>(entries.size() - i); + final int i0 = i; + final int startIdx0 = startIdx; - entriesByPart.put(curPart, entries0); + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0)); } + }, partId); - entries0.add(entries.get(i)); - } + startIdx = i0; + partId = curPart; + } + } - if (entriesByPart != null) { - for (final Map.Entry> e : - entriesByPart.entrySet()) { - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - notifyCallback0(nodeId, ctx, e.getValue()); - } - }, e.getKey()); - } + final int startIdx0 = startIdx; - return; - } + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, + startIdx0 == 0 ? 
entries : entries.subList(startIdx0, entries.size())); } - - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - notifyCallback0(nodeId, ctx, entries); - } - }, partId); - } - else - notifyCallback0(nodeId, ctx, entries); + }, partId); } + else + notifyCallback0(nodeId, ctx, entries); } /** @@ -648,7 +635,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler Collection entries) { final GridCacheContext cctx = cacheContext(ctx); - final Collection> entries0 = new ArrayList<>(); + final Collection> entries0 = new ArrayList<>(entries.size()); for (CacheContinuousQueryEntry e : entries) { GridCacheDeploymentManager depMgr = cctx.deploy(); @@ -701,8 +688,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler if (e.isFiltered()) return Collections.emptyList(); else - return F.>asList( - new CacheContinuousQueryEvent(cache, cctx, e)); + return F.>asList(new CacheContinuousQueryEvent(cache, cctx, e)); } // Initial query entry or evicted entry. These events should be fired immediately. @@ -738,15 +724,10 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler if (!notify) entry.markFiltered(); - if (!primary) { - if (!internal) { - // Skip init query and expire entries. 
- if (entry.updateCounter() != -1L) { - entry.markBackup(); + if (!primary && !internal && entry.updateCounter() != -1L /* Skip init query and expire entries */) { + entry.markBackup(); - backupQueue.add(entry); - } - } + backupQueue.add(entry); } return notify; http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 5876b08..183f970 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -19,6 +19,7 @@ package org.apache.ignite.thread; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -78,7 +79,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public List shutdownNow() { - List res = new ArrayList<>(); + if (execs.length == 0) + return Collections.emptyList(); + + List res = new ArrayList<>(execs.length); for (ExecutorService exec : execs) { for (Runnable r : exec.shutdownNow()) @@ -120,29 +124,21 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** {@inheritDoc} */ @NotNull @Override public Future submit(Callable task) { - assert false; - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @NotNull @Override public Future submit(Runnable task, T res) { - assert false; - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @NotNull @Override public Future submit(Runnable task) { - assert false; - throw new 
UnsupportedOperationException(); } /** {@inheritDoc} */ @NotNull @Override public List> invokeAll(Collection> tasks) { - assert false; - throw new UnsupportedOperationException(); } @@ -150,29 +146,21 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { @NotNull @Override public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) { - assert false; - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @NotNull @Override public T invokeAny(Collection> tasks) { - assert false; - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { - assert false; - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void execute(Runnable cmd) { - assert false; - throw new UnsupportedOperationException(); }