ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [11/19] ignite git commit: IGNITE-426 WIP
Date Fri, 23 Oct 2015 11:52:13 GMT
IGNITE-426 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1cfeacb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1cfeacb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1cfeacb

Branch: refs/heads/ignite-426-2-reb
Commit: b1cfeacb988cb7b600f9bb33bdcc6c9a051d57cd
Parents: 01dfcbb
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Sep 29 16:12:46 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Oct 23 14:50:08 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../continuous/CacheContinuousQueryEntry.java   |  26 ++-
 .../CacheContinuousQueryFilteredEntry.java      | 228 -------------------
 .../continuous/CacheContinuousQueryHandler.java | 226 +++++++++---------
 .../CacheContinuousQueryListener.java           |   9 +-
 .../CacheContinuousQueryLostPartition.java      |  72 +++---
 .../continuous/CacheContinuousQueryManager.java |  15 +-
 ...acheContinuousQueryFailoverAbstractTest.java |  87 ++++++-
 8 files changed, 282 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6eb9e17..3474f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -92,7 +92,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -693,16 +692,11 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 115:
-                msg = new CacheContinuousQueryFilteredEntry();
-
-                break;
-
-            case 116:
                 msg = new CacheContinuousQueryLostPartition();
 
                 break;
 
-            // [-3..112] - this
+            // [-3..115] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 470aa09..9e73142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -83,6 +83,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message
{
     private long updateIdx;
 
     /** */
+    private boolean filtered;
+
+    /** */
     @GridToStringInclude
     @GridDirectTransient
     private AffinityTopologyVersion topVer;
@@ -152,6 +155,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
     }
 
     /**
+     * Mark this event as filtered.
+     */
+    void markFiltered() {
+        filtered = true;
+    }
+
+    /**
      * @return Update index.
      */
     long updateIndex() {
@@ -162,7 +172,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
      * @return Filtered entry.
      */
     boolean filtered() {
-        return false;
+        return filtered;
     }
 
     /**
@@ -286,6 +296,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeBoolean("filtered", filtered))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -359,6 +375,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
 
                 reader.incrementState();
 
+            case 7:
+                filtered = reader.readBoolean("filtered");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(CacheContinuousQueryEntry.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
deleted file mode 100644
index 14d8f51..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.nio.ByteBuffer;
-import javax.cache.event.EventType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query entry.
- */
-public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private EventType evtType;
-
-    /** Cache name. */
-    private int cacheId;
-
-    /** Partition. */
-    private int part;
-
-    /** Update index. */
-    private long updateIdx;
-
-    /** */
-    @GridToStringInclude
-    @GridDirectTransient
-    private AffinityTopologyVersion topVer;
-
-    /**
-     * Required by {@link Message}.
-     */
-    public CacheContinuousQueryFilteredEntry() {
-        // No-op.
-    }
-
-    /**
-     * @param e Cache continuous query entry.
-     */
-    CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) {
-        this.cacheId = e.cacheId();
-        this.evtType = e.eventType();
-        this.part = e.partition();
-        this.updateIdx = e.updateIndex();
-        this.topVer = e.topologyVersion();
-    }
-
-    /**
-     * @return Topology version if applicable.
-     */
-    @Nullable AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Cache ID.
-     */
-    int cacheId() {
-        return cacheId;
-    }
-
-    /**
-     * @return Event type.
-     */
-    EventType eventType() {
-        return evtType;
-    }
-
-    /**
-     * @return Partition.
-     */
-    int partition() {
-        return part;
-    }
-
-    /**
-     * @return Update index.
-     */
-    long updateIndex() {
-        return updateIdx;
-    }
-
-    /** {@inheritDoc} */
-    @Override boolean filtered() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 115;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeInt("cacheId", cacheId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal()
: -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeInt("part", part))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeLong("updateIdx", updateIdx))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cacheId = reader.readInt("cacheId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                byte evtTypeOrd;
-
-                evtTypeOrd = reader.readByte("evtType");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd);
-
-                reader.incrementState();
-
-            case 2:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                updateIdx = reader.readLong("updateIdx");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-        }
-
-        return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException
{
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 4;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheContinuousQueryFilteredEntry.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ca3579e..bb2558c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
@@ -83,6 +84,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     /** */
     private static final int BACKUP_ACK_THRESHOLD = 100;
 
+    /** */
+    private static final int QUERY_HOLE_THRESHOLD = 5;
+
     /** Cache name. */
     private String cacheName;
 
@@ -123,15 +127,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     private transient Collection<CacheContinuousQueryEntry> backupQueue;
 
     /** */
-    private transient Map<Integer, Long> rcvCntrs;
+    private boolean localCache;
 
     /** */
     private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
 
     /** */
-    private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
-
-    /** */
     private transient AcknowledgeBuffer ackBuf;
 
     /** */
@@ -187,16 +188,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
+        this.localCache = locCache;
 
-        if (locCache)
-            dupEvtFilter = F.alwaysTrue();
-        else {
-            rcvCntrs = new ConcurrentHashMap<>();
-
-            rcvs = new ConcurrentHashMap<>();
-
-            dupEvtFilter = new DuplicateEventFilter();
-        }
+        rcvs = new ConcurrentHashMap<>();
 
         cacheId = CU.cacheId(cacheName);
     }
