ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1186 review
Date Fri, 26 Feb 2016 10:49:43 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1186 2537d797f -> e80e906fa


ignite-1186 review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e80e906f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e80e906f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e80e906f

Branch: refs/heads/ignite-1186
Commit: e80e906fae596fc9827825778e03f456973e8ad1
Parents: 2537d79
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Feb 26 13:49:26 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Feb 26 13:49:26 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 44 +++++-----
 .../CacheContinuousQueryHandlerV2.java          | 89 ++++++--------------
 .../continuous/CacheContinuousQueryManager.java | 12 +--
 3 files changed, 57 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e80e906f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 62c2aa8..47f5c52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -90,37 +90,37 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private static final int BACKUP_ACK_THRESHOLD = 100;
 
     /** Cache name. */
-    protected String cacheName;
+    private String cacheName;
 
     /** Topic for ordered messages. */
-    protected Object topic;
+    private Object topic;
 
     /** Local listener. */
     private transient CacheEntryUpdatedListener<K, V> locLsnr;
 
     /** Remote filter. */
-    protected CacheEntryEventSerializableFilter<K, V> rmtFilter;
+    private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
     /** Deployable object for filter. */
-    protected DeployableObject rmtFilterDep;
+    private DeployableObject rmtFilterDep;
 
     /** Internal flag. */
-    protected boolean internal;
+    private boolean internal;
 
     /** Notify existing flag. */
-    protected boolean notifyExisting;
+    private boolean notifyExisting;
 
     /** Old value required flag. */
-    protected boolean oldValRequired;
+    private boolean oldValRequired;
 
     /** Synchronous flag. */
-    protected boolean sync;
+    private boolean sync;
 
     /** Ignore expired events flag. */
-    protected boolean ignoreExpired;
+    private boolean ignoreExpired;
 
     /** Task name hash code. */
-    protected int taskHash;
+    private int taskHash;
 
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
@@ -144,7 +144,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient AcknowledgeBuffer ackBuf;
 
     /** */
-    protected transient int cacheId;
+    private transient int cacheId;
 
     /** */
     private Map<Integer, Long> initUpdCntrs;
@@ -263,8 +263,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (locLsnr != null)
             ctx.resource().injectGeneric(locLsnr);
 
-        if (getRemoteFilter() != null)
-            ctx.resource().injectGeneric(getRemoteFilter());
+        final CacheEntryEventFilter filter = getRemoteFilter();
+
+        if (filter != null)
+            ctx.resource().injectGeneric(filter);
 
         entryBufs = new ConcurrentHashMap<>();
 
