ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: cc
Date Fri, 26 May 2017 06:31:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc1 [created] 13d378534


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13d37853
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13d37853
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13d37853

Branch: refs/heads/ignite-5075-cc1
Commit: 13d378534013b84a55656b8873bd7092544e302c
Parents: 01f45c1
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 26 09:31:12 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 26 09:31:12 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 12 +++-
 .../dht/GridClientPartitionTopology.java        |  5 ++
 .../distributed/dht/GridDhtLocalPartition.java  |  8 ++-
 .../dht/GridDhtPartitionTopology.java           |  2 +
 .../dht/GridDhtPartitionTopologyImpl.java       | 62 +++++++++++------
 .../GridNearAtomicSingleUpdateFuture.java       |  2 +
 .../GridDhtPartitionsExchangeFuture.java        | 27 ++++++--
 .../CacheContinuousQueryEventBuffer.java        | 70 ++++++++++++--------
 .../CacheContinuousQueryPartitionRecovery.java  |  5 +-
 9 files changed, 134 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 80f872c..ba21964 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -28,7 +28,6 @@ import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
-
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -61,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
-import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -75,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
@@ -1726,6 +1724,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                         long updateCntr0 = nextPartCounter();
 
+                        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(partition(), updateCntr0, "set new counter conflict on " + (primary ? "primary" : "backup"));
+
                         if (updateCntr != null)
                             updateCntr0 = updateCntr;
 
@@ -2615,6 +2615,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 long updateCntr = 0;
 
+                assert preload;
+
                 if (!preload)
                     updateCntr = nextPartCounter(topVer);
 
@@ -4410,6 +4412,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             long updateCntr0 = entry.nextPartCounter();
 
+            //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(entry.partition(), updateCntr0, "new counter for update on " + (primary ? "primary" : "backup"));
+
             if (updateCntr != null)
                 updateCntr0 = updateCntr;
 
@@ -4490,6 +4494,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 // Must persist inside synchronization in non-tx mode.
                 cctx.store().remove(null, entry.key);
 
+            System.exit(11);
+
             long updateCntr0 = entry.nextPartCounter();
 
             if (updateCntr != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 1de64c5..2eebda3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -649,6 +649,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
     }
 
+    @Override
+    public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+        throw new UnsupportedOperationException();
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 6fb557a..fd76d3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -799,7 +799,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return Current update index.
      */
     public long updateCounter() {
-        return store.updateCounter();
+        long cntr0 = store.updateCounter();
+
+        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, cntr0, "provide counter " + cctx.shared().exchange().lastTopologyFuture().topologyVersion());
+
+        return cntr0;
     }
 
     /**
@@ -813,6 +817,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param val Update index value.
      */
     public void updateCounter(long val) {
+        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, val, "set new counter");
+
         store.updateCounter(val);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..91289a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -241,6 +241,8 @@ public interface GridDhtPartitionTopology {
         GridDhtPartitionMap parts,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap);
 