@@ -268,7 +262,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return;
 
-                GridCacheContext<K, V> cctx = cacheContext(ctx);
+                final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 // Check that cache stopped.
                 if (cctx == null)
@@ -289,12 +283,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 }
 
                 try {
-                    final CacheContinuousQueryEntry entry = notify ? evt.entry() :
-                        new CacheContinuousQueryFilteredEntry(evt.entry());
+                    final CacheContinuousQueryEntry entry = evt.entry();
+
+                    if (!notify)
+                        entry.markFiltered();
 
                     if (primary || skipPrimaryCheck) {
                         if (loc) {
-                            if (dupEvtFilter.apply(entry)) {
+                            if (!localCache) {
+                                PartitionRecovery rcv = rcvs.get(entry.partition());
+
+                                if (rcv == null) {
+                                    rcv = new PartitionRecovery(ctx.log(getClass()));
+
+                                    PartitionRecovery oldRec = rcvs.putIfAbsent(entry.partition(),
rcv);
+
+                                    if (oldRec != null)
+                                        rcv = oldRec;
+                                }
+
+                                rcv.add(entry);
+
+                                Collection<CacheContinuousQueryEntry> entries = rcv.entries();
+
+                                if (!entries.isEmpty()) {
+                                    final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+                                    Iterable<CacheEntryEvent<? extends K, ? extends
V>> evts = F.viewReadOnly(entries,
+                                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<?
extends K, ? extends V>>() {
+                                            @Override public CacheEntryEvent<? extends
K, ? extends V> apply(
+                                                CacheContinuousQueryEntry e) {
+                                                return new CacheContinuousQueryEvent<>(cache,
cctx, e);
+                                            }
+                                        },
+                                        new IgnitePredicate<CacheContinuousQueryEntry>()
{
+                                            @Override public boolean apply(CacheContinuousQueryEntry
entry) {
+                                                return !entry.filtered();
+                                            }
+                                        }
+                                    );
+
+                                    locLsnr.onUpdated(evts);
+
+                                    if (!skipPrimaryCheck)
+                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry),
routineId, ctx);
+                                }
+                            }
+                            else {
                                 locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ?
extends V>>asList(evt));
 
                                 if (!skipPrimaryCheck)
@@ -343,7 +378,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 }
             }
 
