ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: # ignite-426 backup queue flush, test
Date Tue, 11 Aug 2015 15:03:40 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-426 fa494ff1c -> 31179c17c


# ignite-426 backup queue flush, test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31179c17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31179c17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31179c17

Branch: refs/heads/ignite-426
Commit: 31179c17c01dc80e7f6d2584c84690587d86a61c
Parents: fa494ff
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Aug 11 18:03:28 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Aug 11 18:03:28 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  |   4 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   1 -
 .../GridDhtPartitionsExchangeFuture.java        |   5 +
 .../continuous/CacheContinuousQueryEvent.java   |   6 +-
 .../continuous/CacheContinuousQueryHandler.java | 100 ++++++-
 .../CacheContinuousQueryListener.java           |   9 +
 .../continuous/CacheContinuousQueryManager.java |  34 ++-
 .../continuous/GridContinuousMessage.java       |   1 +
 .../continuous/GridContinuousProcessor.java     |  65 ++++-
 ...acheContinuousQueryFailoverAbstractTest.java | 272 +++++++++++++++++++
 .../CacheContinuousQueryFailoverAtomicTest.java |  38 +++
 11 files changed, 506 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 2864fa4..9e7d930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -96,7 +96,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations =
new CopyOnWriteArrayList<>();
 
     /** Continuous query update index. */
-    private final AtomicLong contQueryUpdIdx = new AtomicLong();
+    private final AtomicLong contQryUpdIdx = new AtomicLong();
 
     /**
      * @param cctx Context.
@@ -590,7 +590,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return Next update index.
      */
     public long nextContinuousQueryUpdateIndex() {
-        return contQueryUpdIdx.incrementAndGet();
+        return contQryUpdIdx.incrementAndGet();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 63edcaa..601f1d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -450,5 +450,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbf6b40..99c7fc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -757,6 +757,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     }
                 }
 
+                boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -767,6 +769,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     if (drCacheCtx.isDrEnabled())
                         drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
 
+                    if (topChanged)
+                        cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
                     // Partition release future is done so we can flush the write-behind
store.
                     cacheCtx.store().forceFlush();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 4a0d6f7..96fd4ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K,
V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return e.key().value(cctx.cacheObjectContext(), false);
     }
 
@@ -69,8 +68,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K,
V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public V getOldValue() {
+    @Override public V getOldValue() {
         return CU.value(e.oldValue(), cctx, false);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 88ae39b..8e308a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -85,7 +86,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     private transient boolean skipPrimaryCheck;
 
     /** Backup queue. */
-    private transient Queue<CacheContinuousQueryEntry> backupQueue;
+    private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+    /** */
+    private transient Map<Integer, Long> rcvCntrs;
+
+    /** */
+    private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter();
 
     /**
      * Required by {@link Externalizable}.
@@ -135,6 +142,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
+
+        this.rcvCntrs = new HashMap<>();
     }
 
     /** {@inheritDoc} */
@@ -216,23 +225,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 }
 
                 if (notify) {
-                    if (loc)
+                    if (loc && dupEvtFilter.apply(evt.entry()))
                         locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends
V>>asList(evt));
                     else {
                         try {
                             final CacheContinuousQueryEntry entry = evt.entry();
 
                             if (primary) {
-                                if (ctx.config().isPeerClassLoadingEnabled() && ctx.discovery().node(nodeId)
!= null) {
-                                    entry.prepareMarshal(cctx);
-
-                                    GridCacheDeploymentManager depMgr =
-                                        ctx.cache().internalCache(cacheName).context().deploy();
-
-                                    depMgr.prepare(entry);
-                                }
-                                else
-                                    entry.prepareMarshal(cctx);
+                                prepareEntry(cctx, nodeId, entry);
 
                                 ctx.continuous().addNotification(nodeId, routineId, entry,
topic, sync, true);
                             }
@@ -304,6 +304,25 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 }
             }
 
+            @Override public void flushBackupQueue(GridKernalContext ctx) {
+                if (backupQueue.isEmpty())
+                    return;
+
+                try {
+                    GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                    for (CacheContinuousQueryEntry e : backupQueue)
+                        prepareEntry(cctx, nodeId, e);
+
+                    ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue,
topic);
+
+                    backupQueue.clear();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(ctx.log(getClass()), "Failed to send backup event notification
to node: " + nodeId, e);
+                }
+            }
+
             @Override public boolean oldValueRequired() {
                 return oldValRequired;
             }
@@ -325,6 +344,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         return mgr.registerListener(routineId, lsnr, internal);
     }
 
