ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [06/43] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Thu, 08 Jun 2017 12:32:59 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 06a3416..b112e1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -104,7 +104,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
         assert cctx.config().getCacheMode() != LOCAL;
 
-        cctx.io().addHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() {
+        cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() {
             @Override public void apply(UUID nodeId, GridCacheQueryRequest req) {
                 processQueryRequest(nodeId, req);
             }
@@ -560,11 +560,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addOrderedCacheHandler(topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removeOrderedHandler(false, topic);
                 }
             });
 
@@ -744,11 +744,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addOrderedCacheHandler(topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removeOrderedHandler(false, topic);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 07545a5..0c264db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -863,12 +863,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 locPart = locPart0;
 
-                it = cctx.offheap().iterator(part);
+                it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part);
             }
             else {
                 locPart = null;
 
-                it = cctx.offheap().iterator(true, backups, topVer);
+                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
             }
 
             return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 00ddff8..9dc7817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 /**
  * Query request.
  */
-public class GridCacheQueryRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 4d8e658..521aacf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -46,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Query request.
  */
-public class GridCacheQueryResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridCacheQueryResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
index 76147ee..ef0157e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -31,7 +31,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Batch acknowledgement.
  */
-public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+public class CacheContinuousQueryBatchAck extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 336f650..7a7c045 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -155,9 +155,12 @@ public class CacheContinuousQueryEventBuffer {
             batch = initBatch(entry.topologyVersion());
 
             if (batch == null || cntr < batch.startCntr) {
-                if (backup)
+                if (backup) {
                     backupQ.add(entry);
 
+                    return null;
+                }
+
                 return entry;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e5347c8..2b696a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -487,6 +487,70 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 onEntryUpdated(evt, primary, false, null);
             }
 
+            @Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx,
+                @Nullable CounterSkipContext skipCtx,
+                int part,
+                long cntr,
+                AffinityTopologyVersion topVer,
+                boolean primary) {
+                if (skipCtx == null)
+                    skipCtx = new CounterSkipContext(part, cntr, topVer);
+
+                if (loc) {
+                    assert !locCache;
+
+                    final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry());
+
+                    if (!evts.isEmpty()) {
+                        if (asyncCb) {
+                            ctx.asyncCallbackPool().execute(new Runnable() {
+                                @Override public void run() {
+                                    locLsnr.onUpdated(evts);
+                                }
+                            }, part);
+                        }
+                        else
+                            skipCtx.addProcessClosure(new Runnable() {
+                                @Override public void run() {
+                                    locLsnr.onUpdated(evts);
+                                }
+                            });
+                    }
+
+                    return skipCtx;
+                }
+
+                CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
+
+                final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
+
+                if (entryOrList != null) {
+                    skipCtx.addProcessClosure(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                ctx.continuous().addNotification(nodeId,
+                                    routineId,
+                                    entryOrList,
+                                    topic,
+                                    false,
+                                    true);
+                            }
+                            catch (ClusterTopologyCheckedException ex) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send event notification to node, node left cluster " +
+                                        "[node=" + nodeId + ", err=" + ex + ']');
+                            }
+                            catch (IgniteCheckedException ex) {
+                                U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
+                                    "Failed to send event notification to node: " + nodeId, ex);
+                            }
+                        }
+                    });
+                }
+
+                return skipCtx;
+            }
+
             @Override public void onPartitionEvicted(int part) {
                 entryBufs.remove(part);
             }