-            @Override public void partitionLost(String cacheName0, int partId) {
+            @Override public void partitionLost(int partId) {
+                assert rcvs != null;
+
+                PartitionRecovery rcv = rcvs.get(partId);
+
+                if (rcv != null)
+                    rcv.reset();
+            }
+
+            @Override public void firePartitionLostEvent(String cacheName0, final int partId)
{
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 // Check that cache stopped.
@@ -352,25 +396,33 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
                 if ((cacheName == null && cacheName0 == null) || // Check default
cache.
                     (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName)))
{
+                    ctx.closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                    final CacheContinuousQueryEntry entry =
-                        new CacheContinuousQueryLostPartition(cctx.cacheId(), partId);
+                            CacheContinuousQueryLostPartition msg = new CacheContinuousQueryLostPartition(
+                                routineId,
+                                cctx.cacheId(),
+                                partId);
 
-                    try {
-                        prepareEntry(cctx, nodeId, entry);
+                            try {
+                                cctx.io().send(nodeId, msg, GridIoPolicy.SYSTEM_POOL);
+                            }
+                            catch (ClusterTopologyCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
 
-                        ctx.continuous().addNotification(nodeId, routineId, entry, topic,
sync, true);
-                    }
-                    catch (ClusterTopologyCheckedException ex) {
-                        IgniteLogger log = ctx.log(getClass());
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send lost partition message, node
left " +
+                                        "[msg=" + msg + ", nodeId=" + routineId + ']');
+                            }
+                            catch (IgniteCheckedException e) {
+                                IgniteLogger log = ctx.log(getClass());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to send event notification to node, node left
cluster " +
-                                "[node=" + nodeId + ", err=" + ex + ']');
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(ctx.log(getClass()), "Failed to send event notification to
node: " + nodeId, ex);
-                    }
+                                U.error(log, "Failed to send lost partition message " +
+                                    "[msg=" + msg + ", nodeId=" + routineId + ']', e);
+                            }
+                        }
+                    });
                 }
             }
 
@@ -537,14 +589,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                     rec = oldRec;
             }
 
-            if (e instanceof CacheContinuousQueryLostPartition)
-                rec.reset();
-            else {
-                rec.add(e);
+            rec.add(e);
 
-                if (!parts.containsKey(e.partition()))
-                    parts.put(e.partition(), rec);
-            }
+            if (!parts.containsKey(e.partition()))
+                parts.put(e.partition(), rec);
         }
 
         Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
