ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [13/13] ignite git commit: IGNITE-2004 WIP
Date Fri, 08 Apr 2016 17:38:24 GMT
IGNITE-2004 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9c81590
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9c81590
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9c81590

Branch: refs/heads/ignite-2004
Commit: a9c81590dcde6be6c7a7f25c0e3d5edef08d0252
Parents: 3fad0ab
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Apr 8 20:37:59 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Apr 8 20:37:59 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     |  10 +-
 .../cache/GridCacheUpdateAtomicResult.java      |  15 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  29 +--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  23 +-
 .../distributed/near/GridNearAtomicCache.java   |   1 +
 .../continuous/CacheContinuousQueryClosure.java |  33 ---
 .../continuous/CacheContinuousQueryHandler.java | 218 +++++++------------
 .../CacheContinuousQueryListener.java           |   9 +-
 .../continuous/CacheContinuousQueryManager.java |  29 +--
 10 files changed, 120 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 3a7b5ec..8270c21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -24,6 +24,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -454,6 +455,7 @@ public interface GridCacheEntryEx {
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
      * @param updateCntr Update counter.
+     * @param fut Dht atomic future.
      * @return Tuple where first value is flag showing whether operation succeeded,
      *      second value is old entry value if return value is requested, third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -489,7 +491,8 @@ public interface GridCacheEntryEx {
         @Nullable UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable IgniteInternalFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 061da27..921be85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
@@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -1246,6 +1246,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
                     true,
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1444,6 +1445,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
                     true,
                     false,
                     updateCntr0,
+                    null,
                     topVer);
             }
 
@@ -1821,6 +1823,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
                     true,
                     false,
                     updateCntr,
+                    null,
                     AffinityTopologyVersion.NONE);
             }
 
@@ -1870,7 +1873,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
         @Nullable UUID subjId,
         String taskName,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable IgniteInternalFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1898,8 +1902,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
         Long updateCntr0 = null;
 
-        List<CacheContinuousQueryClosure> clsrs = null;
-
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index a96675b..10d37d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.List;
 import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -65,9 +64,6 @@ public class GridCacheUpdateAtomicResult {
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
-    /** Continuous query closures. */
-    private List<CacheContinuousQueryClosure> cntQryClsrs;
-
     /**
      * Constructor.
      *
@@ -91,8 +87,7 @@ public class GridCacheUpdateAtomicResult {
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
         boolean sndToDht,
-        long updateCntr,
-        List<CacheContinuousQueryClosure> cntQryClsrs
+        long updateCntr
     ) {
         this.success = success;
         this.oldVal = oldVal;
@@ -104,7 +99,6 @@ public class GridCacheUpdateAtomicResult {
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
         this.updateCntr = updateCntr;
-        this.cntQryClsrs = cntQryClsrs;
     }
 
     /**
@@ -178,13 +172,6 @@ public class GridCacheUpdateAtomicResult {
         return sndToDht;
     }
 
-    /**
-     * @return Continuous query closures.
-     */
-    public List<CacheContinuousQueryClosure> continuousQueryClosures() {
-        return cntQryClsrs;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheUpdateAtomicResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4713729..9768243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -2171,7 +2170,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                     req.subjectId(),
                     taskName,
                     null,
-                    null);
+                    null,
+                    dhtFut);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -2199,8 +2199,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 newConflictVer,
                                 sndPrevVal,
                                 updRes.oldValue(),
-                                updRes.updateCounter(),
-                                updRes.continuousQueryClosures());
+                                updRes.updateCounter());
                         }
 
                         if (!F.isEmpty(filteredReaders))
@@ -2217,10 +2216,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (lsnrs != null && updRes.continuousQueryClosures() != null) {
-                    for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
-                        clsr.onEntryUpdate();
-                }
 
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
@@ -2465,7 +2460,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                         req.subjectId(),
                         taskName,
                         null,
-                        null);
+                        null,
+                        dhtFut);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2508,8 +2504,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 null,
                                 sndPrevVal,
                                 updRes.oldValue(),
-                                updRes.updateCounter(),
-                                updRes.continuousQueryClosures());
+                                updRes.updateCounter());
                         }
 
                         if (!F.isEmpty(filteredReaders))
