ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/7] ignite git commit: ignite-3300 Fixed issue with partition value changing stored in KeyCacheObject. (cherry picked from commit a441bb9)
Date Wed, 27 Jul 2016 07:19:34 GMT
ignite-3300 Fixed issue with partition value changing stored in KeyCacheObject.
(cherry picked from commit a441bb9)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d07e3e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d07e3e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d07e3e7

Branch: refs/heads/master
Commit: 9d07e3e7a34d1cbe67a9656630bc6215cd213f0a
Parents: f3e4f78
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Jul 27 09:19:12 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jul 27 10:04:28 2016 +0300

----------------------------------------------------------------------
 .../internal/binary/BinaryObjectImpl.java       |  11 +
 .../affinity/GridAffinityProcessor.java         | 117 +++++---
 .../processors/cache/CacheObjectContext.java    |  12 +
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../cache/GridCacheAffinityManager.java         |  24 +-
 .../processors/cache/GridCacheContext.java      |  26 +-
 .../processors/cache/GridCacheEntryInfo.java    |   2 +-
 .../processors/cache/KeyCacheObject.java        |   7 +
 .../processors/cache/KeyCacheObjectImpl.java    |   8 +
 .../cache/binary/CacheObjectBinaryContext.java  |  10 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |  38 ++-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   4 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |  15 +-
 .../IgniteCacheObjectProcessorImpl.java         |  69 ++++-
 .../datastreamer/DataStreamerImpl.java          |   6 +-
 .../affinity/AffinityClientNodeSelfTest.java    |  73 ++++-
 .../binary/BinaryMarshallerSelfTest.java        |   7 +-
 .../BinaryObjectBuilderAdditionalSelfTest.java  |  46 ++-
 .../cache/CacheGetEntryAbstractTest.java        |   2 +-
 .../expiry/IgniteCacheTtlCleanupSelfTest.java   |   2 +-
 .../CacheBinaryKeyConcurrentQueryTest.java      | 298 +++++++++++++++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 23 files changed, 625 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index ae110f1..7b42c03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -98,6 +98,17 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     }
 
     /** {@inheritDoc} */