@@ -569,29 +617,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     }
 
     /**
-     * @param e Entry.
-     * @return {@code True} if listener should be notified.
-     */
-    private boolean notifyListener(CacheContinuousQueryEntry e) {
-        Integer part = e.partition();
-
-        Long cntr = rcvCntrs.get(part);
-
-        if (cntr != null) {
-            long cntr0 = cntr;
-
-            if (e.updateIndex() > cntr0)
-                rcvCntrs.put(part, e.updateIndex());
-            else
-                return false;
-        }
-        else
-            rcvCntrs.put(part, e.updateIndex());
-
-        return true;
-    }
-
-    /**
      *
      */
     private static class PartitionRecovery {
@@ -617,15 +642,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
          * @param e Cache continuous qeury entry.
          */
         public void add(CacheContinuousQueryEntry e) {
-            synchronized (pendingEnts) {
-                if (pendingEnts.containsKey(e.updateIndex()) || e.updateIndex() <= lastFiredEvt)
-                    e.cacheId();
-                    //log.info("Skip duplicate continuous query entry. Entry: " + e);
-                else {
-                    //log.info("Added continuous query entry. Entry: " + e);
+            assert e != null;
 
+            synchronized (pendingEnts) {
+                if (!pendingEnts.containsKey(e.updateIndex()) && e.updateIndex()
> lastFiredEvt)
                     pendingEnts.put(e.updateIndex(), e);
-                }
+                else if (log.isDebugEnabled())
+                    log.debug("Skip duplicate continuous query message: " + e);
             }
         }
 
@@ -641,45 +664,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
                 Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
 
-                Map.Entry<Long, CacheContinuousQueryEntry> prev = null;
-
-                Set<Long> rmvEnts = new HashSet<>();
+                boolean fired = false;
 
+                // The elements are consistently.
                 while (iter.hasNext()) {
                     Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
-                    // The elements are consistently.
                     if (e.getKey() == lastFiredEvt + 1) {
                         ++lastFiredEvt;
 
                         entries.add(e.getValue());
 
                         iter.remove();
+
+                        fired = true;
+                    }
+                }
+
+                if (!fired && lastFiredEvt == 0 && pendingEnts.size() >=
QUERY_HOLE_THRESHOLD) {
+                    Long prevCnt = null;
+
+                    int orderedCnt = 0;
+
+                    for (Long cnt : pendingEnts.keySet()) {
+                        if (prevCnt != null) {
+                            if (prevCnt + 1 != cnt)
+                                break;
+                            else
+                                ++orderedCnt;
+                        }
+
+                        prevCnt = cnt;
                     }
-                    // Handle hole in sequence.
-                    else if (prev != null && prev.getKey() + 1 == e.getKey()) {
-                        entries.add(prev.getValue());
 
-                        lastFiredEvt = prev.getKey();
+                    if (orderedCnt >= QUERY_HOLE_THRESHOLD) {
+                        iter = pendingEnts.entrySet().iterator();
 
-                        rmvEnts.add(prev.getKey());
+                        while (entries.size() < orderedCnt) {
+                            Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
-                        if (!iter.hasNext()) {
                             entries.add(e.getValue());
 
                             lastFiredEvt = e.getKey();
 
-                            rmvEnts.add(e.getKey());
+                            iter.remove();
                         }
                     }
-                    else if (prev != null)
-                        break;
-
-                    prev = e;
                 }
-
-                for (Long rmKey : rmvEnts)
-                    pendingEnts.remove(rmKey);
             }
 
             return entries;
@@ -737,12 +768,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
     /** {@inheritDoc} */
     @Override public void partitionLost(String cacheName, int partId) {
-        if (this.cacheName == null) {
-            int z = 0;
-
-            ++z;
-        }
-
         if ((this.cacheName == null && cacheName == null) // Check default caches.
             || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName)))
{
             PartitionRecovery rcv = rcvs.get(partId);
@@ -962,19 +987,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     }
 
     /**
-     *
-     */
-    private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry>
{
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(CacheContinuousQueryEntry e) {
-            return notifyListener(e);
-        }
-    }
-
-    /**
      * Deployable object.
      */
     private static class DeployableObject implements Externalizable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 735e808..a706105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -60,7 +60,14 @@ interface CacheContinuousQueryListener<K, V> {
      * @param cacheName Cache name.
      * @param partId Partition ID.
      */
-    public void partitionLost(String cacheName, int partId);
+    public void firePartitionLostEvent(String cacheName, int partId);
+
+    /**
+     * Handle partition lost event.
+     *
+     * @param partId Partition ID.
+     */
+    public void partitionLost(int partId);
 
     /**
      * Flushes backup queue.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
index 734d072..eeb20cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
@@ -18,27 +18,22 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.nio.ByteBuffer;
-import javax.cache.event.EventType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Continuous query entry.
  */
-public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry {
+public class CacheContinuousQueryLostPartition extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Cache name. */
-    private int cacheId;
+    /** Routine ID. */
+    private UUID routineId;
 
     /** Partition. */
     private int part;
@@ -54,34 +49,38 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
      * @param cacheId Cache ID.
      * @param part Partition ID.
      */
-    CacheContinuousQueryLostPartition(int cacheId, int part) {
+    CacheContinuousQueryLostPartition(UUID routineId, int cacheId, int part) {
+        this.routineId = routineId;
         this.cacheId = cacheId;
         this.part = part;
     }
 
     /**
-     * @return Cache ID.
+     * @return Partition.
      */
-    int cacheId() {
-        return cacheId;
+    int partition() {
+        return part;
     }
 
     /**
-     * @return Partition.
+     * @return Routine ID.
      */
-    int partition() {
-        return part;
+    UUID routineId() {
+        return routineId;
     }
 
     /** {@inheritDoc} */
     @Override public byte directType() {
-        return 116;
+        return 115;
     }
 
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
+        if (!super.writeTo(buf, writer))
+            return false;
+
         if (!writer.isHeaderWritten()) {
             if (!writer.writeHeader(directType(), fieldsCount()))
                 return false;
@@ -90,17 +89,18 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
         }
 
         switch (writer.state()) {
-            case 0:
-                if (!writer.writeInt("cacheId", cacheId))
+            case 3:
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
-            case 1:
-                if (!writer.writeInt("part", part))
+            case 4:
+                if (!writer.writeUuid("routineId", routineId))
                     return false;
 
                 writer.incrementState();
+
         }
 
         return true;
@@ -113,44 +113,36 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
         if (!reader.beforeMessageRead())
             return false;
 
+        if (!super.readFrom(buf, reader))
+            return false;
+
         switch (reader.state()) {
-            case 0:
-                cacheId = reader.readInt("cacheId");
+            case 3:
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
 
-            case 1:
-                part = reader.readInt("part");
+            case 4:
+                routineId = reader.readUuid("routineId");
 
                 if (!reader.isLastRead())
                     return false;
 
-                reader.incrementState();
         }
 
-        return reader.afterMessageRead(CacheContinuousQueryLostPartition.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException
{
-        // No-op.
+        return true;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 5;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheContinuousQueryLostPartition.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 35c6696..d0d877d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -46,7 +46,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -128,6 +127,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 }
             });
 
+        cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryLostPartition.class,
+            new CI2<UUID, CacheContinuousQueryLostPartition>() {
+                @Override public void apply(UUID uuid, CacheContinuousQueryLostPartition
msg) {
+                    CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+                    if (lsnr != null)
+                        lsnr.partitionLost(msg.partition());
+                }
+            });
+
         cctx.time().schedule(new Runnable() {
             @Override public void run() {
                 for (CacheContinuousQueryListener lsnr : lsnrs.values())
@@ -145,10 +154,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt;
 
                 for (CacheContinuousQueryListener lsnr : lsnrs.values())
-                    lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+                    lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
 
                 for (CacheContinuousQueryListener lsnr : intLsnrs.values())
-                    lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+                    lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
             }
         }, EVT_CACHE_REBALANCE_PART_DATA_LOST);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 3bba5e6..61fa6cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -264,6 +264,84 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends
GridCommo
     /**
      * @throws Exception If failed.
      */
+    public void testStartStopQuery() throws Exception {
+        this.backups = 1;
+
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        final Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+        Ignite igniteSrv = ignite(0);
+
+        IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+        List<Integer> keys = testKeys(srvCache, 1);
+
+        int keyCnt = keys.size();
+
+        for (int j = 0; j < 50; ++j) {
+            ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+            final TestLocalListener lsnr = new TestLocalListener();
+
+            qry.setLocalListener(lsnr);
+
+            int keyIter = 0;
+
+            for (; keyIter < keyCnt / 2; keyIter++) {
+                int key = keys.get(keyIter);
+
+                clnCache.put(key, key);
+            }
+
+            assert lsnr.evts.isEmpty();
+
+            QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+            Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+            final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+            Affinity<Object> aff = affinity(srvCache);
+
+            for (; keyIter < keys.size(); keyIter++) {
+                int key = keys.get(keyIter);
+
+                log.info("Put [key=" + key + ", part=" + aff.partition(key) + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)key, null));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)key, (Object)key));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                }
+
+                srvCache.put(key, key);
+            }
+
+            checkEvents(expEvts, lsnr);
+
+            query.close();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testLeftPrimaryAndBackupNodes() throws Exception {
         this.backups = 1;
 
@@ -745,8 +823,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends
GridCommo
      * @param expEvts Expected events.
      * @param lsnr Listener.
      */
-    private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener
lsnr) {
-        assert lsnr.evts.size() == expEvts.size();
+    private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
final TestLocalListener lsnr)
+        throws Exception {
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr.evts.size() == expEvts.size();
+            }
+        }, 2000L);
 
         for (T3<Object, Object, Object> exp : expEvts) {
             CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());


Mime
View raw message