+    /**
+     * @param cctx Cache context used to marshal the entry and to reach the deployment manager.
+     * @param nodeId ID of the node that started routine.
+     * @param entry Entry to prepare for marshalling.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry
entry)
+        throws IgniteCheckedException {
+        if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId)
!= null) {
+            entry.prepareMarshal(cctx);
+
+            cctx.deploy().prepare(entry);
+        }
+        else
+            entry.prepareMarshal(cctx);
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -392,12 +428,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry
e) {
                     return new CacheContinuousQueryEvent<>(cache, cctx, e);
                 }
-            }
+            },
+            dupEvtFilter
         );
 
         locLsnr.onUpdated(evts);
     }
 
+    /**
+     * Decides whether an entry is new for its partition and records its update index.
+     *
+     * @param e Entry.
+     * @return {@code True} if listener should be notified, {@code false} if the entry's update index
+     *      is not greater than the last index recorded for the entry's partition.
+     */
+    private boolean notifyListener(CacheContinuousQueryEntry e) {
+        Integer part = e.partition();
+
+        Long cntr = rcvCntrs.get(part);
+
+        if (cntr != null) {
+            long cntr0 = cntr;
+
+            if (e.updateIndex() > cntr0) {
+                // TODO IGNITE-426: remove assert.
+                assert e.updateIndex() == cntr0 + 1 : "Invalid entry [e=" + e + ", cntr=" + cntr + ']';
+
+                // Record the accepted entry's index; storing the old value back would leave the
+                // counter stuck at the first index seen for this partition.
+                rcvCntrs.put(part, e.updateIndex());
+            }
+            else
+                return false;
+        }
+        else
+            rcvCntrs.put(part, e.updateIndex());
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException
{
         assert ctx != null;
@@ -530,6 +594,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     }
 
     /**
+     * Predicate that delegates to {@link #notifyListener(CacheContinuousQueryEntry)} for each entry.
+     */
+    private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry>
{
+        /** {@inheritDoc} */
+        @Override public boolean apply(CacheContinuousQueryEntry e) {
+            return notifyListener(e);
+        }
+    }
+
+    /**
      * Deployable object.
      */
     private static class DeployableObject implements Externalizable {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index d5d5ff8..d955aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import org.apache.ignite.internal.*;
+
 import java.util.*;
 
 /**
@@ -50,6 +52,13 @@ interface CacheContinuousQueryListener<K, V> {
     public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
 
     /**
+     * Flushes backup queue.
+     *
+     * @param ctx Context.
+     */
+    public void flushBackupQueue(GridKernalContext ctx);
+
+    /**
      * @return Whether old value is required.
      */
     public boolean oldValueRequired();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c6a16c9..ce2b111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -127,7 +127,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
+     * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(GridCacheEntryEx e,
@@ -141,8 +143,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
     {
         assert e != null;
         assert key != null;
-
-        assert Thread.holdsLock(e);
+        assert Thread.holdsLock(e) : e;
 
         boolean internal = e.isInternal() || !e.context().userCache();
 
@@ -175,7 +176,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
 
         boolean initialized = false;
 
-        boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
@@ -204,6 +205,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 e.partition(),
                 updateIdx);
 
+            log.info("Created entry [node=" + cctx.gridName() +
+                ", primary=" + primary +
+                ", preload=" + preload +
+                ", part=" + e.partition() +
+                ", idx=" + updateIdx + ']');
+
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
@@ -221,8 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         throws IgniteCheckedException {
         assert e != null;
         assert key != null;
-
-        assert Thread.holdsLock(e);
+        assert Thread.holdsLock(e) : e;
 
         if (e.isInternal())
             return;
@@ -374,6 +380,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
     }
 
     /**
+     * @param topVer Topology version (not read by this method; every registered listener is flushed).
+     */
+    public void beforeExchange(AffinityTopologyVersion topVer) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            lsnr.flushBackupQueue(cctx.kernalContext());
+
+        for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+            lsnr.flushBackupQueue(cctx.kernalContext());
+    }
+
+    /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
      * @param bufSize Buffer size.
@@ -493,7 +510,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                                 GridCacheEntryEx e = it.next();
 
                                 CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
-                                    cctx.cacheId(), CREATED, e.key(), e.rawGet(), null, 0,
0);
+                                    cctx.cacheId(),
+                                    CREATED,
+                                    e.key(),
+                                    e.rawGet(),
+                                    null,
+                                    0, 0);
 
                                 next = new CacheContinuousQueryEvent<>(
                                     cctx.kernalContext().cache().jcache(cctx.name()),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
index fe50fd8..4c7f8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@ -49,6 +49,7 @@ public class GridContinuousMessage implements Message {
     private Object data;
 
     /** */
+    @GridToStringInclude
     @GridDirectCollection(Message.class)
     private Collection<Message> msgs;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 457f150..7e71b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -593,6 +593,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /**
      * @param nodeId ID of the node that started routine.
      * @param routineId Routine ID.
+     * @param objs Notification objects; must be non-empty when the routine is known on this node.
+     * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered
message will be sent.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void addBackupNotification(UUID nodeId,
+        final UUID routineId,
+        Collection<?> objs,
+        @Nullable Object orderedTopic)
+        throws IgniteCheckedException {
+        if (processorStopped)
+            return;
+
+        final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+        if (info != null) {
+            final GridContinuousBatch batch = info.addAll(objs);
+
+            sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true,
null);
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node that started routine.
+     * @param routineId Routine ID.
      * @param obj Notification object.
      * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered
message will be sent.
      * @param sync If {@code true} then waits for event acknowledgment.
@@ -642,7 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 final GridContinuousBatch batch = info.add(obj);
 
                 if (batch != null) {
-                    CI1<IgniteException> ackClosure = new CI1<IgniteException>()
{
+                    CI1<IgniteException> ackC = new CI1<IgniteException>() {
                         @Override public void apply(IgniteException e) {
                             if (e == null) {
                                 try {
@@ -655,7 +679,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         }
                     };
 
-                    sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic,
msg, ackClosure);
+                    sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic,
msg, ackC);
                 }
             }
         }
@@ -904,7 +928,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                         }
                                     };
 
-                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(),
msg,
+                                    sendNotification(nodeId,
+                                        routineId,
+                                        null,
+                                        toSnd,
+                                        hnd.orderedTopic(),
+                                        msg,
                                         ackClosure);
                                 }
                                 catch (ClusterTopologyCheckedException ignored) {
@@ -1212,6 +1241,36 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * @param objs Objects to add to the current batch; must be non-empty.
+         * @return Batch to send: the current batch with {@code objs} appended (a new batch replaces it).
+         */
+        GridContinuousBatch addAll(Collection<?> objs) {
+            assert objs != null;
+            assert objs.size() > 0;
+
+            GridContinuousBatch toSnd = null;
+
+            lock.writeLock().lock();
+
+            try {
+                for (Object obj : objs)
+                    batch.add(obj);
+
+                toSnd = batch;
+
+                batch = hnd.createBatch();
+
+                if (interval > 0)
+                    lastSndTime = U.currentTimeMillis();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+
+            return toSnd;
+        }
+
+        /**
          * @param obj Object to add.
          * @return Batch to send or {@code null} if there is nothing to send for now.
          */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
new file mode 100644
index 0000000..fe3c817
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Base continuous query test that stops and restarts server nodes; subclasses supply cache and atomicity modes.
+ */
+public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest
{
+    /** IP finder shared by all nodes started by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag applied to the configuration of the next started node. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(cacheMode());
+        ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueue() throws Exception {
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setAutoUnsubscribe(true);
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        int PARTS = 1;
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Stop iteration: " + i);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+            Ignite ignite = ignite(i);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            lsnr.latch = new CountDownLatch(keys.size());
+
+            boolean first = true;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ']');
+
+                cache.put(key, key);
+
+                if (first) {
+                    spi.skipMsg = true;
+
+                    first = false;
+                }
+            }
+
+            stopGrid(i);
+
+            assertTrue("Failed to wait for notifications", lsnr.latch.await(5, SECONDS));
+
+            lsnr.latch = null;
+
+            awaitPartitionMapExchange();
+        }
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Start iteration: " + i);
+
+            Ignite ignite = startGrid(i);
+
+            awaitPartitionMapExchange();
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            lsnr.latch = new CountDownLatch(keys.size());
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ']');
+
+                cache.put(key, key);
+            }
+
+            if (!lsnr.latch.await(5, SECONDS))
+                fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" +
lsnr.latch.getCount() + ']');
+
+            lsnr.latch = null;
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @param cache Cache.
+     * @param parts Number of partitions.
+     * @return Keys.
+     */
+    private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts)
{
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        List<Integer> res = new ArrayList<>();
+
+        Affinity<Object> aff = ignite.affinity(cache.getName());
+
+        ClusterNode node = ignite.cluster().localNode();
+
+        int[] nodeParts = aff.primaryPartitions(node);
+
+        final int KEYS_PER_PART = 1;
+
+        for (int i = 0; i < parts; i++) {
+            int part = nodeParts[i];
+
+            int cnt = 0;
+
+            for (int key = 0; key < 100_000; key++) {
+                if (aff.partition(key) == part && aff.isPrimary(node, key)) {
+                    res.add(key);
+
+                    if (++cnt == KEYS_PER_PART)
+                        break;
+                }
+            }
+
+            assertEquals(KEYS_PER_PART, cnt);
+        }
+
+        assertEquals(parts * KEYS_PER_PART, res.size());
+
+        return res;
+    }
+
+    /**
+     * Listener that logs every received event and counts down {@link #latch} once per event.
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object,
Object> {
+        /** Latch assigned by the test before each batch of puts; counted down once per received event. */
+        private volatile CountDownLatch latch;
+
+        /** Injected Ignite instance whose logger is used to report received events. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
{
+            for (CacheEntryEvent<?, ?> evt : evts) {
+                ignite.log().info("Received cache event: " + evt);
+
+                CountDownLatch latch = this.latch;
+
+                assertTrue(latch != null);
+                assertTrue(latch.getCount() > 0);
+
+                latch.countDown();
+            }
+        }
+    }
+
+    /**
+     * Communication SPI that drops outgoing {@link GridContinuousMessage}s while {@link #skipMsg} is set.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** When {@code true}, continuous messages wrapped in {@link GridIoMessage} are logged and not sent. */
+        private volatile boolean skipMsg;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException>
ackClosure)
+            throws IgniteSpiException {
+            if (skipMsg && msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridContinuousMessage) {
+                    log.info("Skip continuous message: " + msg0);
+
+                    return;
+                }
+            }
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31179c17/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
new file mode 100644
index 0000000..8b38b7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Runs the continuous query failover tests with a {@code PARTITIONED} cache in {@code ATOMIC} mode.
+ */
+public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest
{
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}


Mime
View raw message