+    @Override public KeyCacheObject copy(int part) {
+        if (this.part == part)
+            return this;
+
+        BinaryObjectImpl cp = new BinaryObjectImpl(ctx, arr, start);
+        cp.part = part;
+
+        return cp;
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return part;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 0d3d36d..19e0842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -144,6 +145,56 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @return Key partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    public int partition(@Nullable String cacheName, Object key) throws IgniteCheckedException {
+        return partition(cacheName, key, null);
+    }
+
+    /**
+     * @param cacheName Cache name (needed only if {@code aff} is not provided.
+     * @param key Key.
+     * @param aff Affinity information.
+     * @return Key partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    public int partition(@Nullable String cacheName,
+        Object key,
+        @Nullable AffinityInfo aff) throws IgniteCheckedException {
+        if (key instanceof KeyCacheObject) {
+            int part = ((KeyCacheObject)key).partition();
+
+            if (part >= 0)
+                return part;
+        }
+
+        return partition0(cacheName, key, aff);
+    }
+
+    /**
+     * @param cacheName Cache name (needed only if {@code aff} is not provided.
+     * @param key Key.
+     * @param aff Affinity.
+     * @return Key partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    public int partition0(@Nullable String cacheName,
+        Object key,
+        @Nullable AffinityInfo aff) throws IgniteCheckedException {
+        if (aff == null) {
+            aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
+
+            if (aff == null)
+                throw new IgniteCheckedException("Failed to get cache affinity.");
+        }
+
+        return aff.affFunc.partition(aff.affinityKey(key));
+    }
+
+    /**
      * Maps keys to nodes for given cache.
      *
      * @param cacheName Cache name.
@@ -206,7 +257,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         if (affInfo == null)
             return Collections.emptyList();
 
-        return primaryAndBackups(affInfo, key);
+        int part = partition(cacheName, key, affInfo);
+
+        return affInfo.assignment.get(part);
     }
 
     /**
@@ -224,13 +277,10 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
 
         AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersionEx());
 
-        if (affInfo == null || affInfo.mapper == null)
+        if (affInfo == null)
             return null;
 
-        if (key instanceof CacheObject)
-            key = ((CacheObject)key).value(affInfo.cacheObjCtx, false);
-
-        return affInfo.mapper.affinityKey(key);
+        return affInfo.affinityKey(key);
     }
 
     /**
@@ -484,10 +534,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException {
-        if (key instanceof CacheObject && !(key instanceof BinaryObject))
-            key = ((CacheObject)key).value(aff.cacheObjCtx, false);
-
-        int part = aff.affFunc.partition(aff.mapper.affinityKey(key));
+        int part = partition(null, key, aff);
 
         Collection<ClusterNode> nodes = aff.assignment.get(part);
 
@@ -497,20 +544,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         return nodes.iterator().next();
     }
 
-    /**
-     * @param aff Affinity function.
-     * @param key Key to check.
-     * @return Primary and backup nodes.
-     */
-    private <K> List<ClusterNode> primaryAndBackups(AffinityInfo aff, K key) {
-        if (key instanceof CacheObject && !(key instanceof BinaryObject))
-            key = ((CacheObject)key).value(aff.cacheObjCtx, false);
-
-        int part = aff.affFunc.partition(aff.mapper.affinityKey(key));
-
-        return aff.assignment.get(part);
-    }
-
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");
@@ -551,6 +584,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * @param key Key.
+         * @return Affinity key.
+         */
+        private Object affinityKey(Object key) {
+            if (key instanceof CacheObject && !(key instanceof BinaryObject))
+                key = ((CacheObject)key).value(cacheObjCtx, false);
+
+            return mapper.affinityKey(key);
+        }
+
+        /**
          * @return Cache affinity function.
          */
         private AffinityFunction affinityFunction() {
@@ -564,13 +608,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             return assignment;
         }
 
-        /**
-         * @return Key mapper.
-         */
-        private AffinityKeyMapper keyMapper() {
-            return mapper;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(AffinityInfo.class, this);
@@ -658,7 +695,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return cache().affinityFunction().partition(key);
+                return partition0(cacheName, key, cache());
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -769,10 +806,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                if (key instanceof CacheObject)
-                    key = ((CacheObject)key).value(cache().cacheObjCtx, false);
-
-                return cache().keyMapper().affinityKey(key);
+                return cache().affinityKey(key);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -828,7 +862,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return cache().assignment().get(partition(key));
+                AffinityInfo aff = cache();
+
+                return aff.assignment().get(GridAffinityProcessor.this.partition(cacheName, key, aff));
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -861,12 +897,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
                 Map<Integer, ClusterNode> map = new HashMap<>();
 
                 if (!F.isEmpty(parts)) {
+                    AffinityInfo aff = cache();
+
                     for (int p : parts)
-                        map.put(p, mapPartitionToNode(p));
+                        map.put(p, F.first(aff.assignment().get(p)));
                 }
 
                 return map;
             }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
             finally {
                 ctx.gateway().readUnlock();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index d7fdb83..c4203ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -41,6 +41,9 @@ import org.apache.ignite.internal.util.typedef.F;
     private IgniteCacheObjectProcessor proc;
 
     /** */
+    private String cacheName;
+
+    /** */
     private AffinityKeyMapper dfltAffMapper;
 
     /** */
@@ -63,11 +66,13 @@ import org.apache.ignite.internal.util.typedef.F;
      * @param addDepInfo {@code true} if deployment info should be associated with the objects of this cache.
      */
     public CacheObjectContext(GridKernalContext kernalCtx,
+        String cacheName,
         AffinityKeyMapper dfltAffMapper,
         boolean cpyOnGet,
         boolean storeVal,
         boolean addDepInfo) {
         this.kernalCtx = kernalCtx;
+        this.cacheName = cacheName;
         this.dfltAffMapper = dfltAffMapper;
         this.cpyOnGet = cpyOnGet;
         this.storeVal = storeVal;
@@ -78,6 +83,13 @@ import org.apache.ignite.internal.util.typedef.F;
     }
 
     /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
      * @return {@code True} if peer class loading is enabled.
      */
     public boolean p2pEnabled() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 90669e0..1ac94a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1387,15 +1387,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         long start = statsEnabled ? System.nanoTime() : 0L;
 
-        boolean keeyBinary = ctx.keepBinary();
+        boolean keepBinary = ctx.keepBinary();
 
-        if (keeyBinary)
+        if (keepBinary)
             key = (K)ctx.toCacheKeyObject(key);
 
-        V val = get(key, !keeyBinary, false);
+        V val = get(key, !keepBinary, false);
 
         if (ctx.config().getInterceptor() != null) {
-            key = keeyBinary ? (K) ctx.unwrapBinaryIfNeeded(key, true, false) : key;
+            key = keepBinary ? (K) ctx.unwrapBinaryIfNeeded(key, true, false) : key;
 
             val = (V)ctx.config().getInterceptor().onGet(key, val);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 5e843dc..71ae5c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -187,28 +187,36 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param key Key.
+     * @return Partition.
+     */
+    public int partition(Object key) {
+        return partition(key, true);
+    }
+
+    /**
      * NOTE: Use this method always when you need to calculate partition id for
      * a key provided by user. It's required since we should apply affinity mapper
      * logic in order to find a key that will eventually be passed to affinity function.
      *
      * @param key Key.
+     * @param useKeyPart If {@code true} can use pre-calculated partition stored in KeyCacheObject.
      * @return Partition.
      */
-    public int partition(Object key) {
+    public int partition(Object key, boolean useKeyPart) {
         GridAffinityAssignmentCache aff0 = aff;
 
-        if (key instanceof KeyCacheObject && ((KeyCacheObject)key).partition() != -1)
-            return ((KeyCacheObject)key).partition();
-
         if (aff0 == null)
             throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());
 
-        int p = affFunction.partition(affinityKey(key));
+        if (useKeyPart && (key instanceof KeyCacheObject)) {
+            int part = ((KeyCacheObject)key).partition();
 
-        if (key instanceof KeyCacheObject)
-            ((KeyCacheObject)key).partition(p);
+            if (part != -1)
+                return part;
+        }
 
-        return p;
+        return affFunction.partition(affinityKey(key));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 36d9104..ba923df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1788,20 +1788,9 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Cache key object.
      */
     public KeyCacheObject toCacheKeyObject(Object obj) {
-        return toCacheKeyObject(obj, false);
-    }
-
-    /**
-     * @param obj Object.
-     * @return Cache key object.
-     */
-    public KeyCacheObject toCacheKeyObject(Object obj, boolean includePartition) {
         assert validObjectForCache(obj) : obj;
 
-        if (includePartition)
-            return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true, affinity().partition(obj));
-        else
-            return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true);
+        return cacheObjects().toCacheKeyObject(cacheObjCtx, this, obj, true);
     }
 
     /**
@@ -1822,7 +1811,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     public KeyCacheObject toCacheKeyObject(byte[] bytes) throws IgniteCheckedException {
         Object obj = ctx.cacheObjects().unmarshal(cacheObjCtx, bytes, deploy().localLoader());
 
-        return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, false);
+        return cacheObjects().toCacheKeyObject(cacheObjCtx, this, obj, false);
     }
 
     /**
@@ -1970,21 +1959,12 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Read-only collection of KeyCacheObject instances.
      */
     public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys) {
-        return cacheKeysView(keys, false);
-    }
-
-    /**
-     * @param keys Keys.
-     * @param includePartition Include partition.
-     * @return Read-only collection of KeyCacheObject instances.
-     */
-    public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys, final boolean includePartition) {
         return F.viewReadOnly(keys, new C1<Object, KeyCacheObject>() {
             @Override public KeyCacheObject apply(Object key) {
                 if (key == null)
                     throw new NullPointerException("Null key.");
 
-                return toCacheKeyObject(key, includePartition);
+                return toCacheKeyObject(key);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index c42e788..f281227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -407,7 +407,7 @@ public class GridCacheEntryInfo implements Message {
 
             Object key0 = ctx.cacheObjects().unmarshal(cacheObjCtx, keyBytes, clsLdr);
 
-            key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, key0, false);
+            key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, ctx, key0, false);
         }
         else
             key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
index ffb846c..21b1f89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java
@@ -38,7 +38,14 @@ public interface KeyCacheObject extends CacheObject {
 
     /**
      * Sets partition ID for this key.
+     *
      * @param part Partition ID.
      */
     public void partition(int part);
+
+    /**
+     * @param part Partition ID.
+     * @return Copy of this object with given partition set.
+     */
+    public KeyCacheObject copy(int part);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 35e681c..146e554 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -61,6 +61,14 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     }
 
     /** {@inheritDoc} */
+    @Override public KeyCacheObject copy(int part) {
+        if (this.part == part)
+            return this;
+
+        return new KeyCacheObjectImpl(val, valBytes, part);
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return part;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
index ec01f48..26c713c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
@@ -31,18 +31,24 @@ public class CacheObjectBinaryContext extends CacheObjectContext {
 
     /**
      * @param kernalCtx Kernal context.
+     * @param cacheName Cache name.
      * @param binaryEnabled Binary enabled flag.
      * @param cpyOnGet Copy on get flag.
      * @param storeVal {@code True} if should store unmarshalled value in cache.
      * @param depEnabled {@code true} if deployment is enabled for the given cache.
      */
     public CacheObjectBinaryContext(GridKernalContext kernalCtx,
+        String cacheName,
         boolean cpyOnGet,
         boolean storeVal,
         boolean binaryEnabled,
         boolean depEnabled) {
-        super(kernalCtx, binaryEnabled ? new CacheDefaultBinaryAffinityKeyMapper() :
-            new GridCacheDefaultAffinityKeyMapper(), cpyOnGet, storeVal, depEnabled);
+        super(kernalCtx,
+            cacheName,
+            binaryEnabled ? new CacheDefaultBinaryAffinityKeyMapper() : new GridCacheDefaultAffinityKeyMapper(),
+            cpyOnGet,
+            storeVal,
+            depEnabled);
 
         this.binaryEnabled = binaryEnabled;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 8400594..6d980a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -728,6 +728,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
         CacheObjectContext ctx0 = super.contextForCache(cfg);
 
         CacheObjectContext res = new CacheObjectBinaryContext(ctx,
+            cfg.getName(),
             ctx0.copyOnGet(),
             ctx0.storeValue(),
             binaryEnabled,
@@ -760,43 +761,38 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
-    @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
-        return toCacheKeyObject(ctx, obj, userObj, -1);
-    }
-
-    /** {@inheritDoc} */
     @Override public KeyCacheObject toCacheKeyObject(
         CacheObjectContext ctx,
+        @Nullable GridCacheContext cctx,
         Object obj,
-        boolean userObj,
-        int partition
+        boolean userObj
     ) {
         if (!((CacheObjectBinaryContext)ctx).binaryEnabled())
-            return super.toCacheKeyObject(ctx, obj, userObj, partition);
+            return super.toCacheKeyObject(ctx, cctx, obj, userObj);
 
         if (obj instanceof KeyCacheObject) {
-            if (obj instanceof BinaryObjectImpl) {
+            KeyCacheObject key = (KeyCacheObject)obj;
+
+            if (key instanceof BinaryObjectImpl) {
                 // Need to create a copy because the key can be reused at the application layer after that (IGNITE-3505).
-                BinaryObjectImpl bObj = (BinaryObjectImpl)obj;
-                obj = new BinaryObjectImpl(bObj.context(), bObj.array(), bObj.start());
+                key = key.copy(partition(ctx, cctx, key));
             }
+            else if (key.partition() == -1)
+                // Assume others KeyCacheObjects can not be reused for another cache.
+                key.partition(partition(ctx, cctx, key));
 
-            ((KeyCacheObject)obj).partition(partition);
-
-            return (KeyCacheObject)obj;
+            return key;
         }
 
-        if (((CacheObjectBinaryContext)ctx).binaryEnabled()) {
-            obj = toBinary(obj);
+        obj = toBinary(obj);
 
-            if (obj instanceof KeyCacheObject) {
-                ((KeyCacheObject)obj).partition(partition);
+        if (obj instanceof BinaryObjectImpl) {
+            ((BinaryObjectImpl)obj).partition(partition(ctx, cctx, obj));
 
-                return (KeyCacheObject)obj;
-            }
+            return (KeyCacheObject)obj;
         }
 
-        return toCacheKeyObject0(obj, userObj, partition);
+        return toCacheKeyObject0(ctx, cctx, obj, userObj);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 561c6c6..3616082 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -344,7 +344,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean skipStore = opCtx != null && opCtx.skipStore();
 
         try {
-            return getAsync0(ctx.toCacheKeyObject(key, true),
+            return getAsync0(ctx.toCacheKeyObject(key),
                 !ctx.config().isReadFromBackup(),
                 subjId,
                 taskName,
@@ -390,7 +390,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         return asyncOp(new CO<IgniteInternalFuture<V>>() {
             @Override public IgniteInternalFuture<V> apply() {
-                return getAsync0(ctx.toCacheKeyObject(key, true),
+                return getAsync0(ctx.toCacheKeyObject(key),
                     forcePrimary,
                     subjId0,
                     taskName,
@@ -436,7 +436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
             @Override public IgniteInternalFuture<Map<K, V>> apply() {
-                return getAllAsync0(ctx.cacheKeysView(keys, true),
+                return getAllAsync0(ctx.cacheKeysView(keys),
                     forcePrimary,
                     subjId0,
                     taskName,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index b2f2704..0d88ef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -883,7 +883,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             if (val == null && op != GridCacheOperation.DELETE)
                 continue;
 
-            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true);
+            KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
             if (remapKeys != null && !remapKeys.contains(cacheKey))
                 continue;
@@ -1000,7 +1000,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (val == null && op != GridCacheOperation.DELETE)
             throw new NullPointerException("Null value.");
 
-        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true);
+        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
         if (op != TRANSFORM)
             val = cctx.toCacheObject(val);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index b8ac301..27000b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -133,23 +133,14 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
     public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException;
 
     /**
-     * @param ctx Cache context.
-     * @param obj Key value.
-     * @param userObj If {@code true} then given object is object provided by user and should be copied
-     *        before stored in cache.
-     * @return Cache key object.
-     */
-    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj);
-
-    /**
-     * @param ctx Cache context.
+     * @param ctx Cache objects context.
+     * @param cctx Cache context if cache is available.
      * @param obj Key value.
      * @param userObj If {@code true} then given object is object provided by user and should be copied
      *        before stored in cache.
-     * @param partition ID of partition this key belongs to.
      * @return Cache key object.
      */
-    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition);
+    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj);
 
     /**
      * @param ctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 3203548..9fd4c1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -108,18 +108,21 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
-        return toCacheKeyObject(ctx, obj, userObj, -1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition) {
+    @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx,
+        @Nullable GridCacheContext cctx,
+        Object obj,
+        boolean userObj) {
         if (obj instanceof KeyCacheObject) {
-            ((KeyCacheObject)obj).partition(partition);
+            KeyCacheObject key = (KeyCacheObject)obj;
+
+            if (key.partition() == -1)
+                // Assume all KeyCacheObjects except BinaryObject can not be reused for another cache.
+                key.partition(partition(ctx, cctx, key));
+
             return (KeyCacheObject)obj;
         }
 
-        return toCacheKeyObject0(obj, userObj, partition);
+        return toCacheKeyObject0(ctx, cctx, obj, userObj);
     }
 
     /**
@@ -129,11 +132,16 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
      * @return Key cache object.
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    protected KeyCacheObject toCacheKeyObject0(Object obj, boolean userObj, int partititon) {
+    protected KeyCacheObject toCacheKeyObject0(CacheObjectContext ctx,
+        @Nullable GridCacheContext cctx,
+        Object obj,
+        boolean userObj) {
+        int part = partition(ctx, cctx, obj);
+
         if (!userObj)
-            return new KeyCacheObjectImpl(obj, null, partititon);
+            return new KeyCacheObjectImpl(obj, null, part);
 
-        return new UserKeyCacheObjectImpl(obj, partititon);
+        return new UserKeyCacheObjectImpl(obj, part);
     }
 
     /** {@inheritDoc} */
@@ -207,6 +215,25 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         return new UserCacheObjectImpl(obj, null);
     }
 
+    /**
+     * @param ctx Cache objects context.
+     * @param cctx Cache context.
+     * @param obj Object.
+     * @return Object partition.
+     */
+    protected final int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj) {
+        try {
+            return cctx != null ?
+                cctx.affinity().partition(obj, false) :
+                ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get partition");
+
+            return  -1;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
         assert ccfg != null;
@@ -218,6 +245,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
             !ccfg.isCopyOnRead();
 
         CacheObjectContext res = new CacheObjectContext(ctx,
+            ccfg.getName(),
             ccfg.getAffinityMapper() != null ? ccfg.getAffinityMapper() : new GridCacheDefaultAffinityKeyMapper(),
             ccfg.isCopyOnRead() && memMode != OFFHEAP_VALUES,
             storeVal,
@@ -304,8 +332,23 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         /**
          * @param key Key.
          */
-        UserKeyCacheObjectImpl(Object key, int partition) {
-            super(key, null, partition);
+        UserKeyCacheObjectImpl(Object key, int part) {
+            super(key, null, part);
+        }
+
+        /**
+         * @param key Key.
+         */
+        UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
+            super(key, valBytes, part);
+        }
+
+        /** {@inheritDoc} */
+        @Override public KeyCacheObject copy(int part) {
+            if (this.partition() == part)
+                return this;
+
+            return new UserKeyCacheObjectImpl(val, valBytes, part);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 21df559..e565cba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -519,12 +519,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
 
                 for (Map.Entry<K, V> entry : entries)
-                    keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true));
+                    keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true));
             }
 
             Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
                 @Override public DataStreamerEntry apply(Entry<K, V> e) {
-                    KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true);
+                    KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, e.getKey(), true);
                     CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true);
 
                     return new DataStreamerEntry(key, val);
@@ -619,7 +619,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         else
             checkSecurityPermission(SecurityPermission.CACHE_PUT);
 
-        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true);
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
         CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
 
         return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
index 47b01f4..fc50541 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
@@ -57,15 +57,15 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
     /** */
     private static final String CACHE4 = "cache4";
 
+    /** */
+    private static final String CACHE5 = "cache5";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
-        if (gridName.equals(getTestGridName(NODE_CNT - 1)))
-            cfg.setClientMode(true);
-
         CacheConfiguration ccfg1 = new CacheConfiguration();
 
         ccfg1.setBackups(1);
@@ -92,7 +92,18 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
         ccfg4.setName(CACHE4);
         ccfg4.setNodeFilter(new TestNodesFilter());
 
-        cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
+        CacheConfiguration ccfg5 = new CacheConfiguration();
+
+        ccfg5.setBackups(1);
+        ccfg5.setName(CACHE5);
+
+        if (gridName.equals(getTestGridName(NODE_CNT - 1))) {
+            cfg.setClientMode(true);
+
+            cfg.setCacheConfiguration(ccfg5);
+        }
+        else
+            cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
 
         return cfg;
     }
@@ -123,6 +134,8 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
 
         checkCache(CACHE4, 3);
 
+        checkCache(CACHE5, 2);
+
         Ignite client = ignite(NODE_CNT - 1);
 
         CacheConfiguration ccfg = new CacheConfiguration();
@@ -157,6 +170,8 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
     private void checkCache(String cacheName, int expNodes) {
         log.info("Test cache: " + cacheName);
 
+        Affinity<Object> aff0 = ignite(0).affinity(cacheName);
+
         Ignite client = ignite(NODE_CNT - 1);
 
         assertTrue(client.configuration().isClientMode());
@@ -166,14 +181,22 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < NODE_CNT; i++) {
             Ignite ignite = ignite(i);
 
-            Affinity<Integer> aff = ignite.affinity(cacheName);
+            Affinity<Object> aff = ignite.affinity(cacheName);
 
             for (int part = 0; part < aff.partitions(); part++) {
                 Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
 
                 assertEquals(expNodes, nodes.size());
+                assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), nodes);
 
                 assertFalse(nodes.contains(clientNode));
+
+                assertEquals(aff0.partition(part), aff.partition(part));
+
+                TestKey key = new TestKey(part, part + 1);
+
+                assertEquals(aff0.partition(key), aff.partition(key));
+                assertEquals(aff0.mapKeyToPrimaryAndBackups(key), aff.mapKeyToPrimaryAndBackups(key));
             }
         }
     }
@@ -181,6 +204,46 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
+    static class TestKey {
+        /** */
+        private int id;
+
+        /** */
+        @AffinityKeyMapped
+        private int affId;
+
+        /**
+         * @param id ID.
+         * @param affId Affinity ID.
+         */
+        public TestKey(int id, int affId) {
+            this.id = id;
+            this.affId = affId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            return id == testKey.id;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
     private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
         /** {@inheritDoc} */
         @Override public boolean apply(ClusterNode clusterNode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 1285db7..f4c1bf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -26,7 +26,6 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
 import java.lang.reflect.Proxy;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -90,13 +89,11 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotEquals;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 /**
  * Binary marshaller tests.
  */
@@ -2401,7 +2398,7 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
 
         BinaryObjectImpl po = marshal(simpleObject(), marsh);
 
-        CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, false, true, false);
+        CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, null, false, true, false);
 
         assert po.value(coCtx, false) == po.value(coCtx, false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java
index be2ce9b..f999ad3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java
@@ -21,6 +21,26 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
 import junit.framework.TestCase;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
@@ -29,9 +49,6 @@ import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryType;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -46,27 +63,6 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 
@@ -1497,7 +1493,5 @@ public class BinaryObjectBuilderAdditionalSelfTest extends GridCommonAbstractTes
         assert OBJ.equals(binaryObj.type().fieldTypeName("asListHint"));
         assert OBJ.equals(binaryObj.type().fieldTypeName("asSetHint"));
         assert OBJ.equals(binaryObj.type().fieldTypeName("asMapHint"));
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
index c0ba42c..34480a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
@@ -463,7 +463,7 @@ public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTes
             CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
 
             GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
-                cacheObjCtx, e.getKey(), true));
+                cacheObjCtx, null, e.getKey(), true));
 
             assertNotNull("No entry for key: " + e.getKey(), mapEntry);
             assertEquals(mapEntry.version(), e.version());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
index e166acb..42027e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
@@ -82,6 +82,6 @@ public class IgniteCacheTtlCleanupSelfTest extends GridCacheAbstractSelfTest {
         CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
 
         for (int i = 0; i < 100; i++)
-            assertNull(cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(cacheObjCtx, i, true)));
+            assertNull(cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(cacheObjCtx, null, i, true)));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java
new file mode 100644
index 0000000..e2de281
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class CacheBinaryKeyConcurrentQueryTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** */
+    private static final int KEYS = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setMarshaller(null);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAndQueries() throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache cache1 = ignite.createCache(cacheConfiguration("cache1", ATOMIC));
+        IgniteCache cache2 = ignite.createCache(cacheConfiguration("cache2", TRANSACTIONAL));
+
+        insertData(ignite, cache1.getName());
+        insertData(ignite, cache2.getName());
+
+        IgniteInternalFuture<?> fut1 = startUpdate(cache1.getName());
+        IgniteInternalFuture<?> fut2 = startUpdate(cache2.getName());
+
+        fut1.get();
+        fut2.get();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Future.
+     */
+    private IgniteInternalFuture<?> startUpdate(final String cacheName) {
+        final long stopTime = System.currentTimeMillis() + 30_000;
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        return GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Void call() {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                IgniteCache cache = ignite(idx.getAndIncrement() % NODES).cache(cacheName).withKeepBinary();
+
+                while (System.currentTimeMillis() < stopTime) {
+                    switch (rnd.nextInt(5)) {
+                        case 0: {
+                            TestKey key = new TestKey(rnd.nextInt(KEYS));
+
+                            CacheEntry e = cache.getEntry(key);
+
+                            assertNotNull(e);
+                            assertTrue(e.getKey() instanceof BinaryObject);
+
+                            cache.put(e.getKey(), new TestValue(rnd.nextInt(KEYS)));
+
+                            break;
+                        }
+
+                        case 1: {
+                            Iterator<Cache.Entry> it = cache.iterator();
+
+                            for (int i = 0; i < 100 && it.hasNext(); i++) {
+                                Cache.Entry e = it.next();
+
+                                assertTrue(e.getKey() instanceof BinaryObject);
+
+                                cache.put(e.getKey(), new TestValue(rnd.nextInt(KEYS)));
+                            }
+
+                            break;
+                        }
+
+                        case 2: {
+                            SqlFieldsQuery qry = new SqlFieldsQuery("select _key " +
+                                "from \"" + cache.getName() + "\".TestValue where id=?");
+
+                            qry.setArgs(rnd.nextInt(KEYS));
+
+                            List<List> res = cache.query(qry).getAll();
+
+                            assertEquals(1, res.size());
+
+                            BinaryObject key = (BinaryObject)res.get(0).get(0);
+
+                            cache.put(key, new TestValue(rnd.nextInt(KEYS)));
+
+                            break;
+                        }
+
+                        case 3: {
+                            SqlQuery qry = new SqlQuery("TestValue", "id=?");
+
+                            qry.setArgs(rnd.nextInt(KEYS));
+
+                            List<Cache.Entry> res = cache.query(qry).getAll();
+
+                            assertEquals(1, res.size());
+
+                            break;
+                        }
+
+                        case 4: {
+                            SqlQuery qry = new SqlQuery("TestValue", "order by id");
+
+                            int cnt = 0;
+
+                            for (Cache.Entry e : (Iterable<Cache.Entry>)cache.query(qry)) {
+                                assertNotNull(cache.get(e.getKey()));
+
+                                cnt++;
+                            }
+
+                            assertTrue(cnt > 0);
+
+                            break;
+                        }
+
+                        default:
+                            fail();
+                    }
+                }
+
+                return null;
+            }
+        }, NODES * 2, "test-thread");
+    }
+
+    /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     */
+    private void insertData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) {
+            for (int i = 0; i < KEYS; i++)
+                streamer.addData(new TestKey(i), new TestValue(i));
+        }
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setBackups(1);
+
+        QueryEntity qryEntity = new QueryEntity();
+
+        qryEntity.setKeyType(TestKey.class.getName());
+        qryEntity.setValueType(TestValue.class.getName());
+
+        qryEntity.addQueryField("id", Integer.class.getName(), null);
+        qryEntity.addQueryField("val", Integer.class.getName(), null);
+
+        qryEntity.setIndexes(F.asList(new QueryIndex("id"), new QueryIndex("val")));
+
+        ccfg.setQueryEntities(F.asList(qryEntity));
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class TestKey {
+        /** */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /**
+         * @param id ID.
+         */
+        public TestKey(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            return id == testKey.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestValue {
+        /** */
+        @QuerySqlField(index = true)
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(int val) {
+            this.val = val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index a85b7a6..96e8551 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.CacheBinaryKeyConcurrentQueryTest;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
 import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
 import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest;
@@ -76,6 +77,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class);
         suite.addTestSuite(IgniteCacheStarvationOnRebalanceTest.class);
         suite.addTestSuite(CacheOperationsWithExpirationTest.class);
+        suite.addTestSuite(CacheBinaryKeyConcurrentQueryTest.class);
 
         return suite;
     }


Mime
View raw message