ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [3/5] ignite git commit: IGNITE-2948 - Optimize usage of GridCacheConcurrentMap
Date Tue, 26 Apr 2016 12:07:46 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
deleted file mode 100644
index 6d18b7d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import javax.cache.Cache;
-import org.apache.ignite.internal.util.GridSerializableSet;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgnitePredicate;
-
-/**
- * Key set based on provided entries with all remove operations backed
- * by underlying cache.
- */
-public class GridCacheKeySet<K, V> extends GridSerializableSet<K> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Cache context. */
-    private final GridCacheContext<K, V> ctx;
-
-    /** Filter. */
-    private final IgnitePredicate<Cache.Entry<K, V>>[] filter;
-
-    /** Base map. */
-    private final Map<K, Cache.Entry<K, V>> map;
-
-    /**
-     * @param ctx Cache context.
-     * @param c Entry collection.
-     * @param filter Filter.
-     */
-    public GridCacheKeySet(GridCacheContext<K, V> ctx, Collection<? extends Cache.Entry<K, V>> c,
-        IgnitePredicate<Cache.Entry<K, V>>[] filter) {
-        map = new HashMap<>();
-
-        assert ctx != null;
-
-        this.ctx = ctx;
-        this.filter = filter == null ? CU.<K, V>empty() : filter;
-
-        for (Cache.Entry<K, V> e : c) {
-            if (e != null)
-                map.put(e.getKey(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<K> iterator() {
-        return new GridCacheIterator<>(ctx, map.values(), F.<K, V>cacheEntry2Key(), filter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clear() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"SuspiciousMethodCalls"})
-    @Override public boolean remove(Object o) {
-        Cache.Entry<K, V> e = map.get(o);
-
-        if (e == null || !F.isAll(e, filter))
-            return false;
-
-        map.remove(o);
-
-        ctx.grid().cache(ctx.name()).remove(e.getKey());
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return F.size(map.values(), filter);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"SuspiciousMethodCalls"})
-    @Override public boolean contains(Object o) {
-        Cache.Entry<K, V> e = map.get(o);
-
-        return e != null && F.isAll(e, filter);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 75d96d8..c9ff138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3192,7 +3192,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (rmv) {
                 onMarkedObsolete();
 
-                cctx.cache().map().removeEntry(this);
+                cctx.cache().removeEntry(this);
             }
         }
     }
@@ -4229,7 +4229,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (rmv) {
                 onMarkedObsolete();
 
-                cctx.cache().map().removeEntry(this);
+                cctx.cache().removeEntry(this);
             }
         }
 
@@ -4287,18 +4287,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             flags |= IS_DELETED_MASK;
 
-            cctx.decrementPublicSize(this);
+            decrementMapPublicSize();
         }
         else {
             assert deletedUnlocked() : this;
 
             flags &= ~IS_DELETED_MASK;
 
-            cctx.incrementPublicSize(this);
+            incrementMapPublicSize();
         }
     }
 
     /**
+     *  Increments public size of map.
+     */
+    protected void incrementMapPublicSize() {
+        cctx.incrementPublicSize(this);
+    }
+
+    /**
+     * Decrements public size of map.
+     */
+    protected void decrementMapPublicSize() {
+        cctx.decrementPublicSize(this);
+    }
+
+    /**
      * @return MVCC.
      */
     @Nullable protected GridCacheMvcc mvccExtras() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5214078..3d5052b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -79,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridNoStorageCacheMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 84c5d73..a58c209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -891,7 +891,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<V> values() {
+    @Override public Iterable<V> values() {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 98b1b59..e7010f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -51,7 +51,6 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
new file mode 100644
index 0000000..2532882
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtOffHeapCacheEntry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Empty cache map that will never store any entries.
+ */
+public class GridNoStorageCacheMap implements GridCacheConcurrentMap {
+    /** Context. */
+    private final GridCacheContext ctx;
+
+    /**
+     * @param ctx Cache context.
+     */
+    public GridNoStorageCacheMap(GridCacheContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
+        @Nullable CacheObject val, boolean create, boolean touch) {
+        if (create)
+            return ctx.useOffheapEntry() ?
+                new GridDhtOffHeapCacheEntry(ctx, topVer, key, key.hashCode(), val) :
+                new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val);
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeEntry(GridCacheEntryEx entry) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int publicSize() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridCacheMapEntry randomEntry() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) {
+        return Collections.emptySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> entries(CacheEntryPredicate... filter) {
+        return Collections.emptySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter) {
+        return Collections.emptySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) {
+        return Collections.emptySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 5294f6a..3dc3471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -911,7 +911,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      *
      * @return Collection of cached values.
      */
-    public Collection<V> values();
+    public Iterable<V> values();
 
     /**
      * Gets set of all entries cached on this node. You can remove

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
index c214ce3..ffb846c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
@@ -30,4 +30,15 @@ public interface KeyCacheObject extends CacheObject {
      * @return {@code True} if internal cache key.
      */
     public boolean internal();
+
+    /**
+     * @return Partition ID for this key or -1 if it is unknown.
+     */
+    public int partition();
+
+    /**
+     * Sets partition ID for this key.
+     * @param part Partition ID.
+     */
+    public void partition(int part);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index e557c28..35e681c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -27,6 +28,10 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    @GridDirectTransient
+    private int part = -1;
+
     /**
      *
      */
@@ -39,10 +44,30 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
      * @param valBytes Value bytes.
      */
     public KeyCacheObjectImpl(Object val, byte[] valBytes) {
+        this(val, valBytes, -1);
+    }
+
+    /**
+     * @param val Value.
+     * @param valBytes Value bytes.
+     * @param part Partition.
+     */
+    public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
         assert val != null;
 
         this.val = val;
         this.valBytes = valBytes;
+        this.part = part;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return part;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void partition(int part) {
+        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -96,6 +121,11 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     }
 
     /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
         if (valBytes == null)
             valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
@@ -119,4 +149,4 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
 
         return val.equals(other.val);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 18e2d09..c043bc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -773,20 +773,36 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
     /** {@inheritDoc} */
     @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
+        return toCacheKeyObject(ctx, obj, userObj, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject toCacheKeyObject(
+        CacheObjectContext ctx,
+        Object obj,
+        boolean userObj,
+        int partition
+    ) {
         if (!((CacheObjectBinaryContext)ctx).binaryEnabled())
-            return super.toCacheKeyObject(ctx, obj, userObj);
+            return super.toCacheKeyObject(ctx, obj, userObj, partition);
+
+        if (obj instanceof KeyCacheObject) {
+            ((KeyCacheObject)obj).partition(partition);
 
-        if (obj instanceof KeyCacheObject)
             return (KeyCacheObject)obj;
+        }
 
         if (((CacheObjectBinaryContext)ctx).binaryEnabled()) {
             obj = toBinary(obj);
 
-            if (obj instanceof KeyCacheObject)
+            if (obj instanceof KeyCacheObject) {
+                ((KeyCacheObject)obj).partition(partition);
+
                 return (KeyCacheObject)obj;
+            }
         }
 
-        return toCacheKeyObject0(obj, userObj);
+        return toCacheKeyObject0(obj, userObj, partition);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 6e97ec5..03f6474 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -405,7 +405,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
                         try {
                             if (!locPart.isEmpty()) {
-                                for (GridDhtCacheEntry o : locPart.entries()) {
+                                for (GridCacheEntryEx o : locPart.allEntries()) {
                                     if (!o.obsoleteOrDeleted())
                                         dataLdr.removeDataInternal(o.key());
                                 }
@@ -428,7 +428,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 if (near != null) {
                     GridCacheVersion obsoleteVer = ctx.versions().next();
 
-                    for (GridCacheEntryEx e : near.map().allEntries0()) {
+                    for (GridCacheEntryEx e : near.allEntries()) {
                         if (!e.valid(topVer) && e.markObsolete(obsoleteVer))
                             near.removeEntry(e);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 5d07b6f..20cd52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -78,6 +78,10 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
     @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> keys;
 
+    /** Partition IDs of keys to lock. */
+    @GridDirectCollection(int.class)
+    protected List<Integer> partIds;
+
     /** Array indicating whether value should be returned for a key. */
     @GridToStringInclude
     private boolean[] retVals;
@@ -232,7 +236,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
      *
      * @param skipStore Skip store flag.
      */
-    private void skipStore(boolean skipStore){
+    private void skipStore(boolean skipStore) {
         flags = skipStore ? (byte)(flags | SKIP_STORE_FLAG_MASK) : (byte)(flags & ~SKIP_STORE_FLAG_MASK);
     }
 
@@ -284,11 +288,15 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
         boolean retVal,
         GridCacheContext ctx
     ) throws IgniteCheckedException {
-        if (keys == null)
+        if (keys == null) {
             keys = new ArrayList<>(keysCount());
+            partIds = new ArrayList<>(keysCount());
+        }
 
         keys.add(key);
 
+        partIds.add(key.partition());
+
         retVals[idx] = retVal;
 
         idx++;
@@ -325,6 +333,13 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
         finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+        if (partIds != null && !partIds.isEmpty()) {
+            assert partIds.size() == keys.size();
+
+            for (int i = 0; i < keys.size(); i++)
+                keys.get(i).partition(partIds.get(i));
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 2cf7276..2bb6bf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index 213a0ff..7854ace 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -44,6 +44,10 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
     @GridDirectCollection(KeyCacheObject.class)
     private List<KeyCacheObject> keys;
 
+    /** Partition IDs. */
+    @GridDirectCollection(int.class)
+    protected List<Integer> partIds;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -75,10 +79,13 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
      * @throws IgniteCheckedException If failed.
      */
     public void addKey(KeyCacheObject key, GridCacheContext ctx) throws IgniteCheckedException {
-        if (keys == null)
+        if (keys == null) {
             keys = new ArrayList<>(keysCount());
+            partIds = new ArrayList<>(keysCount());
+        }
 
         keys.add(key);
+        partIds.add(key.partition());
     }
 
     /** {@inheritDoc}
@@ -94,6 +101,13 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
         super.finishUnmarshal(ctx, ldr);
 
         finishUnmarshalCacheObjects(keys, ctx.cacheContext(cacheId), ldr);
+
+        if (partIds != null && !partIds.isEmpty()) {
+            assert partIds.size() == keys.size();
+
+            for (int i = 0; i < keys.size(); i++)
+                keys.get(i).partition(partIds.get(i));
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
new file mode 100644
index 0000000..ff95de9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.PartitionedReadOnlySet;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An implementation of GridCacheConcurrentMap that delegates all method calls to the corresponding local partition.
+ */
+public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap {
+    /** Context. */
+    private final GridCacheContext ctx;
+
+    /**
+     * Constructor.
+     * @param ctx Context.
+     */
+    public GridCachePartitionedConcurrentMap(GridCacheContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
+     * @param key Key.
+     * @param topVer Topology version.
+     * @param create Create flag.
+     * @return Local partition.
+     */
+    @Nullable private GridDhtLocalPartition localPartition(
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean create
+    ) {
+        int p = key.partition();
+
+        if (p == -1)
+            p = ctx.affinity().partition(key);
+
+        return ctx.topology().localPartition(p, topVer, create);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
+        GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false);
+
+        if (part == null)
+            return null;
+
+        return part.getEntry(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
+        @Nullable CacheObject val, boolean create, boolean touch) {
+        GridDhtLocalPartition part = localPartition(key, topVer, create);
+
+        if (part == null)
+            return null;
+
+        return part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        int size = 0;
+
+        for (GridDhtLocalPartition part : ctx.topology().localPartitions())
+            size += part.size();
+
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int publicSize() {
+        int size = 0;
+
+        for (GridDhtLocalPartition part : ctx.topology().localPartitions())
+            size += part.publicSize();
+
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+        localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+        localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeEntry(GridCacheEntryEx entry) {
+        GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false);
+
+        if (part == null)
+            return false;
+
+        return part.removeEntry(entry);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridCacheMapEntry randomEntry() {
+        return entries().iterator().next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) {
+        Collection<Set<KeyCacheObject>> sets = new ArrayList<>();
+
+        for (GridDhtLocalPartition partition : ctx.topology().localPartitions())
+            sets.add(partition.keySet(filter));
+
+        return new PartitionedReadOnlySet<>(sets);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) {
+        return new Iterable<GridCacheMapEntry>() {
+            @Override public Iterator<GridCacheMapEntry> iterator() {
+                List<Iterator<GridCacheMapEntry>> iterators = new ArrayList<>();
+
+                for (GridDhtLocalPartition partition : ctx.topology().localPartitions())
+                    iterators.add(partition.entries(filter).iterator());
+
+                return F.flatIterators(iterators);
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) {
+        return new Iterable<GridCacheMapEntry>() {
+            @Override public Iterator<GridCacheMapEntry> iterator() {
+                List<Iterator<GridCacheMapEntry>> iterators = new ArrayList<>();
+
+                for (GridDhtLocalPartition partition : ctx.topology().localPartitions())
+                    iterators.add(partition.allEntries(filter).iterator());
+
+                return F.flatIterators(iterators);
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) {
+        Collection<Set<GridCacheMapEntry>> sets = new ArrayList<>();
+
+        for (GridDhtLocalPartition partition : ctx.topology().localPartitions())
+            sets.add(partition.entrySet(filter));
+
+        return new PartitionedReadOnlySet<>(sets);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCachePartitionedConcurrentMap.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 3761d77..4635cad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -357,13 +357,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) {
-        assert false : "Entry should not be added to client topology: " + e;
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public void onRemoved(GridDhtCacheEntry e) {
         assert false : "Entry should not be removed from client topology: " + e;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index faa980e..5ff674f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -158,9 +158,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param ctx Context.
      */
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
-        super(ctx, ctx.config().getStartSize());
-
-        top = new GridDhtPartitionTopologyImpl(ctx);
+        this(ctx, new GridCachePartitionedConcurrentMap(ctx));
     }
 
     /**
@@ -171,27 +169,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      */
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
         super(ctx, map);
-
-        top = new GridDhtPartitionTopologyImpl(ctx);
     }
 
     /** {@inheritDoc} */
     @Override protected void init() {
-        map.setEntryFactory(new GridCacheMapEntryFactory() {
-            /** {@inheritDoc} */
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key,
-                int hash,
-                CacheObject val
-            ) {
-                if (ctx.useOffheapEntry())
-                    return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val);
+        super.init();
 
-                return new GridDhtCacheEntry(ctx, topVer, key, hash, val);
-            }
-        });
+        top = new GridDhtPartitionTopologyImpl(ctx, entryFactory());
     }
 
     /** {@inheritDoc} */
@@ -252,6 +236,26 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * @return Cache map entry factory.
+     */
+    protected GridCacheMapEntryFactory entryFactory() {
+        return new GridCacheMapEntryFactory() {
+            @Override public GridCacheMapEntry create(
+                GridCacheContext ctx,
+                AffinityTopologyVersion topVer,
+                KeyCacheObject key,
+                int hash,
+                CacheObject val
+            ) {
+                if (ctx.useOffheapEntry())
+                    return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val);
+
+                return new GridDhtCacheEntry(ctx, topVer, key, hash, val);
+            }
+        };
+    }
+
+    /**
      * @return Near cache.
      */
     public abstract GridNearCacheAdapter<K, V> near();
@@ -404,7 +408,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      *
      * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
      */
-    @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
+    @Override public GridCacheEntryEx entryEx(KeyCacheObject key,
+        AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
         return super.entryEx(key, topVer);
     }
 
@@ -423,7 +428,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @return DHT entry.
      * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
      */
-    public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
+    public GridDhtCacheEntry entryExx(KeyCacheObject key,
+        AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
         return (GridDhtCacheEntry)entryEx(key, topVer);
     }
 
@@ -640,7 +646,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable IgniteCacheExpiryPolicy expiry,
         boolean skipVals,
         boolean canRemap
-        ) {
+    ) {
         return getAllAsync0(keys,
             readThrough,
             /*don't check local tx. */false,
@@ -1084,7 +1090,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             final GridDhtLocalPartition part = ctx.topology().localPartition(partId,
                 ctx.discovery().topologyVersionEx(), false);
 
-            Iterator<GridDhtCacheEntry> partIt = part == null ? null : part.entries().iterator();
+            Iterator<GridCacheMapEntry> partIt = part == null ? null : part.entries().iterator();
 
             return new PartitionEntryIterator(partIt);
         }
@@ -1152,7 +1158,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) {
+    @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near,
+        boolean readers) {
         return ctx.affinityNode() ? super.splitClearLocally(srv, near, readers) :
             Collections.<GridCacheClearAllRunnable<K, V>>emptyList();
     }
@@ -1164,8 +1171,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE,
             false);
 
-        // Do not remove entry on replica topology. Instead, add entry to removal queue.
-        // It will be cleared eventually.
         if (part != null) {
             try {
                 part.onDeferredDelete(entry.key(), ver);
@@ -1211,16 +1216,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         assert primary || backup;
 
         if (primary && backup)
-            return iterator(map.entries0().iterator(), !ctx.keepBinary());
+            return iterator(entries().iterator(), !ctx.keepBinary());
         else {
             final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
             final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator();
 
-            Iterator<GridCacheEntryEx> it = new Iterator<GridCacheEntryEx>() {
-                private GridCacheEntryEx next;
+            Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() {
+                private GridCacheMapEntry next;
 
-                private Iterator<GridDhtCacheEntry> curIt;
+                private Iterator<GridCacheMapEntry> curIt;
 
                 {
                     advance();
@@ -1230,11 +1235,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     return next != null;
                 }
 
-                @Override public GridCacheEntryEx next() {
+                @Override public GridCacheMapEntry next() {
                     if (next == null)
                         throw new NoSuchElementException();
 
-                    GridCacheEntryEx e = next;
+                    GridCacheMapEntry e = next;
 
                     advance();
 
@@ -1293,12 +1298,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         private Cache.Entry<K, V> last;
 
         /** Partition iterator. */
-        private final Iterator<GridDhtCacheEntry> partIt;
+        private final Iterator<GridCacheMapEntry> partIt;
 
         /**
          * @param partIt Partition iterator.
          */
-        private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry> partIt) {
+        private PartitionEntryIterator(@Nullable Iterator<GridCacheMapEntry> partIt) {
             this.partIt = partIt;
 
             advance();
@@ -1335,9 +1340,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         private void advance() {
             if (partIt != null) {
                 while (partIt.hasNext()) {
-                    GridDhtCacheEntry next = partIt.next();
+                    GridCacheEntryEx next = partIt.next();
 
-                    if (next.isInternal() || !next.visitable(CU.empty0()))
+                    if (next instanceof GridCacheMapEntry && (!((GridCacheMapEntry)next).visitable(CU.empty0())))
                         continue;
 
                     entry = next.wrapLazyValue();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index ab51bdb..95ef10b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -88,7 +88,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         super(ctx, key, hash, val);
 
         // Record this entry with partition.
-        locPart = ctx.dht().topology().onAdded(topVer, this);
+        int p = cctx.affinity().partition(key);
+
+        locPart = ctx.topology().localPartition(p, topVer, true);
+
+        assert locPart != null;
     }
 
     /** {@inheritDoc} */
@@ -182,8 +186,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         boolean reenter,
         boolean tx,
         boolean implicitSingle)
-        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException
-    {
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
         assert serReadVer == null || serOrder != null;
         assert !reenter || serOrder == null;
 
@@ -331,7 +334,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @throws GridCacheEntryRemovedException If entry has been removed.
      */
     @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"})
-    @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(AffinityTopologyVersion topVer)
+    @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(
+        AffinityTopologyVersion topVer)
         throws GridCacheEntryRemovedException {
         if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked())
             return null;
@@ -599,7 +603,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         }
         finally {
             if (rmv)
-                cctx.cache().removeIfObsolete(key); // Clear cache.
+                cctx.cache().removeEntry(this); // Clear cache.
         }
     }
 
@@ -713,6 +717,16 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         return S.toString(GridDhtCacheEntry.class, this, "super", super.toString());
     }
 
+    /** {@inheritDoc} */
+    @Override protected void incrementMapPublicSize() {
+        locPart.incrementPublicSize(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void decrementMapPublicSize() {
+        locPart.decrementPublicSize(this);
+    }
+
     /**
      * Reader ID.
      */
@@ -793,7 +807,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
             return txFut;
         }
 
-
         /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4fc1eaf..3ac4a41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -33,8 +31,15 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -53,8 +58,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
-import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.LongAdder8;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
@@ -66,7 +70,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Key partition.
  */
-public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable {
+public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable, GridCacheConcurrentMap {
     /** Maximum size for delete queue. */
     public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE,
         200_000);
@@ -89,7 +93,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     private final GridFutureAdapter<?> rent;
 
     /** Entries map. */
-    private final ConcurrentMap<KeyCacheObject, GridDhtCacheEntry> map;
+    private final GridCacheConcurrentMap map;
 
     /** Context. */
     private final GridCacheContext cctx;
@@ -104,9 +108,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /** Lock. */
     private final ReentrantLock lock = new ReentrantLock();
 
-    /** Public size counter. */
-    private final LongAdder8 mapPubSize = new LongAdder8();
-
     /** Remove queue. */
     private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
 
@@ -120,8 +121,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @param cctx Context.
      * @param id Partition ID.
      */
-    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    GridDhtLocalPartition(GridCacheContext cctx, int id) {
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheContext cctx, int id,
+        GridCacheMapEntryFactory entryFactory) {
         assert cctx != null;
 
         this.id = id;
@@ -135,8 +136,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             }
         };
 
-        map = new ConcurrentHashMap8<>(cctx.config().getStartSize() /
-            cctx.affinity().partitions());
+        map = new GridCacheConcurrentMapImpl(cctx, entryFactory, cctx.config().getStartSize() / cctx.affinity().partitions());
 
         int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
@@ -202,45 +202,30 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @return Entries belonging to partition.
-     */
-    public Collection<GridDhtCacheEntry> entries() {
-        return map.values();
-    }
-
-    /**
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        return map.isEmpty();
+        return map.size() == 0;
     }
 
-    /**
-     * @return Number of entries in this partition (constant-time method).
-     */
-    public int size() {
+    /** {@inheritDoc} */
+    @Override public int size() {
         return map.size();
     }
 
-    /**
-     * Increments public size of the map.
-     */
-    public void incrementPublicSize() {
-        mapPubSize.increment();
+    /** {@inheritDoc} */
+    @Override public int publicSize() {
+        return map.publicSize();
     }
 
-    /**
-     * Decrements public size of the map.
-     */
-    public void decrementPublicSize() {
-        mapPubSize.decrement();
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+        map.incrementPublicSize(e);
     }
 
-    /**
-     * @return Number of public (non-internal) entries in this partition.
-     */
-    public int publicSize() {
-        return mapPubSize.intValue();
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+        map.decrementPublicSize(e);
     }
 
     /**
@@ -252,39 +237,57 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         return state == MOVING || state == OWNING || state == RENTING;
     }
 
-    /**
-     * @param entry Entry to add.
-     */
-    void onAdded(GridDhtCacheEntry entry) {
-        GridDhtPartitionState state = state();
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheMapEntry getEntry(KeyCacheObject key) {
+        return map.getEntry(key);
+    }
 
-        if (state == EVICTED)
-            throw new GridDhtInvalidPartitionException(id, "Adding entry to invalid partition " +
-                "(often may be caused by inconsistent 'key.hashCode()' implementation) [part=" + id + ']');
+    /** {@inheritDoc} */
+    @Override public boolean removeEntry(GridCacheEntryEx entry) {
+        return map.removeEntry(entry);
+    }
 
-        map.put(entry.key(), entry);
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> entries(
+        CacheEntryPredicate... filter) {
+        return map.entries(filter);
+    }
 
-        if (!entry.isInternal()) {
-            assert !entry.deleted() : entry;
+    /** {@inheritDoc} */
+    @Override public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter) {
+        return map.allEntries(filter);
+    }
 
-            mapPubSize.increment();
-        }
+    /** {@inheritDoc} */
+    @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) {
+        return map.entrySet(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheMapEntry randomEntry() {
+        return map.randomEntry();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
+        AffinityTopologyVersion topVer, KeyCacheObject key,
+        @Nullable CacheObject val, boolean create, boolean touch) {
+        return map.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) {
+        return map.keySet(filter);
     }
 
     /**
      * @param entry Entry to remove.
      */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     void onRemoved(GridDhtCacheEntry entry) {
         assert entry.obsolete() : entry;
 
         // Make sure to remove exactly this entry.
-        synchronized (entry) {
-            map.remove(entry.key(), entry);
-
-            if (!entry.isInternal() && !entry.deleted())
-                mapPubSize.decrement();
-        }
+        map.removeEntry(entry);
 
         // Attempt to evict.
         tryEvict();
@@ -312,7 +315,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /**
      * Locks partition.
      */
-    @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"})
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
     public void lock() {
         lock.lock();
     }
@@ -338,11 +341,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         Map<KeyCacheObject, GridCacheVersion> evictHist0 = evictHist;
 
-        if (evictHist0 != null ) {
+        if (evictHist0 != null) {
             GridCacheVersion ver0 = evictHist0.get(key);
 
             if (ver0 == null || ver0.isLess(ver)) {
-                GridCacheVersion ver1  = evictHist0.put(key, ver);
+                GridCacheVersion ver1 = evictHist0.put(key, ver);
 
                 assert ver1 == ver0;
             }
@@ -366,7 +369,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         Map<KeyCacheObject, GridCacheVersion> evictHist0 = evictHist;
 
-        if (evictHist0 != null)  {
+        if (evictHist0 != null) {
             GridCacheVersion ver0 = evictHist0.get(key);
 
             // Permit preloading if version in history
@@ -489,7 +492,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         int ord = (int)(reservations >> 32);
 
-        if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
+        if (isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
             ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 &&
             casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
@@ -538,7 +541,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         // Attempt to evict partition entries from cache.
         clearAll();
 
-        if (map.isEmpty() && casState(reservations, EVICTED)) {
+        if (isEmpty() && casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 
@@ -655,7 +658,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
-        Iterator<GridDhtCacheEntry> it = map.values().iterator();
+        Iterator<GridDhtCacheEntry> it = (Iterator)map.allEntries().iterator();
 
         GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> swapIt = null;
 
@@ -684,11 +687,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
                     cached = it.next();
 
                     if (cached.clearInternal(clearVer, swap, extras)) {
-                        map.remove(cached.key(), cached);
+                        map.removeEntry(cached);
 
                         if (!cached.isInternal()) {
-                            mapPubSize.decrement();
-
                             if (rec) {
                                 cctx.events().addEvent(cached.partition(),
                                     cached.key(),
@@ -709,7 +710,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
                     }
                 }
                 catch (GridDhtInvalidPartitionException e) {
-                    assert map.isEmpty() && state() == EVICTED: "Invalid error [e=" + e + ", part=" + this + ']';
+                    assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
                     assert swapEmpty() : "Invalid error when swap is not cleared [e=" + e + ", part=" + this + ']';
 
                     break; // Partition is already concurrently cleared and evicted.
@@ -789,7 +790,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             }
 
             @Override public void remove() {
-                map.remove(lastEntry.key(), lastEntry);
+                map.removeEntry(lastEntry);
             }
         };
     }
@@ -811,7 +812,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings( {"OverlyStrongTypeCast"})
+    @SuppressWarnings({"OverlyStrongTypeCast"})
     @Override public boolean equals(Object obj) {
         return obj instanceof GridDhtLocalPartition && (obj == this || ((GridDhtLocalPartition)obj).id() == id);
     }
@@ -829,8 +830,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         return S.toString(GridDhtLocalPartition.class, this,
             "state", state(),
             "reservations", reservations(),
-            "empty", map.isEmpty(),
-            "createTime", U.format(createTime),
-            "mapPubSize", mapPubSize);
+            "empty", isEmpty(),
+            "createTime", U.format(createTime));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 50167d8..95c6dfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -419,6 +419,12 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
 
                 writer.incrementState();
 
+            case 30:
+                if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -515,6 +521,14 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
+            case 30:
+                partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtLockRequest.class);
@@ -527,7 +541,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 30;
+        return 31;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 7fba45d..2c483a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -195,13 +195,6 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionFullMap partitionMap(boolean onlyActive);
 
     /**
-     * @param topVer Topology version.
-     * @param e Entry added to cache.
-     * @return Local partition.
-     */
-    public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e);
-
-    /**
      * @param e Entry removed from cache.
      */
     public void onRemoved(GridDhtCacheEntry e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f0ce6d1..9f5fbfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -37,6 +36,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -76,8 +75,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private final IgniteLogger log;
 
     /** */
-    private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts =
-        new ConcurrentHashMap8<>();
+    private final GridDhtLocalPartition[] locParts;
 
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
@@ -103,6 +101,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Lock. */
     private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
+    /** */
+    private final GridCacheMapEntryFactory entryFactory;
+
     /** Partition update counter. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
 
@@ -112,12 +113,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) {
+    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) {
         assert cctx != null;
 
         this.cctx = cctx;
+        this.entryFactory = entryFactory;
 
         log = cctx.logger(getClass());
+
+        locParts = new GridDhtLocalPartition[cctx.config().getAffinity().partitions()];
     }
 
     /**
@@ -149,7 +153,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /**
      * @return Full map string representation.
      */
-    @SuppressWarnings( {"ConstantConditions"})
+    @SuppressWarnings({"ConstantConditions"})
     private String fullMapString() {
         return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
     }
@@ -158,7 +162,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param map Map to get string for.
      * @return Full map string representation.
      */
-    @SuppressWarnings( {"ConstantConditions"})
+    @SuppressWarnings({"ConstantConditions"})
     private String mapString(GridDhtPartitionMap2 map) {
         return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
     }
@@ -172,34 +176,65 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private boolean waitForRent() throws IgniteCheckedException {
         boolean changed = false;
 
-        // Synchronously wait for all renting partitions to complete.
-        for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();) {
-            GridDhtLocalPartition p = it.next();
+        GridDhtLocalPartition[] locPartsCopy = new GridDhtLocalPartition[locParts.length];
 
-            GridDhtPartitionState state = p.state();
+        lock.readLock().lock();
+
+        try {
+            for (int i = 0; i < locParts.length; i++)
+                locPartsCopy[i] = locParts[i];
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        GridDhtLocalPartition part;
+
+        for (int i = 0; i < locPartsCopy.length; i++) {
+            part = locPartsCopy[i];
+
+            if (part == null)
+                continue;
+
+            GridDhtPartitionState state = part.state();
 
             if (state == RENTING || state == EVICTED) {
                 if (log.isDebugEnabled())
-                    log.debug("Waiting for renting partition: " + p);
+                    log.debug("Waiting for renting partition: " + part);
 
                 // Wait for partition to empty out.
-                p.rent(true).get();
+                part.rent(true).get();
 
                 if (log.isDebugEnabled())
-                    log.debug("Finished waiting for renting partition: " + p);
+                    log.debug("Finished waiting for renting partition: " + part);
+            }
+        }
 
-                // Remove evicted partition.
-                it.remove();
+        // Remove evicted partition.
+        lock.writeLock().lock();
 
-                changed = true;
+        try {
+            for (int i = 0; i < locParts.length; i++) {
+                part = locParts[i];
+
+                if (part == null)
+                    continue;
+
+                if (part.state() == EVICTED) {
+                    locParts[i] = null;
+                    changed = true;
+                }
             }
         }
+        finally {
+            lock.writeLock().unlock();
+        }
 
         return changed;
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"})
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
     @Override public void readLock() {
         lock.readLock().lock();
     }
@@ -267,7 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
+    @Override public void initPartitions(
+        GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
@@ -300,12 +336,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         assert topVer.equals(exchFut.topologyVersion()) :
             "Invalid topology [topVer=" + topVer +
-            ", cache=" + cctx.name() +
-            ", futVer=" + exchFut.topologyVersion() + ']';
+                ", cache=" + cctx.name() +
+                ", futVer=" + exchFut.topologyVersion() + ']';
         assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) :
             "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() +
-            ", cache=" + cctx.name()+
-            ", futVer=" + exchFut.topologyVersion() + ']';
+                ", cache=" + cctx.name() +
+                ", futVer=" + exchFut.topologyVersion() + ']';
 
         List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
 
@@ -587,7 +623,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create)
+    @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
+        boolean create)
         throws GridDhtInvalidPartitionException {
         return localPartition(p, topVer, create, true);
     }
@@ -597,21 +634,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return Partition.
      */
     private GridDhtLocalPartition createPartition(int p) {
-        GridDhtLocalPartition loc = locParts.get(p);
-
-        if (loc != null && loc.state() == EVICTED) {
-            boolean rmv = locParts.remove(p, loc);
+        assert lock.isWriteLockedByCurrentThread();
 
-            assert rmv;
-
-            loc = null;
-        }
+        GridDhtLocalPartition loc = locParts[p];
 
-        if (loc == null) {
-            GridDhtLocalPartition old = locParts.putIfAbsent(p, loc = new GridDhtLocalPartition(cctx, p));
-
-            assert old == null : old;
-        }
+        if (loc == null || loc.state() == EVICTED)
+            locParts[p] = loc = new GridDhtLocalPartition(cctx, p, entryFactory);
 
         return loc;
     }
@@ -627,54 +655,59 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         AffinityTopologyVersion topVer,
         boolean create,
         boolean updateSeq) {
-        boolean belongs = create && cctx.affinity().localNode(p, topVer);
+        GridDhtLocalPartition loc;
 
-        while (true) {
-            GridDhtLocalPartition loc = locParts.get(p);
+        lock.readLock().lock();
 
-            if (loc != null && loc.state() == EVICTED) {
-                locParts.remove(p, loc);
+        try {
+            loc = locParts[p];
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        if (loc != null && loc.state() != EVICTED)
+            return loc;
+
+        if (!create)
+            return null;
+
+        lock.writeLock().lock();
+
+        try {
+            loc = locParts[p];
+
+            boolean belongs = cctx.affinity().localNode(p, topVer);
 
-                if (!create)
-                    return null;
+            if (loc != null && loc.state() == EVICTED) {
+                locParts[p] = loc = null;
 
                 if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
-
-                continue;
             }
 
-            if (loc == null && create) {
+            if (loc == null) {
                 if (!belongs)
                     throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
 
-                lock.writeLock().lock();
-
-                try {
-                    GridDhtLocalPartition old = locParts.putIfAbsent(p,
-                        loc = new GridDhtLocalPartition(cctx, p));
+                locParts[p] = loc = new GridDhtLocalPartition(cctx, p, entryFactory);
 
-                    if (old != null)
-                        loc = old;
-                    else {
-                        if (updateSeq)
-                            this.updateSeq.incrementAndGet();
+                if (updateSeq)
+                    this.updateSeq.incrementAndGet();
 
-                        if (log.isDebugEnabled())
-                            log.debug("Created local partition: " + loc);
-                    }
-                }
-                finally {
-                    lock.writeLock().unlock();
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Created local partition: " + loc);
             }
-
-            return loc;
         }
+        finally {
+            lock.writeLock().unlock();
+        }
+
+        return loc;
     }
 
     /** {@inheritDoc} */
@@ -682,8 +715,20 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         assert parts != null;
         assert parts.length > 0;
 
+        GridDhtLocalPartition[] locPartsCopy = new GridDhtLocalPartition[parts.length];
+
+        lock.readLock().lock();
+
+        try {
+            for (int i = 0; i < parts.length; i++)
+                locPartsCopy[i] = locParts[parts[i]];
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
         for (int i = 0; i < parts.length; i++)
-            locParts.get(parts[i]).release();
+            locPartsCopy[i].release();
     }
 
     /** {@inheritDoc} */
@@ -693,31 +738,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<GridDhtLocalPartition> localPartitions() {
-        return new LinkedList<>(locParts.values());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() {
-        return locParts.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) {
-        /*
-         * Make sure not to acquire any locks here as this method
-         * may be called from sensitive synchronization blocks.
-         * ===================================================
-         */
+        LinkedList<GridDhtLocalPartition> list = new LinkedList<>();
 
-        int p = cctx.affinity().partition(e.key());
+        lock.readLock().lock();
 
-        GridDhtLocalPartition loc = localPartition(p, topVer, true);
+        try {
+            for (int i = 0; i < locParts.length; i++) {
+                GridDhtLocalPartition part = locParts[i];
 
-        assert loc != null;
+                if (part != null)
+                    list.add(part);
+            }
 
-        loc.onAdded(e);
+            return list;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
 
-        return loc;
+    /** {@inheritDoc} */
+    @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() {
+        return localPartitions();
     }
 
     /** {@inheritDoc} */
@@ -736,11 +778,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public GridDhtPartitionMap2 localPartitionMap() {
+        Map<Integer, GridDhtPartitionState> map = new HashMap<>();
+
         lock.readLock().lock();
 
         try {
+            for (int i = 0; i < locParts.length; i++) {
+                GridDhtLocalPartition part = locParts[i];
+
+                if (part == null)
+                    continue;
+
+                map.put(i, part.state());
+            }
+
             return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), topVer,
-                F.viewReadOnly(locParts, CU.part2state()), true);
+                Collections.unmodifiableMap(map), true);
         }
         finally {
             lock.readLock().unlock();
@@ -943,7 +996,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
-                for (GridDhtLocalPartition part : locParts.values()) {
+                for (int i = 0; i < locParts.length; i++) {
+                    GridDhtLocalPartition part = locParts[i];
+
+                    if (part == null)
+                        continue;
+
                     Long cntr = cntrMap.get(part.id());
 
                     if (cntr != null)
@@ -992,7 +1050,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) {
+                for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 
                     if (!cctx.discovery().alive(nodeId)) {
@@ -1078,7 +1136,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
-                for (GridDhtLocalPartition part : locParts.values()) {
+                for (int i = 0; i < locParts.length; i++) {
+                    GridDhtLocalPartition part = locParts[i];
+
+                    if (part == null)
+                        continue;
+
                     Long cntr = cntrMap.get(part.id());
 
                     if (cntr != null)
@@ -1178,12 +1241,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         UUID locId = cctx.nodeId();
 
-        for (GridDhtLocalPartition part : locParts.values()) {
+        for (int p = 0; p < locParts.length; p++) {
+            GridDhtLocalPartition part = locParts[p];
+
+            if (part == null)
+                continue;
+
             GridDhtPartitionState state = part.state();
 
             if (state.active()) {
-                int p = part.id();
-
                 List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(cctx.localNode())) {
@@ -1403,7 +1469,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             Map<Integer, Long> res = new HashMap<>(cntrMap);
 
-            for (GridDhtLocalPartition part : locParts.values()) {
+            for (int i = 0; i < locParts.length; i++) {
+                GridDhtLocalPartition part = locParts[i];
+
+                if (part == null)
+                    continue;
+
                 Long cntr0 = res.get(part.id());
                 Long cntr1 = part.updateCounter();
 
@@ -1429,11 +1500,23 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
-        for (GridDhtLocalPartition part : locParts.values()) {
-            int size = part.size();
+        lock.readLock().lock();
+
+        try {
+            for (int i = 0; i < locParts.length; i++) {
+                GridDhtLocalPartition part = locParts[i];
+
+                if (part == null)
+                    continue;
+
+                int size = part.size();
 
-            if (size >= threshold)
-                X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
+                if (size >= threshold)
+                    X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
+            }
+        }
+        finally {
+            lock.readLock().unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3be3d16b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
index 38152a7..1d067da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
@@ -120,6 +120,11 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
 
                 writer.incrementState();
 
+            case 9:
+                if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -144,6 +149,14 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
 
                 reader.incrementState();
 
+            case 9:
+                partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtUnlockRequest.class);
@@ -156,6 +169,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 }


Mime
View raw message