@@ -2520,10 +2515,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
-                    else if (lsnrs != null && updRes.continuousQueryClosures() != null) {
-                        for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
-                            clsr.onEntryUpdate();
-                    }
 
                     if (hasNear) {
                         if (primary) {
@@ -2960,16 +2951,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                             req.subjectId(),
                             taskName,
                             prevVal,
-                            updateIdx);
+                            updateIdx,
+                            null);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.continuousQueryClosures() != null) {
-                            for (CacheContinuousQueryClosure clsr : updRes.continuousQueryClosures())
-                                clsr.onEntryUpdate();
-                        }
-
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 32d10c1..73a2ede 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryClosure;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -104,9 +103,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Response count. */
     private volatile int resCnt;
 
-    /** Continuous query closures. */
-    private List<CacheContinuousQueryClosure> contQryClsrs;
-
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -221,7 +217,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param addPrevVal If {@code true} sends previous value to backups.
      * @param prevVal Previous value.
      * @param updateCntr Partition update counter.
-     * @param clsrs
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
@@ -231,7 +226,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr, List<CacheContinuousQueryClosure> clsrs) {
+        long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -243,13 +238,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
         keys.add(entry.key());
 
-        if (clsrs != null) {
-            if (contQryClsrs == null)
-                contQryClsrs = new ArrayList<>(keys.size());
-
-            contQryClsrs.addAll(clsrs);
-        }
-
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
@@ -356,15 +344,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             if (err != null) {
                 for (KeyCacheObject key : keys)
                     updateRes.addFailedKey(key, err);
-
-                if (contQryClsrs != null)
-                    for (CacheContinuousQueryClosure clsr : contQryClsrs)
-                        clsr.skipEvent();
-            }
-            else {
-                if (contQryClsrs != null)
-                    for (CacheContinuousQueryClosure clsr : contQryClsrs)
-                        clsr.onEntryUpdate();
             }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 5bb9aaa..e0c7187 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -271,6 +271,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K,
V> {
                         subjId,
                         taskName,
                         null,
+                        null,
                         null);
 
                     if (updRes.removeVersion() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
deleted file mode 100644
index 3fd9e57..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryClosure.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-/**
- * Continuous query closure.
- */
-public interface CacheContinuousQueryClosure extends Runnable {
-    /**
-     * Callback for case when future completed successfully.
-     */
-    public void onEntryUpdate();
-
-    /**
-     * Callback for case when future completed with error..
-     */
-    public void skipEvent();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 56c02d6..3ecac40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -52,6 +52,7 @@ import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -170,6 +171,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** */
     private transient boolean asyncCallback;
 
+    /** */
+    private transient UUID nodeId;
+
+    /** */
+    private transient UUID routineId;
+
+    /** */
+    private transient GridKernalContext ctx;
+
+    /** */
+    private transient IgniteLogger log;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -317,10 +330,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         rcvs = new ConcurrentHashMap<>();
 
+        this.nodeId = nodeId;
+
+        this.routineId = routineId;
+
+        this.ctx = ctx;
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         assert !skipPrimaryCheck || loc;
 
+        log = ctx.log(CacheContinuousQueryHandler.class);
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -346,47 +367,32 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 return keepBinary;
             }
 
-            @Override public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+            @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
                 boolean primary,
                 boolean recordIgniteEvt,
-                boolean fireEvent) {
+                boolean fireEvent,
+                IgniteInternalFuture<?> fut) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
-                    return null;
+                    return ;
 
                 final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 // Check that cache stopped.
                 if (cctx == null)
-                    return null;
-
-                final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+                    return;
 
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                final ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl(taskName(),
-                    recordIgniteEvt,
-                    routineId,
-                    nodeId,
-                    ctx,
-                    loc,
-                    primary,
-                    cctx,
-                    filter,
-                    evt,
-                    fireEvent,
-                    cache);
-
-                if (!asyncCallback) {
-                    clsr.filter();
-
-                    if (fireEvent)
-                        clsr.onEntryUpdate();
-                }
-                else
-                    ctx.continuousQueryPool().execute(clsr, evt.partitionId());
+                if (asyncCallback) {
+                    ContinuousQueryClosureImpl clsr = new ContinuousQueryClosureImpl(
+                        primary,
+                        evt,
+                        recordIgniteEvt,
+                        fut);
 
-                return clsr;
+                    ctx.continuousQueryPool().execute(clsr, evt.partitionId());
+                }
             }
 
             @Override public void onUnregister() {
@@ -432,15 +438,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
-                AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt) {
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+                AffinityTopologyVersion topVer, boolean primary, boolean fireEvt) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                return onEntryUpdated(evt, primary, false, fireEvnt);
+                onEntryUpdated(evt, primary, false, fireEvt, null);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -1249,101 +1255,53 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      *
      */