@@ -1011,7 +1075,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         t.get1());
 
                     for (AffinityTopologyVersion topVer : t.get2()) {
-                        for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
+                        for (ClusterNode node : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer)) {
                             if (!node.isLocal()) {
                                 try {
                                     cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 84b22f9..7da657f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.jetbrains.annotations.Nullable;
 
@@ -76,6 +77,25 @@ public interface CacheContinuousQueryListener<K, V> {
     public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
 
     /**
+     * For cache updates in shared cache group need notify others caches CQ listeners
+     * that generated counter should be skipped.
+     *
+     * @param cctx Cache context.
+     * @param skipCtx Context.
+     * @param part Partition.
+     * @param cntr Counter to skip.
+     * @param topVer Topology version.
+     * @return Context.
+     */
+    @Nullable public CounterSkipContext skipUpdateCounter(
+        GridCacheContext cctx,
+        @Nullable CounterSkipContext skipCtx,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer,
+        boolean primary);
+
+    /**
      * @param part Partition.
      */
     public void onPartitionEvicted(int part);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 1a655e9..f264056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -127,7 +127,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
 
         if (cctx.affinityNode()) {
-            cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+            cctx.io().addCacheHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
                 new CI2<UUID, CacheContinuousQueryBatchAck>() {
                     @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
                         CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
@@ -175,7 +175,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param primary Primary.
      * @param topVer Topology version.
      */
-    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+    private void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
         int partId,
         long updCntr,
@@ -204,6 +204,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param skipCtx Context.
+     * @param part Partition number.
+     * @param cntr Update counter.
+     * @param topVer Topology version.
+     * @return Context.
+     */
+    @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer,
+        boolean primary) {
+        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+            skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary);
+
+        return skipCtx;
+    }
+
+    /**
      * @param internal Internal entry flag (internal key or not user cache).
      * @param preload Whether update happened during preloading.
      * @return Registered listeners.
@@ -633,7 +651,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         hnd.localCache(cctx.isLocal());
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
-            F.nodeForNodeId(cctx.localNodeId()) : cctx.config().getNodeFilter();
+            F.nodeForNodeId(cctx.localNodeId()) : cctx.group().nodeFilter();
 
         assert pred != null : cctx.config();
 
@@ -658,7 +676,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         if (notifyExisting) {
-            final Iterator<CacheDataRow> it = cctx.offheap().iterator(true, true, AffinityTopologyVersion.NONE);
+            final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
+                true,
+                true,
+                AffinityTopologyVersion.NONE);
 
             locLsnr.onUpdated(new Iterable<CacheEntryEvent>() {
                 @Override public Iterator<CacheEntryEvent> iterator() {
@@ -807,16 +828,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 intLsnrCnt.incrementAndGet();
         }
         else {
-            added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
+            synchronized (this) {
+                if (lsnrCnt.get() == 0) {
+                    if (cctx.group().sharedGroup() && !cctx.isLocal())
+                        cctx.group().addCacheWithContinuousQuery(cctx);
+                }
 
-            if (added) {
-                lsnrCnt.incrementAndGet();
+                added = lsnrs.putIfAbsent(lsnrId, lsnr) == null;
 
-                lsnr.onExecution();
+                if (added)
+                    lsnrCnt.incrementAndGet();
             }
+
+            if (added)
+                lsnr.onExecution();
         }
 
-        return added ? GridContinuousHandler.RegisterStatus.REGISTERED : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
+        return added ? GridContinuousHandler.RegisterStatus.REGISTERED :
+            GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
     }
 
     /**
@@ -834,11 +863,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
         else {
-            if ((lsnr = lsnrs.remove(id)) != null) {
-                lsnrCnt.decrementAndGet();
+            synchronized (this) {
+                if ((lsnr = lsnrs.remove(id)) != null) {
+                    int cnt = lsnrCnt.decrementAndGet();
 
-                lsnr.onUnregister();
+                    if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal())
+                        cctx.group().removeCacheWithContinuousQuery(cctx);
+                }
             }
+
+            if (lsnr != null)
+                lsnr.onUnregister();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
new file mode 100644
index 0000000..23702bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class CounterSkipContext {
+    /** */
+    private final CacheContinuousQueryEntry entry;
+
+    /** */
+    private List<Runnable> procC;
+
+    /**
+     * @param part Partition.
+     * @param cntr Filtered counter.
+     * @param topVer Topology version.
+     */
+    CounterSkipContext(int part, long cntr, AffinityTopologyVersion topVer) {
+        entry = new CacheContinuousQueryEntry(0,
+            null,
+            null,
+            null,
+            null,
+            false,
+            part,
+            cntr,
+            topVer,
+            (byte)0);
+
+        entry.markFiltered();
+    }
+
+    /**
+     * @return Entry for filtered counter.
+     */
+    CacheContinuousQueryEntry entry() {
+        return entry;
+    }
+
+    /**
+     * @return Entries
+     */
+    @Nullable public List<Runnable> processClosures() {
+        return procC;
+    }
+
+    /**
+     * @param c Closure send
+     */
+    void addProcessClosure(Runnable c) {
+        if (procC == null)
+            procC = new ArrayList<>();
+
+        procC.add(c);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 5cba0cf..96af425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1480,7 +1480,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
             if (modified)
                 cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
 
-            GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP;
+            GridCacheOperation op = modified ? (cacheVal == null ? DELETE : UPDATE) : NOOP;
 
             if (op == NOOP) {
                 ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 163ed99..30aa335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -565,7 +565,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      */
     public void cached(GridCacheEntryEx entry) {
         assert entry == null || entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this +
-            ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']';
+            ", entry=" + entry +
+            ", ctxNear=" + ctx.isNear() +
+            ", ctxDht=" + ctx.isDht() + ']';
 
         this.entry = entry;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a591517..ba3b2b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -136,68 +136,68 @@ public class IgniteTxHandler {
         txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
         txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
 
-        ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+        ctx.io().addCacheHandler(0, GridCacheTxRecoveryRequest.class,
             new CI2<UUID, GridCacheTxRecoveryRequest>() {
                 @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
                     processCheckPreparedTxRequest(nodeId, req);
                 }
             });
 
-        ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+        ctx.io().addCacheHandler(0, GridCacheTxRecoveryResponse.class,
             new CI2<UUID, GridCacheTxRecoveryResponse>() {
                 @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
                     processCheckPreparedTxResponse(nodeId, res);
@@ -506,8 +506,8 @@ public class IgniteTxHandler {
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 
-            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
-            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
+            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
             if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 5a708d7..52a0f56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -660,6 +660,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             txEntry.updateCounter())));
 
                                     if (op == CREATE || op == UPDATE) {
+                                        assert val != null : txEntry;
+
                                         GridCacheUpdateTxResult updRes = cached.innerSet(
                                             this,
                                             eventNodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
index d1d6afd..94fe005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
@@ -70,6 +70,16 @@ public class TxLocksRequest extends GridCacheMessage {
         this.txKeys = txKeys;
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */
@@ -139,13 +149,13 @@ public class TxLocksRequest extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG))
                     return false;
 
@@ -167,7 +177,7 @@ public class TxLocksRequest extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -175,7 +185,7 @@ public class TxLocksRequest extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class);
 
                 if (!reader.isLastRead())
@@ -195,7 +205,7 @@ public class TxLocksRequest extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 4;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
index 7856eaa..a5c8f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
@@ -73,6 +73,16 @@ public class TxLocksResponse extends GridCacheMessage {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */
@@ -229,25 +239,25 @@ public class TxLocksResponse extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeObjectArray("locksArr", locksArr, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeObjectArray("nearTxKeysArr", nearTxKeysArr, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG))
                     return false;
 
@@ -269,7 +279,7 @@ public class TxLocksResponse extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -277,7 +287,7 @@ public class TxLocksResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 locksArr = reader.readObjectArray("locksArr", MessageCollectionItemType.MSG, TxLockList.class);
 
                 if (!reader.isLastRead())
@@ -285,7 +295,7 @@ public class TxLocksResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 nearTxKeysArr = reader.readObjectArray("nearTxKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class);
 
                 if (!reader.isLastRead())
@@ -293,7 +303,7 @@ public class TxLocksResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class);
 
                 if (!reader.isLastRead())
@@ -313,7 +323,7 @@ public class TxLocksResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index b25b229..56f1183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -134,7 +134,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
         cacheProc = ctx.cache();
         sharedCtx = cacheProc.context();
 
-        sharedCtx.io().addHandler(0,
+        sharedCtx.io().addCacheHandler(0,
             GridChangeGlobalStateMessageResponse.class,
             new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
                 @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
@@ -194,7 +194,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
         super.stop(cancel);
 
-        sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class);
+        sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);
         ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(
@@ -377,7 +377,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
 
         actx.setFail();
 
-        // revert change if activation request fail
+        // Revert change if activation request fail.
         if (actx.activate) {
             try {
                 cacheProc.onKernalStopCaches(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 175bcea..edf8dc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1561,7 +1561,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     fut.onDone(e);
                 }
                 catch (Throwable e) {
-                    log.error("Failed to rebuild indexes for type: " + typeName, e);
+                    U.error(log, "Failed to rebuild indexes for type [cache=" + cacheName +
+                        ", type=" + typeName + ']', e);
 
                     fut.onDone(e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index 6ac2390..fd498ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -17,6 +17,19 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.QueryEntity;
@@ -31,32 +44,18 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
 import org.apache.ignite.internal.processors.query.property.QueryClassProperty;
 import org.apache.ignite.internal.processors.query.property.QueryFieldAccessor;
 import org.apache.ignite.internal.processors.query.property.QueryMethodsAccessor;
 import org.apache.ignite.internal.processors.query.property.QueryPropertyAccessor;
 import org.apache.ignite.internal.processors.query.property.QueryReadOnlyMethodsAccessor;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
index 69188c5..af65de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
@@ -124,7 +124,7 @@ public class VisorCachePartitionsTask extends VisorMultiNodeTask<VisorCacheParti
                     for (GridDhtLocalPartition part : locParts) {
                         int p = part.id();
 
-                        long sz = part.dataStore().size();
+                        long sz = part.dataStore().cacheSize(ca.context().cacheId());
 
                         // Pass NONE as topology version in order not to wait for topology version.
                         if (part.primary(AffinityTopologyVersion.NONE))

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
index 5bd53bb..fdc98ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
@@ -204,7 +204,7 @@ public class CacheAtomicSingleMessageCountSelfTest extends GridCommonAbstractTes
             throws IgniteSpiException {
 
             if (((GridIoMessage)msg).message() instanceof GridCacheMessage) {
-                int msgCacheId = ((GridCacheMessage)((GridIoMessage)msg).message()).cacheId();
+                int msgCacheId = ((GridCacheIdMessage)((GridIoMessage)msg).message()).cacheId();
 
                 if (filterCacheId == null || filterCacheId == msgCacheId) {
                     AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass());

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
index 65037d8..18a35c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
@@ -116,7 +116,7 @@ public class CacheDeferredDeleteQueueTest extends GridCommonAbstractTest {
                         for (GridDhtLocalPartition p : top.currentLocalPartitions()) {
                             Collection<Object> rmvQueue = GridTestUtils.getFieldValue(p, "rmvQueue");
 
-                            if (!rmvQueue.isEmpty() || p.dataStore().size() != 0)
+                            if (!rmvQueue.isEmpty() || p.dataStore().fullSize() != 0)
                                 return false;
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
index c060eb3..7263656 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java
@@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac
             cache = grid(g).cache(DEFAULT_CACHE_NAME);
 
             for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) {
-                int size = p.dataStore().size();
+                int size = p.dataStore().fullSize();
 
                 assertTrue("Unexpected size: " + size, size <= 32);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index d8a2065..f3a2204 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -283,30 +283,33 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
         Map<Integer, Integer> dupPartsData,
         GridDhtPartitionsFullMessage msg)
     {
-        Integer cacheId;
-        Integer dupCacheId;
+        int cache1Grp = groupIdForCache(ignite(0), cache1);
+        int cache2Grp = groupIdForCache(ignite(0), cache2);
 
-        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
-            cacheId = CU.cacheId(cache1);
-            dupCacheId = CU.cacheId(cache2);
+        Integer grpId;
+        Integer dupGrpId;
+
+        if (dupPartsData.containsKey(cache1Grp)) {
+            grpId = cache1Grp;
+            dupGrpId = cache2Grp;
         }
         else {
-            cacheId = CU.cacheId(cache2);
-            dupCacheId = CU.cacheId(cache1);
+            grpId = cache2Grp;
+            dupGrpId = cache1Grp;
         }
 
-        assertTrue(dupPartsData.containsKey(cacheId));
-        assertEquals(dupCacheId, dupPartsData.get(cacheId));
-        assertFalse(dupPartsData.containsKey(dupCacheId));
+        assertTrue(dupPartsData.containsKey(grpId));
+        assertEquals(dupGrpId, dupPartsData.get(grpId));
+        assertFalse(dupPartsData.containsKey(dupGrpId));
 
         Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
 
-        GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+        GridDhtPartitionFullMap emptyFullMap = parts.get(grpId);
 
         for (GridDhtPartitionMap map : emptyFullMap.values())
             assertEquals(0, map.map().size());
 
-        GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+        GridDhtPartitionFullMap fullMap = parts.get(dupGrpId);
 
         for (GridDhtPartitionMap map : fullMap.values())
             assertFalse(map.map().isEmpty());
@@ -323,29 +326,32 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
         Map<Integer, Integer> dupPartsData,
         GridDhtPartitionsSingleMessage msg)
     {
-        Integer cacheId;
-        Integer dupCacheId;
+        int cache1Grp = groupIdForCache(ignite(0), cache1);
+        int cache2Grp = groupIdForCache(ignite(0), cache2);
+
+        Integer grpId;
+        Integer dupGrpId;
 
-        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
-            cacheId = CU.cacheId(cache1);
-            dupCacheId = CU.cacheId(cache2);
+        if (dupPartsData.containsKey(cache1Grp)) {
+            grpId = cache1Grp;
+            dupGrpId = cache2Grp;
         }
         else {
-            cacheId = CU.cacheId(cache2);
-            dupCacheId = CU.cacheId(cache1);
+            grpId = cache2Grp;
+            dupGrpId = cache1Grp;
         }
 
-        assertTrue(dupPartsData.containsKey(cacheId));
-        assertEquals(dupCacheId, dupPartsData.get(cacheId));
-        assertFalse(dupPartsData.containsKey(dupCacheId));
+        assertTrue(dupPartsData.containsKey(grpId));
+        assertEquals(dupGrpId, dupPartsData.get(grpId));
+        assertFalse(dupPartsData.containsKey(dupGrpId));
 
         Map<Integer, GridDhtPartitionMap> parts = msg.partitions();
 
-        GridDhtPartitionMap emptyMap = parts.get(cacheId);
+        GridDhtPartitionMap emptyMap = parts.get(grpId);
 
         assertEquals(0, emptyMap.map().size());
 
-        GridDhtPartitionMap map = parts.get(dupCacheId);
+        GridDhtPartitionMap map = parts.get(dupGrpId);
 
         assertFalse(map.map().isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
index 1886c76..bdc5507 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
@@ -22,8 +22,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCacheEntry;
 
@@ -97,9 +96,9 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
 
         checkCacheMapEntry(TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class);
 
-        checkCacheMapEntry(ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class);
+        checkCacheMapEntry(ATOMIC, REPLICATED, GridDhtCacheEntry.class);
 
-        checkCacheMapEntry(TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class);
+        checkCacheMapEntry(TRANSACTIONAL, REPLICATED, GridDhtCacheEntry.class);
     }
 
     /**
@@ -135,7 +134,7 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
 
             assertNotNull(entry);
 
-            assertEquals(entry.getClass(), entryCls);
+            assertEquals(entryCls, entry.getClass());
         }
         finally {
             jcache.destroy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index eefdf9d..b0d9f0e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -167,10 +167,16 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
         }
     }
 
+    /**
+     * @return Cache context.
+     */
     protected GridCacheContext cacheContext() {
         return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context();
     }
 
+    /**
+     * @return IO manager.
+     */
     protected GridCacheIoManager cacheIoManager() {
         return grid(0).context().cache().context().io();
     }
@@ -182,10 +188,22 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
         /** */
         public static final short DIRECT_TYPE = 302;
 
+        /** {@inheritDoc} */
+        @Override public int handlerId() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean cacheGroupMessage() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public short directType() {
             return DIRECT_TYPE;
         }
 
+        /** {@inheritDoc} */
         @Override public byte fieldsCount() {
             return 3;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
index a13ad64..cff9745 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -128,7 +129,7 @@ public class GridCacheLeakTest extends GridCommonAbstractTest {
                         GridCacheConcurrentMap map = ((IgniteKernal)grid(g)).internalCache(CACHE_NAME).map();
 
                         info("Map size for cache [g=" + g + ", size=" + map.internalSize() +
-                            ", pubSize=" + map.publicSize() + ']');
+                            ", pubSize=" + map.publicSize(CU.cacheId(CACHE_NAME)) + ']');
 
                         assertTrue("Wrong map size: " + map.internalSize(), map.internalSize() <= 8192);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
index 7562fe5..bc4f2cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
@@ -68,8 +68,10 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
     /** Caches rebalance finish times. */
     private ConcurrentHashMap8<Integer, ConcurrentHashMap8<String, Long>> times;
 
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTestsStarted();
+
         times = new ConcurrentHashMap8<>();
 
         for (int i = 0; i < GRID_CNT; i++)
@@ -93,8 +95,8 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
         Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>();
 
         listeners.put(new IgnitePredicate<CacheRebalancingEvent>() {
-            @Override public boolean apply(CacheRebalancingEvent event) {
-                times.get(gridIdx(event)).putIfAbsent(event.cacheName(), event.timestamp());
+            @Override public boolean apply(CacheRebalancingEvent evt) {
+                times.get(gridIdx(evt)).putIfAbsent(evt.cacheName(), evt.timestamp());
                 return true;
             }
         }, new int[]{EventType.EVT_CACHE_REBALANCE_STOPPED});
@@ -194,7 +196,11 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
         }
     }
 
-    private int gridIdx(Event event) {
-        return getTestIgniteInstanceIndex((String)event.node().attributes().get(GRID_NAME_ATTR));
+    /**
+     * @param evt Event.
+     * @return Index event node.
+     */
+    private int gridIdx(Event evt) {
+        return getTestIgniteInstanceIndex((String)evt.node().attributes().get(GRID_NAME_ATTR));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 18c0b32..52f19b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -112,7 +112,8 @@ public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest {
                     assertNull(g.cache(DEFAULT_CACHE_NAME).get(key));
 
                     if (!g.internalCache(DEFAULT_CACHE_NAME).context().deferredDelete())
-                        assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key)));
+                        assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context(),
+                            g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key)));
                 }
             });
         }


Mime
View raw message