ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2004 Fixed review notes.
Date Thu, 14 Apr 2016 11:56:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 ff595dbe7 -> 970861673


IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97086167
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97086167
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97086167

Branch: refs/heads/ignite-2004
Commit: 970861673ac7f6b8451d5c60a687ff49bf89d21b
Parents: ff595db
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Apr 14 14:56:03 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Apr 14 14:56:03 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/ContinuousQuery.java     |  9 +--
 .../internal/GridEventConsumeHandler.java       |  4 +-
 .../internal/GridMessageListenHandler.java      |  4 +-
 .../continuous/CacheContinuousQueryHandler.java | 60 +++++++++++++-------
 .../continuous/GridContinuousHandler.java       |  4 +-
 .../continuous/GridContinuousProcessor.java     |  3 +-
 .../apache/ignite/lang/IgniteAsyncCallback.java |  6 +-
 7 files changed, 56 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index cb5b05e..b37c899 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -175,10 +175,11 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K,
V>> {
      * synchronization or transactional cache operations), should be executed asynchronously
without
      * blocking the thread that called the callback. Otherwise, you can get deadlocks.
      * * <p>
-     * If listener implements {@link IgniteAsyncCallback} marker interface then cache operations
are allowed.
-     * see {@link IgniteAsyncCallback}.
+     * If listener has {@link IgniteAsyncCallback} annotation then cache operations are allowed.
+     *
      *
      * @param locLsnr Local callback.
+     * @see IgniteAsyncCallback
      * @return {@code this} for chaining.
      */
     public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V>
locLsnr) {
@@ -232,10 +233,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K,
V>> {
      * (e.g., synchronization or transactional cache operations), should be executed asynchronously
      * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
      * <p>
-     * If filter implements {@link IgniteAsyncCallback} marker interface then cache operations
are allowed.
-     * see {@link IgniteAsyncCallback}.
+     * If filter has {@link IgniteAsyncCallback} annotation then cache operations are allowed.
      *
      * @param rmtFilterFactory Key-value filter factory.
+     * @see IgniteAsyncCallback
      * @return {@code this} for chaining.
      */
     public ContinuousQuery<K, V> setRemoteFilterFactory(

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 19bf1a7..cc656f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -21,8 +21,8 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
@@ -301,7 +301,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
      * @param nodeId Node ID.
      * @param objs Notification objects.
      */
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?>
objs, GridKernalContext ctx) {
+    @Override public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs,
GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 0ac6877..70b9da7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -21,7 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -149,7 +149,7 @@ public class GridMessageListenHandler implements GridContinuousHandler
{
     }
 
     /** {@inheritDoc} */
-    @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?>
objs, GridKernalContext ctx) {
+    @Override public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs,
GridKernalContext ctx) {
         assert false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e2ee7c5..294a3ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -573,47 +573,65 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     @SuppressWarnings("unchecked")
     @Override public void notifyCallback(final UUID nodeId,
         final UUID routineId,
-        Collection<?> objs,
+        List<?> objs,
         final GridKernalContext ctx) {
         assert nodeId != null;
         assert routineId != null;
         assert objs != null;
         assert ctx != null;
 
-        final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
+        final List<CacheContinuousQueryEntry> entries = (List<CacheContinuousQueryEntry>)objs;
 
         if (!entries.isEmpty()) {
             if (asyncCallback) {
+                int partId = entries.get(0).partition();
+
                 if (entries.size() != 1) {
-                    Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart
= new HashMap<>();
+                    Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart
= null;
+
+                    for (int i = 0; i < entries.size(); i++) {
+                        int curPart = entries.get(i).partition();
+
+                        // If all entries from one partition avoid creation new collections.
+                        if (curPart == partId && entriesByPart == null)
+                            continue;
+
+                        if (entriesByPart == null) {
+                            entriesByPart = new HashMap<>();
 
-                    for (CacheContinuousQueryEntry e : entries) {
-                        Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(e.partition());
+                            entriesByPart.put(partId, entries.subList(0, i));
+                        }
+
+                        Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(curPart);
 
                         if (ents == null) {
-                            ents = new ArrayList<>(entries.size());
+                            ents = new ArrayList<>(entries.size() - i);
 
-                            entriesByPart.put(e.partition(), ents);
+                            entriesByPart.put(curPart, ents);
                         }
 
-                        ents.add(e);
+                        ents.add(entries.get(i));
                     }
 
-                    for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>>
e : entriesByPart.entrySet()) {
-                        ctx.asyncCallbackPool().execute(new Runnable() {
-                            @Override public void run() {
-                                notifyCallback0(nodeId, ctx, e.getValue());
-                            }
-                        }, e.getKey());
-                    }
-                }
-                else {
-                    ctx.asyncCallbackPool().execute(new Runnable() {
-                        @Override public void run() {
-                            notifyCallback0(nodeId, ctx, entries);
+                    if (entriesByPart != null) {
+                        for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>>
e :
+                            entriesByPart.entrySet()) {
+                            ctx.asyncCallbackPool().execute(new Runnable() {
+                                @Override public void run() {
+                                    notifyCallback0(nodeId, ctx, e.getValue());
+                                }
+                            }, e.getKey());
                         }
-                    }, entries.iterator().next().partition());
+
+                        return;
+                    }
                 }
+
+                ctx.asyncCallbackPool().execute(new Runnable() {
+                    @Override public void run() {
+                        notifyCallback0(nodeId, ctx, entries);
+                    }
+                }, partId);
             }
             else
                 notifyCallback0(nodeId, ctx, entries);

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 46e87af..318f5ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.continuous;
 
 import java.io.Externalizable;
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -80,7 +80,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable
{
      * @param objs Notification objects.
      * @param ctx Kernal context.
      */
-    public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext
ctx);
+    public void notifyCallback(UUID nodeId, UUID routineId, List<?> objs, GridKernalContext
ctx);
 
     /**
      * Deploys and marshals inner objects (called only if peer deployment is enabled).

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d7838f3..277f829 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -963,7 +964,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             LocalRoutineInfo routine = locInfos.get(routineId);
 
             if (routine != null)
-                routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(),
ctx);
+                routine.hnd.notifyCallback(nodeId, routineId, (List<?>)msg.data(),
ctx);
         }
         finally {
             if (msg.futureId() != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/97086167/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
index 88e6684..4800b55 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java
@@ -32,8 +32,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
  * annotated this annotation then they will be executing on a separate thread pool. It allows
  * to use cache API in a callbacks.
  * <p/>
- * Different implementations can use different thread pools. For example continuous query
will use continuous query
- * thread poll which can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}
+ * For executing callbacks using callback thread pool which can be configured by
+ * {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}
+ *
+ * @see IgniteConfiguration#setAsyncCallbackPoolSize(int)
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)


Mime
View raw message