-    private class ContinuousQueryClosureImpl implements CacheContinuousQueryClosure {
-        /** */
-        private final IgniteCache cache;
-
-        /** */
-        private final IgniteLogger log;
-
-        /** */
-        private final boolean fireEvent;
-
+    private class ContinuousQueryClosureImpl implements Runnable {
         /** */
         private CacheContinuousQueryEvent<K, V> evt;
 
         /** */
-        private CacheEntryEventFilter filter;
-
-        /** */
-        private final GridCacheContext<K, V> cctx;
-
-        /** */
         private boolean primary;
 
         /** */
-        private boolean loc;
-
-        /** */
-        private GridKernalContext ctx;
-
-        /** */
-        private UUID nodeId;
-
-        /** */
-        private UUID routineId;
-
-        /** */
-        private boolean recordIgniteEvt;
-
-        /** */
-        private final String taskName;
-
-        /** */
         private boolean notify;
 
         /** */
-        private boolean backup;
+        private boolean recordIgniteEvt;
 
         /** */
-        private final CountDownLatch latch = new CountDownLatch(1);
+        private IgniteInternalFuture<?> fut;
 
         /**
-         * @param taskName Task name.
-         * @param recordIgniteEvt Fired event.
-         * @param routineId Routine id.
-         * @param nodeId Node id.
-         * @param ctx Kernal context.
-         * @param loc Local.
          * @param primary Primary flag.
-         * @param cctx Cache context.
-         * @param filter Filter.
          * @param evt Event.
-         * @param fireEvent Immediately fire event.
-         * @param cache Cache.
+         * @param recordIgniteEvt Fired event.
          */
-        ContinuousQueryClosureImpl(String taskName,
-            boolean recordIgniteEvt,
-            UUID routineId,
-            UUID nodeId,
-            GridKernalContext ctx,
-            boolean loc,
+        ContinuousQueryClosureImpl(
             boolean primary,
-            GridCacheContext<K, V> cctx,
-            CacheEntryEventFilter filter,
             CacheContinuousQueryEvent<K, V> evt,
-            boolean fireEvent, IgniteCache cache) {
-            this.taskName = taskName;
-            this.recordIgniteEvt = recordIgniteEvt;
-            this.routineId = routineId;
-            this.nodeId = nodeId;
-            this.ctx = ctx;
-            this.loc = loc;
+            boolean recordIgniteEvt,
+            IgniteInternalFuture<?> fut) {
             this.primary = primary;
-            this.cctx = cctx;
-            this.filter = filter;
             this.evt = evt;
-            this.cache = cache;
-            this.fireEvent = fireEvent;
-
-            log = ctx.log(CacheContinuousQueryHandler.class);
+            this.recordIgniteEvt = recordIgniteEvt;
+            this.fut = fut;
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
-            filter();
+            if (!filter())
+                return;
 
-            if (fireEvent || waitIfAsync())
+            if (fut != null) {
+                if (waitIfAsync())
+                    onEntryUpdate0();
+                else {
+                    evt.entry().markFiltered();
+
+                    onEntryUpdate0();
+                }
+            }
+            else
                 onEntryUpdate0();
         }
 
@@ -1358,49 +1316,29 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @return {@code False} if filter sync.
          */
         private boolean waitIfAsync() {
-            if (backup)
-                return false;
-
             try {
-                U.await(latch);
+                fut.get();
             }
-            catch (IgniteInterruptedCheckedException e) {
-                log.error("Failed to wait latch.");
+            catch (IgniteCheckedException e) {
+                return false;
             }
 
             return true;
         }
 
-        /** {@inheritDoc} */
-        @Override public void skipEvent() {
-            if (evt != null && evt.entry() != null)
-                evt.entry().markFiltered();
-
-            onEntryUpdate();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onEntryUpdate() {
-            if (backup)
-                return;
-
-            if (!fireEvent && asyncCallback) {
-                latch.countDown();
-
-                return;
-            }
-
-            onEntryUpdate0();
-        }
-
         /**
          *
          */
         private void onEntryUpdate0() {
             try {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx == null)
+                    return;
+
                 final CacheContinuousQueryEntry entry = evt.entry();
 
-                if (loc) {
+                if (routineId.equals(nodeId)) {
                     if (!locCache) {
                         T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> events =
                             handleEvent(ctx, entry, asyncCallback);
@@ -1454,11 +1392,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     null,
                     null,
                     null,
-                    filter instanceof CacheEntryEventSerializableFilter ?
-                        (CacheEntryEventSerializableFilter)filter : null,
+                    getEventFilter() instanceof CacheEntryEventSerializableFilter ?
+                        (CacheEntryEventSerializableFilter)getEventFilter() : null,
                     null,
                     nodeId,
-                    taskName,
+                    taskName(),
                     evt.getKey(),
                     evt.getValue(),
                     evt.getOldValue(),
@@ -1468,16 +1406,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         /**
-         *
+         * @return {@code True} if the event happened on the primary node, otherwise {@code false}.
          */
-        public void filter() {
+        public boolean filter() {
             CacheContinuousQueryEntry entry = evt.entry();
 
             notify = !entry.isFiltered();
 
             try {
-                if (notify && filter != null)
-                    notify = filter.evaluate(evt);
+                if (notify && getEventFilter() != null)
+                    notify = getEventFilter().evaluate(evt);
             }
             catch (Exception e) {
                 U.error(log, "CacheEntryEventFilter failed: " + e);
@@ -1496,8 +1434,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     }
                 }
 
-                backup = true;
+                return false;
             }
+
+            return true;
+        }
+
+        private String taskName() {
+            return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 3aefafe..e86ec47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous query listener.
@@ -37,9 +39,10 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
      * @param fireEvent Immediately fired events.
+     * @param fut Dht atomic future.
      */
-    public CacheContinuousQueryClosure onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
-        boolean recordIgniteEvt, boolean fireEvent);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+        boolean recordIgniteEvt, boolean fireEvent, @Nullable IgniteInternalFuture<?> fut);
 
     /**
      * Listener unregistered callback.
@@ -71,7 +74,7 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param topVer Topology version.
      * @param primary Primary
      */
-    public CacheContinuousQueryClosure skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
         AffinityTopologyVersion topVer, boolean primary, boolean fireEvnt);
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9c81590/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 12819c9..f6ab8b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
@@ -172,7 +173,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param fireEvnt
      * @param topVer Topology version.
      */
-    public List<CacheContinuousQueryClosure> skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
         int partId,
         long updCntr,
@@ -181,8 +182,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
-        List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrs.size());
-
         for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
             CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
                 cctx.cacheId(),
@@ -198,10 +197,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            clsrs.add(lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt));
+            lsnr.skipUpdateEvent(evt, topVer, primary, fireEvnt);
         }
-
-        return clsrs;
     }
 
     /**
@@ -234,6 +231,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
+     * @param fut Dht atomic future.
      * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
@@ -247,6 +245,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean fireEvnt,
         boolean preload,
         long updateCntr,
+        @Nullable IgniteInternalFuture<?> fut,
         AffinityTopologyVersion topVer) throws IgniteCheckedException {
         Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
 
@@ -262,6 +261,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 fireEvnt,
                 preload,
                 updateCntr,
+                fut,
                 topVer);
         }
     }
@@ -278,9 +278,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
+     * @param fut Dht atomic future.
      * @throws IgniteCheckedException In case of error.
      */
-    public List<CacheContinuousQueryClosure> onEntryUpdated(
+    public void onEntryUpdated(
         Map<UUID, CacheContinuousQueryListener> lsnrCol,
         KeyCacheObject key,
         CacheObject newVal,
@@ -291,6 +292,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean fireEvnt,
         boolean preload,
         long updateCntr,
+        @Nullable IgniteInternalFuture<?> fut,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
@@ -301,7 +303,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean hasOldVal = oldVal != null;
 
         if (!hasNewVal && !hasOldVal)
-            return skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer);
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, fireEvnt, topVer);
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 
@@ -309,8 +311,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-        List<CacheContinuousQueryClosure> clsrs = new ArrayList<>(lsnrCol.size());
-
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
                 continue;
@@ -343,13 +343,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            CacheContinuousQueryClosure clsr = lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt);
-
-            if (clsr != null)
-                clsrs.add(clsr);
+            lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fireEvnt, fut);
         }
-
-        return clsrs;
     }
 
     /**
@@ -402,7 +397,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true);
+                lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, true, null);
             }
         }
     }


Mime
View raw message