@@ -304,8 +306,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
-                            (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName()
@@ -334,9 +336,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                 boolean notify = !evt.entry().isFiltered();
 
-                if (notify && getRemoteFilter() != null) {
+                if (notify && filter != null) {
                     try {
-                        notify = getRemoteFilter().evaluate(evt);
+                        notify = filter.evaluate(evt);
                     }
                     catch (Exception e) {
                         U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter
failed: " + e);
@@ -424,8 +426,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         null,
                         null,
                         null,
-                        getRemoteFilter() instanceof CacheEntryEventSerializableFilter ?
-                            (CacheEntryEventSerializableFilter)getRemoteFilter() : null,
+                        filter instanceof CacheEntryEventSerializableFilter ?
+                            (CacheEntryEventSerializableFilter)filter : null,
                         null,
                         nodeId,
                         taskName(),
@@ -438,8 +440,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void onUnregister() {
-                if (getRemoteFilter() instanceof PlatformContinuousQueryFilter)
-                    ((PlatformContinuousQueryFilter)getRemoteFilter()).onQueryUnregister();
+                if (filter instanceof PlatformContinuousQueryFilter)
+                    ((PlatformContinuousQueryFilter)filter).onQueryUnregister();
             }
 
             @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs)
{

http://git-wip-us.apache.org/repos/asf/ignite/blob/e80e906f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 7f00fc1..4573e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -77,7 +76,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         String cacheName,
         Object topic,
         CacheEntryUpdatedListener<K, V> locLsnr,
-        CacheEntryEventSerializableFilter<K, V> rmtFilter,
         Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory,
         boolean internal,
         boolean notifyExisting,
@@ -88,22 +86,36 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
         boolean skipPrimaryCheck,
         boolean locCache,
         boolean keepBinary,
-        boolean ignoreClassNotFound) {
-        super(cacheName, topic, locLsnr, rmtFilter, internal, notifyExisting, oldValRequired,
sync, ignoreExpired,
-            taskHash, skipPrimaryCheck, locCache, keepBinary, ignoreClassNotFound);
-
-        assert rmtFilter != null ^ rmtFilterFactory != null || rmtFilter == null &&
rmtFilterFactory == null :
-            "Remote Filter and Remote Filter Factory both are not null. Should be set only
one.";
+        boolean ignoreClsNotFound) {
+        super(cacheName,
+            topic,
+            locLsnr,
+            null,
+            internal,
+            notifyExisting,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            taskHash,
+            skipPrimaryCheck,
+            locCache,
+            keepBinary,
+            ignoreClsNotFound);
+
+        assert rmtFilterFactory != null;
 
         this.rmtFilterFactory = rmtFilterFactory;
-
-        if (rmtFilterFactory != null)
-            this.rmtNonSerFilter = rmtFilterFactory.create();
     }
 
     /** {@inheritDoc} */
     @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() {
-        return rmtNonSerFilter != null ? rmtNonSerFilter : rmtFilter;
+        if (rmtNonSerFilter == null) {
+            assert rmtFilterFactory != null;
+
+            rmtNonSerFilter = rmtFilterFactory.create();
+        }
+
+        return rmtNonSerFilter;
     }
 
     /** {@inheritDoc} */
@@ -118,11 +130,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException
{
         super.p2pUnmarshal(nodeId, ctx);
 
-        if (rmtFilterFactoryDep != null) {
+        if (rmtFilterFactoryDep != null)
             rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
-
-            rmtFilter = rmtFilterFactory.create();
-        }
     }
 
     /** {@inheritDoc} */
@@ -137,31 +146,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, cacheName);
-        out.writeObject(topic);
-
-        if (rmtFilterFactory == null && rmtFilter != null) {
-            boolean b = rmtFilterDep != null;
-
-            out.writeBoolean(b);
-
-            if (b)
-                out.writeObject(rmtFilterDep);
-            else
-                out.writeObject(rmtFilter);
-        }
-        else {
-            out.writeBoolean(false);
-
-            out.writeObject(null);
-        }
-
-        out.writeBoolean(internal);
-        out.writeBoolean(notifyExisting);
-        out.writeBoolean(oldValRequired);
-        out.writeBoolean(sync);
-        out.writeBoolean(ignoreExpired);
-        out.writeInt(taskHash);
+        super.writeExternal(out);
 
         boolean b = rmtFilterFactoryDep != null;
 
@@ -176,33 +161,13 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-        cacheName = U.readString(in);
-        topic = in.readObject();
+        super.readExternal(in);
 
         boolean b = in.readBoolean();
 
         if (b)
-            rmtFilterDep = (DeployableObject)in.readObject();
-        else
-            rmtFilter = (CacheEntryEventSerializableFilter<K, V>)in.readObject();
-
-        internal = in.readBoolean();
-        notifyExisting = in.readBoolean();
-        oldValRequired = in.readBoolean();
-        sync = in.readBoolean();
-        ignoreExpired = in.readBoolean();
-        taskHash = in.readInt();
-
-        cacheId = CU.cacheId(cacheName);
-
-        b = in.readBoolean();
-
-        if (b)
             rmtFilterFactoryDep = (DeployableObject)in.readObject();
         else
             rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject();
-
-        if (rmtFilter == null && rmtFilterFactory != null)
-            rmtNonSerFilter = rmtFilterFactory.create();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e80e906f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 4469cf0..fd4d71e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Collection;
@@ -579,16 +578,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
 
         boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED
&& cctx.affinityNode();
 
-        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
+        boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes());
 
         GridContinuousHandler hnd;
 
-        if (v2)
+        if (v2) {
+            assert rmtFilter == null : rmtFilter;
+
             hnd = new CacheContinuousQueryHandlerV2(
                 cctx.name(),
                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                 locLsnr,
-                rmtFilter,
                 rmtFilterFactory,
                 internal,
                 notifyExisting,
@@ -600,6 +600,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 cctx.isLocal(),
                 keepBinary,
                 ignoreClassNotFound);
+        }
         else {
             CacheEntryEventFilter fltr = null;
 
@@ -711,9 +712,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      *     otherwise {@code false}.
      */
     private boolean useV2Protocol(Collection<ClusterNode> nodes) {
-        for (ClusterNode node : nodes)
+        for (ClusterNode node : nodes) {
             if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
                 return false;
+        }
 
         return true;
     }


Mime
View raw message