ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [2/3] ignite git commit: IGNITE-2004 Fixed review notes.
Date Wed, 20 Apr 2016 10:46:29 GMT
IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a456e43
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a456e43
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a456e43

Branch: refs/heads/ignite-2004
Commit: 9a456e4385eeecf9134ceb1d99322ead2f1ab9ae
Parents: 89a84f1
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Apr 20 13:14:14 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Apr 20 13:14:14 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  4 +-
 .../continuous/CacheContinuousQueryHandler.java | 87 ++++++++------------
 .../thread/IgniteStripedThreadPoolExecutor.java | 22 ++---
 3 files changed, 42 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 7b9f0ff..d04a760 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -43,6 +43,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lifecycle.LifecycleBean;
@@ -726,11 +727,12 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Size of thread pool that is in charge of processing continuous query events.
+     * Size of thread pool that is in charge of processing asynchronous callbacks.
      * <p>
      * If not provided, executor service will have size {@link #DFLT_PUBLIC_THREAD_CNT}.
      *
      * @return Thread pool size to be used
+     * @see IgniteAsyncCallback
      */
     public int getAsyncCallbackPoolSize() {
         return callbackPoolSize;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 8ef3d43..a0fe850 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -582,60 +582,47 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs;
 
-        if (!entries.isEmpty()) {
-            if (asyncCallback) {
-                int partId = entries.get(0).partition();
+        if (entries.isEmpty())
+            return;
 
-                if (entries.size() != 1) {
-                    Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart
= null;
+        if (asyncCallback) {
+            int partId = entries.get(0).partition();
 
-                    for (int i = 0; i < entries.size(); i++) {
-                        int curPart = entries.get(i).partition();
+            int startIdx = 0;
 
-                        // If all entries from one partition avoid creation new collections.
-                        if (curPart == partId && entriesByPart == null)
-                            continue;
+            if (entries.size() != 1) {
+                for (int i = 1; i < entries.size(); i++) {
+                    int curPart = entries.get(i).partition();
 
-                        if (entriesByPart == null) {
-                            entriesByPart = new HashMap<>();
+                    // If all entries from one partition avoid creation new collections.
+                    if (curPart == partId)
+                        continue;
 
-                            entriesByPart.put(partId, new ArrayList<>(entries.subList(0,
i)));
-                        }
-
-                        Collection<CacheContinuousQueryEntry> entries0 = entriesByPart.get(curPart);
-
-                        if (entries0 == null) {
-                            entries0 = new ArrayList<>(entries.size() - i);
+                    final int i0 = i;
+                    final int startIdx0 = startIdx;
 
-                            entriesByPart.put(curPart, entries0);
+                    ctx.asyncCallbackPool().execute(new Runnable() {
+                        @Override public void run() {
+                            notifyCallback0(nodeId, ctx, entries.subList(startIdx0, i0));
                         }
+                    }, partId);
 
-                        entries0.add(entries.get(i));
-                    }
+                    startIdx = i0;
+                    partId = curPart;
+                }
+            }
 
-                    if (entriesByPart != null) {
-                        for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>>
e :
-                            entriesByPart.entrySet()) {
-                            ctx.asyncCallbackPool().execute(new Runnable() {
-                                @Override public void run() {
-                                    notifyCallback0(nodeId, ctx, e.getValue());
-                                }
-                            }, e.getKey());
-                        }
+            final int startIdx0 = startIdx;
 
-                        return;
-                    }
+            ctx.asyncCallbackPool().execute(new Runnable() {
+                @Override public void run() {
+                    notifyCallback0(nodeId, ctx,
+                        startIdx0 == 0 ? entries : entries.subList(startIdx0, entries.size()));
                 }
-
-                ctx.asyncCallbackPool().execute(new Runnable() {
-                    @Override public void run() {
-                        notifyCallback0(nodeId, ctx, entries);
-                    }
-                }, partId);
-            }
-            else
-                notifyCallback0(nodeId, ctx, entries);
+            }, partId);
         }
+        else
+            notifyCallback0(nodeId, ctx, entries);
     }
 
     /**
@@ -648,7 +635,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
-        final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0
= new ArrayList<>();
+        final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0
= new ArrayList<>(entries.size());
 
         for (CacheContinuousQueryEntry e : entries) {
             GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -701,8 +688,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             if (e.isFiltered())
                 return Collections.emptyList();
             else
-                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
-                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
+                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(new
CacheContinuousQueryEvent<K, V>(cache, cctx, e));
         }
 
         // Initial query entry or evicted entry. These events should be fired immediately.
@@ -738,15 +724,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (!notify)
             entry.markFiltered();
 
-        if (!primary) {
-            if (!internal) {
-                // Skip init query and expire entries.
-                if (entry.updateCounter() != -1L) {
-                    entry.markBackup();
+        if (!primary && !internal && entry.updateCounter() != -1L /* Skip
init query and expire entries */) {
+            entry.markBackup();
 
-                    backupQueue.add(entry);
-                }
-            }
+            backupQueue.add(entry);
         }
 
         return notify;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a456e43/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 5876b08..183f970 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.thread;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -78,7 +79,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService
{
 
     /** {@inheritDoc} */
     @Override public List<Runnable> shutdownNow() {
-        List<Runnable> res = new ArrayList<>();
+        if (execs.length == 0)
+            return Collections.emptyList();
+
+        List<Runnable> res = new ArrayList<>(execs.length);
 
         for (ExecutorService exec : execs) {
             for (Runnable r : exec.shutdownNow())
@@ -120,29 +124,21 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService
{
 
     /** {@inheritDoc} */
     @NotNull @Override public <T> Future<T> submit(Callable<T> task) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @NotNull @Override public <T> Future<T> submit(Runnable task, T res) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @NotNull @Override public Future<?> submit(Runnable task) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
@@ -150,29 +146,21 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService
{
     @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<?
extends Callable<T>> tasks,
         long timeout,
         TimeUnit unit) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>>
tasks) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public <T> T invokeAny(Collection<? extends Callable<T>>
tasks, long timeout, TimeUnit unit) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void execute(Runnable cmd) {
-        assert false;
-
         throw new UnsupportedOperationException();
     }
 


Mime
View raw message