+    public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap);
+
     /**
      * Checks if there is at least one owner for each partition in the cache topology.
      * If not, marks such a partition as LOST.

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8e79eda..d93fedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1122,8 +1122,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     T2<Long, Long> cntr = cntrMap.get(part.id());
 
-                    if (cntr != null)
+                    if (cntr != null) {
+                        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from full " + exchId.topologyVersion());
+
                         part.updateCounter(cntr.get2());
+                    }
                 }
             }
 
@@ -1255,29 +1258,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(
-        @Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap
-    ) {
-        if (log.isDebugEnabled())
-            log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
-
-        if (!cctx.discovery().alive(parts.nodeId())) {
-            if (log.isDebugEnabled())
-                log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
-                    ", parts=" + parts + ']');
-
-            return null;
-        }
-
+    @Override
+    public void updateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
@@ -1295,10 +1282,43 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     T2<Long, Long> cntr = cntrMap.get(part.id());
 
-                    if (cntr != null && cntr.get2() > part.updateCounter())
+                    if (cntr != null && cntr.get2() > part.updateCounter()) {
+                        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from single");
+
                         part.updateCounter(cntr.get2());
+                    }
                 }
             }
+        }
+        finally {
+            lock.writeLock().unlock();
+
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    @Nullable @Override public GridDhtPartitionMap update(
+        @Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionMap parts,
+        @Nullable Map<Integer, T2<Long, Long>> cntrMap
+    ) {
+        if (log.isDebugEnabled())
+            log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
+
+        if (!cctx.discovery().alive(parts.nodeId())) {
+            if (log.isDebugEnabled())
+                log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
+                    ", parts=" + parts + ']');
+
+            return null;
+        }
+
+        lock.writeLock().lock();
+
+        try {
+            if (stopping)
+                return null;
 
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6ffa373..16301e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -558,6 +558,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         ClusterNode primary = nodes.get(0);
 
+        //TestDebugLog.addEntryMessage(cacheKey.partition(), primary.id(), "mapped primary");
+
         boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
 
         GridNearAtomicAbstractUpdateRequest req;

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8b8c87c..4b25ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.PartitionLossPolicy;
@@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -494,6 +494,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             crd = srvNodes.isEmpty() ? null : srvNodes.get(0);
 
+            //org.apache.ignite.spi.collision.TestDebugLog.addMessage("Start exchange topVer=" + topologyVersion() + ", crd=" + crd.id());
+
             boolean crdNode = crd != null && crd.isLocal();
 
             skipPreload = cctx.kernalContext().clientNode();
@@ -1106,6 +1108,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
         boolean realExchange = !dummy && !forcePreload;
 
+//        if (realExchange)
+//            org.apache.ignite.spi.collision.TestDebugLog.addMessage("End exchange topVer=" + topologyVersion() + ", crd=" + (crd != null ? crd.id() : null));
+
         if (realExchange && !cctx.kernalContext().clientNode() && (serverNotDiscoveryEvent() || affChangeMsg != null)) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
@@ -1564,6 +1569,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
 
+            for (GridDhtPartitionsAbstractMessage msg0 : msgs.values()) {
+                if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : ((GridDhtPartitionsSingleMessage)msg0).partitions().entrySet()) {
+                        Integer cacheId = entry.getKey();
+                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+                        GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
+                                cctx.exchange().clientTopology(cacheId, this);
+
+                        top.updateCounters(msg0.partitionUpdateCounters(cacheId));
+                    }
+                }
+            }
+
             if (discoEvt.type() == EVT_NODE_JOINED) {
                 if (cctx.kernalContext().state().active())
                     assignPartitionsStates();
@@ -1795,7 +1814,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+            top.update(exchId, entry.getValue(), null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index c59b851..65b7d2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -143,6 +143,8 @@ public class CacheContinuousQueryEventBuffer {
      * @return Collected entries to pass to listener (single entry or entries list).
      */
     @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) {
+        //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(e.partition(), e.updateCounter(), "processEntry on " + (backup ? "backup" : "primary"));
+
         return process0(e.updateCounter(), e, backup);
     }
 
@@ -235,11 +237,46 @@ public class CacheContinuousQueryEventBuffer {
             for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
                 long cntr = p.getKey();
 
-                assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr;
+                assert cntr <= batch.endCntr;
+
+                if (pending.remove(p.getKey()) != null) {
+                    if (cntr < batch.startCntr)
+                        res = addResult(res, p.getValue(), backup);
+                    else
+                        res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    private Object addResult(Object res, CacheContinuousQueryEntry entry, boolean backup) {
+        if (res == null) {
+            if (backup)
+                backupQ.add(entry);
+            else
+                res = entry;
+        }
+        else {
+            assert !backup;
+
+            List<CacheContinuousQueryEntry> resList;
+
+            if (res instanceof CacheContinuousQueryEntry) {
+                resList = new ArrayList<>();
+
+                resList.add((CacheContinuousQueryEntry)res);
+            }
+            else {
+                assert res instanceof List : res;
 
-                if (pending.remove(p.getKey()) != null)
-                    res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+                resList = (List<CacheContinuousQueryEntry>)res;
             }
+
+            resList.add(entry);
+
+            res = resList;
         }
 
         return res;
@@ -399,32 +436,7 @@ public class CacheContinuousQueryEventBuffer {
 
                                 filtered = 0;
 
-                                if (res == null) {
-                                    if (backup)
-                                        backupQ.add(entry0);
-                                    else
-                                        res = entry0;
-                                }
-                                else {
-                                    assert !backup;
-
-                                    List<CacheContinuousQueryEntry> resList;
-
-                                    if (res instanceof CacheContinuousQueryEntry) {
-                                        resList = new ArrayList<>();
-
-                                        resList.add((CacheContinuousQueryEntry)res);
-                                    }
-                                    else {
-                                        assert res instanceof List : res;
-
-                                        resList = (List<CacheContinuousQueryEntry>)res;
-                                    }
-
-                                    resList.add(entry0);
-
-                                    res = resList;
-                                }
+                                res = addResult(res, entry0, backup);
                             }
                             else
                                 filtered++;

http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 59252d2..e1bc49a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -243,8 +243,11 @@ class CacheContinuousQueryPartitionRecovery {
                     else {
                         if (pending.isFiltered())
                             skippedFiltered = true;
-                        else
+                        else {
+                            //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(pending.partition(), pending.updateCounter(), " stop process last=" + lastFiredEvt);
+
                             break;
+                        }
                     }
                 }
 


Mime
View raw message