ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [02/51] [abbrv] ignite git commit: ignite-2042 Added special queue/set key classes to make collocation work with BinaryMarshaller. Also fixed issue with 'invoke' result with binary marshaller.
Date Wed, 09 Dec 2015 11:23:44 GMT
ignite-2042 Added special queue/set key classes to make collocation work with BinaryMarshaller. Also fixed issue with 'invoke' result with binary marshaller.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f6c013
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f6c013
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f6c013

Branch: refs/heads/ignite-843-rc2
Commit: 50f6c0131fd761f6231e7c2632a010c093000e70
Parents: 86ec37e
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Dec 3 16:50:00 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Dec 3 16:50:00 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/BinaryReaderExImpl.java   |   2 +-
 .../internal/portable/PortableContext.java      |  34 +-
 .../processors/cache/GridCacheContext.java      |  30 ++
 .../CacheDataStructuresManager.java             |  31 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  20 +-
 .../CacheObjectBinaryProcessorImpl.java         |   8 +
 .../cache/query/GridCacheQueryManager.java      |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |  11 +-
 .../datastructures/CollocatedQueueItemKey.java  |  75 ++++
 .../datastructures/CollocatedSetItemKey.java    |  87 +++++
 .../datastructures/DataStructuresProcessor.java |   7 +-
 .../GridAtomicCacheQueueImpl.java               |   8 +-
 .../datastructures/GridCacheQueueAdapter.java   |  30 +-
 .../datastructures/GridCacheQueueItemKey.java   |   9 +-
 .../datastructures/GridCacheSetImpl.java        |  37 +-
 .../datastructures/GridCacheSetItemKey.java     |  21 +-
 .../GridTransactionalCacheQueueImpl.java        |   2 +-
 .../processors/datastructures/QueueItemKey.java |  27 ++
 .../processors/datastructures/SetItemKey.java   |  36 ++
 .../cache/IgniteCacheInvokeAbstractTest.java    | 369 ++++++++++++++-----
 ...eAbstractDataStructuresFailoverSelfTest.java |   7 +-
 .../GridCacheQueueApiSelfAbstractTest.java      |  18 +-
 .../GridCacheSetFailoverAbstractSelfTest.java   |   6 +-
 .../GridCachePartitionedQueueApiSelfTest.java   |   5 +
 ...dCachePartitionedQueueEntryMoveSelfTest.java |   2 +-
 .../IgnitePartitionedQueueNoBackupsTest.java    |  92 +++++
 .../GridCacheReplicatedQueueApiSelfTest.java    |   5 +
 .../GridCacheWriteBehindStoreAbstractTest.java  |   2 +-
 .../IgniteCacheDataStructuresSelfTestSuite.java |   3 +
 29 files changed, 778 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
index ddbf6ba..91b67f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
@@ -246,7 +246,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
                 dataStart = start + DFLT_HDR_LEN;
             }
 
-            idMapper = userType ? ctx.userTypeIdMapper(typeId) : null;
+            idMapper = userType ? ctx.userTypeIdMapper(typeId) : BinaryInternalIdMapper.defaultInstance();
             schema = PortableUtils.hasSchema(flags) ? getOrCreateSchema() : null;
         }
         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 1482df9..fd6c41d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -66,6 +66,8 @@ import org.apache.ignite.binary.BinarySerializer;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.portable.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.datastructures.CollocatedQueueItemKey;
+import org.apache.ignite.internal.processors.datastructures.CollocatedSetItemKey;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.GridMapEntry;
 import org.apache.ignite.internal.util.typedef.F;
@@ -233,7 +235,8 @@ public class PortableContext implements Externalizable {
 
     /**
      * @param marsh Portable marshaller.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @param cfg Configuration.
+     * @throws BinaryObjectException In case of error.
      */
     public void configure(BinaryMarshaller marsh, IgniteConfiguration cfg) throws BinaryObjectException {
         if (marsh == null)
@@ -265,7 +268,7 @@ public class PortableContext implements Externalizable {
      * @param globalIdMapper ID mapper.
      * @param globalSerializer Serializer.
      * @param typeCfgs Type configurations.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @throws BinaryObjectException In case of error.
      */
     private void configure(
         BinaryIdMapper globalIdMapper,
@@ -313,9 +316,8 @@ public class PortableContext implements Externalizable {
             }
         }
 
-        for (TypeDescriptor desc : descs.descriptors()) {
+        for (TypeDescriptor desc : descs.descriptors())
             registerUserType(desc.clsName, desc.idMapper, desc.serializer, desc.affKeyFieldName, desc.isEnum);
-        }
 
         BinaryInternalIdMapper dfltMapper = BinaryInternalIdMapper.create(globalIdMapper);
 
@@ -327,6 +329,20 @@ public class PortableContext implements Externalizable {
 
             affKeyFieldNames.putIfAbsent(typeId, entry.getValue());
         }
+
+        addSystemClassAffinityKey(CollocatedSetItemKey.class);
+        addSystemClassAffinityKey(CollocatedQueueItemKey.class);
+    }
+
+    /**
+     * @param cls Class.
+     */
+    private void addSystemClassAffinityKey(Class<?> cls) {
+        String fieldName = affinityFieldName(cls);
+
+        assert fieldName != null : cls;
+
+        affKeyFieldNames.putIfAbsent(cls.getName().hashCode(), affinityFieldName(cls));
     }
 
     /**
@@ -400,7 +416,7 @@ public class PortableContext implements Externalizable {
     /**
      * @param cls Class.
      * @return Class descriptor.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @throws BinaryObjectException In case of error.
      */
     public PortableClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize)
         throws BinaryObjectException {
@@ -722,7 +738,7 @@ public class PortableContext implements Externalizable {
      * @param serializer Serializer.
      * @param affKeyFieldName Affinity key field name.
      * @param isEnum If enum.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @throws BinaryObjectException In case of error.
      */
     @SuppressWarnings("ErrorNotRethrown")
     public void registerUserType(String clsName,
@@ -808,7 +824,7 @@ public class PortableContext implements Externalizable {
     /**
      * @param typeId Type ID.
      * @return Meta data.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @throws BinaryObjectException In case of error.
      */
     @Nullable public BinaryType metadata(int typeId) throws BinaryObjectException {
         return metaHnd != null ? metaHnd.metadata(typeId) : null;
@@ -964,7 +980,7 @@ public class PortableContext implements Externalizable {
          * @param affKeyFieldName Affinity key field name.
          * @param isEnum Enum flag.
          * @param canOverride Whether this descriptor can be override.
-         * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+         * @throws BinaryObjectException If failed.
          */
         private void add(String clsName,
             BinaryIdMapper idMapper,
@@ -1044,7 +1060,7 @@ public class PortableContext implements Externalizable {
          * Override portable class descriptor.
          *
          * @param other Other descriptor.
-         * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+         * @throws BinaryObjectException If failed.
          */
         private void override(TypeDescriptor other) throws BinaryObjectException {
             assert clsName.equals(other.clsName);

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5b4f22c..d689ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -36,6 +36,7 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.expiry.EternalExpiryPolicy;
 import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheInterceptor;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
+import org.apache.ignite.internal.portable.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -1682,6 +1684,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if {@link BinaryMarshaller is configured}.
+     */
+    public boolean binaryMarshaller() {
+        return marshaller() instanceof BinaryMarshaller;
+    }
+
+    /**
      * @return Keep portable flag.
      */
     public boolean keepPortable() {
@@ -1752,6 +1761,27 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @param resMap Invoke results map.
+     * @param keepBinary Keep binary flag.
+     * @return Unwrapped results.
+     */
+    public Map unwrapInvokeResult(@Nullable Map<Object, EntryProcessorResult> resMap, final boolean keepBinary) {
+        return F.viewReadOnly(resMap, new C1<EntryProcessorResult, EntryProcessorResult>() {
+            @Override public EntryProcessorResult apply(EntryProcessorResult res) {
+                if (res instanceof CacheInvokeResult) {
+                    CacheInvokeResult invokeRes = (CacheInvokeResult)res;
+
+                    if (invokeRes.result() != null)
+                        res = CacheInvokeResult.fromResult(unwrapPortableIfNeeded(invokeRes.result(),
+                            keepBinary, false));
+                }
+
+                return res;
+            }
+        });
+    }
+
+    /**
      * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index ec787f8..6ec29b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -54,9 +54,9 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy;
 import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader;
 import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey;
 import org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
 import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy;
 import org.apache.ignite.internal.processors.datastructures.GridTransactionalCacheQueueImpl;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -79,7 +79,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
     private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap;
 
     /** Set keys used for set iteration. */
-    private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap =
+    private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<SetItemKey>> setDataMap =
         new ConcurrentHashMap8<>();
 
     /** Queues map. */
@@ -311,12 +311,13 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
      *
      * @param key Key.
      * @param rmv {@code True} if entry was removed.
+     * @param keepPortable Keep portable flag.
      */
     public void onEntryUpdated(KeyCacheObject key, boolean rmv, boolean keepPortable) {
         Object key0 = cctx.cacheObjectContext().unwrapPortableIfNeeded(key, keepPortable, false);
 
-        if (key0 instanceof GridCacheSetItemKey)
-            onSetItemUpdated((GridCacheSetItemKey)key0, rmv);
+        if (key0 instanceof SetItemKey)
+            onSetItemUpdated((SetItemKey)key0, rmv);
     }
 
     /**
@@ -327,11 +328,11 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
     public void onPartitionEvicted(int part) {
         GridCacheAffinityManager aff = cctx.affinity();
 
-        for (GridConcurrentHashSet<GridCacheSetItemKey> set : setDataMap.values()) {
-            Iterator<GridCacheSetItemKey> iter = set.iterator();
+        for (GridConcurrentHashSet<SetItemKey> set : setDataMap.values()) {
+            Iterator<SetItemKey> iter = set.iterator();
 
             while (iter.hasNext()) {
-                GridCacheSetItemKey key = iter.next();
+                SetItemKey key = iter.next();
 
                 if (aff.partition(key) == part)
                     iter.remove();
@@ -415,7 +416,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
      * @param id Set ID.
      * @return Data for given set.
      */
-    @Nullable public GridConcurrentHashSet<GridCacheSetItemKey> setData(IgniteUuid id) {
+    @Nullable public GridConcurrentHashSet<SetItemKey> setData(IgniteUuid id) {
         return setDataMap.get(id);
     }
 
@@ -436,7 +437,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
             cctx.preloader().syncFuture().get();
         }
 
-        GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(setId);
+        GridConcurrentHashSet<SetItemKey> set = setDataMap.get(setId);
 
         if (set == null)
             return;
@@ -445,9 +446,9 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
 
         final int BATCH_SIZE = 100;
 
-        Collection<GridCacheSetItemKey> keys = new ArrayList<>(BATCH_SIZE);
+        Collection<SetItemKey> keys = new ArrayList<>(BATCH_SIZE);
 
-        for (GridCacheSetItemKey key : set) {
+        for (SetItemKey key : set) {
             if (!loc && !aff.primary(cctx.localNode(), key, topVer))
                 continue;
 
@@ -555,14 +556,14 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
      * @param key Set item key.
      * @param rmv {@code True} if item was removed.
      */
-    private void onSetItemUpdated(GridCacheSetItemKey key, boolean rmv) {
-        GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(key.setId());
+    private void onSetItemUpdated(SetItemKey key, boolean rmv) {
+        GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId());
 
         if (set == null) {
             if (rmv)
                 return;
 
-            GridConcurrentHashSet<GridCacheSetItemKey> old = setDataMap.putIfAbsent(key.setId(),
+            GridConcurrentHashSet<SetItemKey> old = setDataMap.putIfAbsent(key.setId(),
                 set = new GridConcurrentHashSet<>());
 
             if (old != null)
@@ -592,7 +593,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private void retryRemoveAll(final IgniteInternalCache cache, final Collection<GridCacheSetItemKey> keys)
+    private void retryRemoveAll(final IgniteInternalCache cache, final Collection<SetItemKey> keys)
         throws IgniteCheckedException {
         DataStructuresProcessor.retry(log, new Callable<Void>() {
             @Override public Void call() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d8ab62a..c5ec258 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -824,25 +824,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
             @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException {
-                Map<K, EntryProcessorResult<T>> resMap = fut.get();
+                Map<Object, EntryProcessorResult> resMap = (Map)fut.get();
 
-                if (resMap != null) {
-                    return F.viewReadOnly(resMap, new C1<EntryProcessorResult<T>, EntryProcessorResult<T>>() {
-                        @Override public EntryProcessorResult<T> apply(EntryProcessorResult<T> res) {
-                            if (res instanceof CacheInvokeResult) {
-                                CacheInvokeResult invokeRes = (CacheInvokeResult)res;
-
-                                if (invokeRes.result() != null)
-                                    res = CacheInvokeResult.fromResult((T)ctx.unwrapPortableIfNeeded(invokeRes.result(),
-                                        keepBinary, false));
-                            }
-
-                            return res;
-                        }
-                    });
-                }
-
-                return null;
+                return ctx.unwrapInvokeResult(resMap, keepBinary);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
index 220a45a..d172bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
@@ -602,6 +602,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
                 if (affKeyFieldName != null)
                     return po.field(affKeyFieldName);
             }
+            else if (po instanceof BinaryObjectEx) {
+                int id = ((BinaryObjectEx)po).typeId();
+
+                String affKeyFieldName = portableCtx.affinityKeyFieldName(id);
+
+                if (affKeyFieldName != null)
+                    return po.field(affKeyFieldName);
+            }
         }
         catch (BinaryObjectException e) {
             U.error(log, "Failed to get affinity field from portable object: " + po, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index bef587a..bb5d230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -47,8 +47,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
 import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -761,21 +761,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         IgniteUuid id = filter.setId();
 
-        Collection<GridCacheSetItemKey> data = cctx.dataStructures().setData(id);
+        Collection<SetItemKey> data = cctx.dataStructures().setData(id);
 
         if (data == null)
             data = Collections.emptyList();
 
         final GridIterator<IgniteBiTuple<K, V>> it = F.iterator(
             data,
-            new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() {
-                @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey e) {
+            new C1<SetItemKey, IgniteBiTuple<K, V>>() {
+                @Override public IgniteBiTuple<K, V> apply(SetItemKey e) {
                     return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE);
                 }
             },
             true,
-            new P1<GridCacheSetItemKey>() {
-                @Override public boolean apply(GridCacheSetItemKey e) {
+            new P1<SetItemKey>() {
+                @Override public boolean apply(SetItemKey e) {
                     return filter.apply(e, null);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f13cff4..33c0fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -3220,8 +3220,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         try {
                             txFut.get();
 
-                            return new GridCacheReturn(cacheCtx, true, keepBinary,
-                                implicitRes.value(), implicitRes.success());
+                            Object res = implicitRes.value();
+
+                            if (implicitRes.invokeResult()) {
+                                assert res == null || res instanceof Map : implicitRes;
+
+                                res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+                            }
+
+                            return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
                         }
                         catch (IgniteCheckedException | RuntimeException e) {
                             rollbackAsync();

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
new file mode 100644
index 0000000..8eb9fa0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public class CollocatedQueueItemKey implements QueueItemKey {
+    /** */
+    private IgniteUuid queueId;
+
+    /** */
+    @AffinityKeyMapped
+    private int queueNameHash;
+
+    /** */
+    private long idx;
+
+    /**
+     * @param queueId Queue unique ID.
+     * @param queueName Queue name.
+     * @param idx Item index.
+     */
+    public CollocatedQueueItemKey(IgniteUuid queueId, String queueName, long idx) {
+        this.queueId = queueId;
+        this.queueNameHash = queueName.hashCode();
+        this.idx = idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        CollocatedQueueItemKey itemKey = (CollocatedQueueItemKey)o;
+
+        return idx == itemKey.idx && queueId.equals(itemKey.queueId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = queueId.hashCode();
+
+        res = 31 * res + (int)(idx ^ (idx >>> 32));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CollocatedQueueItemKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
new file mode 100644
index 0000000..94cffd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public class CollocatedSetItemKey implements SetItemKey {
+    /** */
+    private IgniteUuid setId;
+
+    /** */
+    @GridToStringInclude
+    private Object item;
+
+    /** */
+    @AffinityKeyMapped
+    private int setNameHash;
+
+    /**
+     * @param setName Set name.
+     * @param setId Set unique ID.
+     * @param item Set item.
+     */
+    public CollocatedSetItemKey(String setName, IgniteUuid setId, Object item) {
+        this.setNameHash = setName.hashCode();
+        this.setId = setId;
+        this.item = item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid setId() {
+        return setId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object item() {
+        return item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = setId.hashCode();
+
+        res = 31 * res + item.hashCode();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        CollocatedSetItemKey that = (CollocatedSetItemKey)o;
+
+        return setId.equals(that.setId) && item.equals(that.item);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CollocatedSetItemKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 998bd92..9ed9350 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -113,12 +113,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** Initial capacity. */
     private static final int INITIAL_CAPACITY = 10;
 
-    /** */
-    private static final int MAX_UPDATE_RETRIES = 100;
-
-    /** */
-    private static final long RETRY_DELAY = 1;
-
     /** Initialization latch. */
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
@@ -986,6 +980,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                     hdr.id(),
                     name,
                     hdr.collocated(),
+                    cctx.binaryMarshaller(),
                     hdr.head(),
                     hdr.tail(),
                     0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index b433887..58d3efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -55,7 +55,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
             checkRemoved(idx);
 
-            GridCacheQueueItemKey key = itemKey(idx);
+            QueueItemKey key = itemKey(idx);
 
             cache.getAndPut(key, item);
 
@@ -78,7 +78,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
                 checkRemoved(idx);
 
-                GridCacheQueueItemKey key = itemKey(idx);
+                QueueItemKey key = itemKey(idx);
 
                 T data = (T)cache.getAndRemove(key);
 
@@ -115,7 +115,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
             checkRemoved(idx);
 
-            Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+            Map<QueueItemKey, T> putMap = new HashMap<>();
 
             for (T item : items) {
                 putMap.put(itemKey(idx), item);
@@ -140,7 +140,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
         if (idx != null) {
             checkRemoved(idx);
 
-            GridCacheQueueItemKey key = itemKey(idx);
+            QueueItemKey key = itemKey(idx);
 
             if (cache.remove(key))
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index df1bd88..ca0250d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
 
     /** */
-    protected static final long RETRY_DELAY = 1;
-
-    /** */
     private static final int DFLT_CLEAR_BATCH_SIZE = 100;
 
     /** Logger. */
@@ -98,6 +95,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @GridToStringExclude
     private final Semaphore writeSem;
 
+    /** */
+    private final boolean binaryMarsh;
+
     /**
      * @param queueName Queue name.
      * @param hdr Queue hdr.
@@ -112,6 +112,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         collocated = hdr.collocated();
         queueKey = new GridCacheQueueHeaderKey(queueName);
         cache = cctx.kernalContext().cache().internalCache(cctx.name());
+        binaryMarsh = cctx.binaryMarshaller();
 
         log = cctx.logger(getClass());
 
@@ -369,7 +370,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
 
             checkRemoved(t.get1());
 
-            removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize);
+            removeKeys(cache, id, queueName, collocated, binaryMarsh, t.get1(), t.get2(), batchSize);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -407,6 +408,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
      * @param id Queue unique ID.
      * @param name Queue name.
      * @param collocated Collocation flag.
+     * @param binaryMarsh {@code True} if binary marshaller is configured.
      * @param startIdx Start item index.
      * @param endIdx End item index.
      * @param batchSize Batch size.
@@ -418,14 +420,15 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         IgniteUuid id,
         String name,
         boolean collocated,
+        boolean binaryMarsh,
         long startIdx,
         long endIdx,
         int batchSize)
         throws IgniteCheckedException {
-        Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
+        Set<QueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
 
         for (long idx = startIdx; idx < endIdx; idx++) {
-            keys.add(itemKey(id, name, collocated, idx));
+            keys.add(itemKey(id, name, collocated, binaryMarsh, idx));
 
             if (batchSize > 0 && keys.size() == batchSize) {
                 cache.removeAll(keys);
@@ -536,8 +539,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
      * @param idx Item index.
      * @return Item key.
      */
-    protected GridCacheQueueItemKey itemKey(Long idx) {
-        return itemKey(id, queueName, collocated(), idx);
+    protected QueueItemKey itemKey(Long idx) {
+        return itemKey(id, queueName, collocated(), binaryMarsh, idx);
     }
 
     /** {@inheritDoc} */
@@ -558,11 +561,18 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
      * @param id Queue unique ID.
      * @param queueName Queue name.
      * @param collocated Collocation flag.
+     * @param binaryMarsh {@code True} if binary marshaller is configured.
      * @param idx Item index.
      * @return Item key.
      */
-    private static GridCacheQueueItemKey itemKey(IgniteUuid id, String queueName, boolean collocated, long idx) {
-        return collocated ? new CollocatedItemKey(id, queueName, idx) : new GridCacheQueueItemKey(id, queueName, idx);
+    private static QueueItemKey itemKey(IgniteUuid id,
+        String queueName,
+        boolean collocated,
+        boolean binaryMarsh,
+        long idx) {
+        return collocated ?
+            (binaryMarsh ? new CollocatedQueueItemKey(id, queueName, idx) : new CollocatedItemKey(id, queueName, idx)) :
+            new GridCacheQueueItemKey(id, queueName, idx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
index c4cb7b1..df47e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -29,7 +28,7 @@ import org.apache.ignite.lang.IgniteUuid;
 /**
  * Queue item key.
  */
-class GridCacheQueueItemKey implements Externalizable, GridCacheInternal {
+class GridCacheQueueItemKey implements Externalizable, QueueItemKey {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -110,11 +109,11 @@ class GridCacheQueueItemKey implements Externalizable, GridCacheInternal {
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int result = queueId.hashCode();
+        int res = queueId.hashCode();
 
-        result = 31 * result + (int)(idx ^ (idx >>> 32));
+        res = 31 * res + (int)(idx ^ (idx >>> 32));
 
-        return result;
+        return res;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 62eab61..f25e361 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -66,7 +66,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     private final GridCacheContext ctx;
 
     /** Cache. */
-    private final IgniteInternalCache<GridCacheSetItemKey, Boolean> cache;
+    private final IgniteInternalCache<SetItemKey, Boolean> cache;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -86,6 +86,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /** Removed flag. */
     private volatile boolean rmvd;
 
+    /** */
+    private final boolean binaryMarsh;
+
     /**
      * @param ctx Cache context.
      * @param name Set name.
@@ -97,6 +100,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         this.name = name;
         id = hdr.id();
         collocated = hdr.collocated();
+        binaryMarsh = ctx.binaryMarshaller();
 
         cache = ctx.cache();
 
@@ -140,7 +144,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             onAccess();
 
             if (ctx.isLocal() || ctx.isReplicated()) {
-                GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
+                GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id);
 
                 return set != null ? set.size() : 0;
             }
@@ -171,7 +175,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     @Override public boolean isEmpty() {
         onAccess();
 
-        GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
+        GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id);
 
         return (set == null || set.isEmpty()) && size() == 0;
     }
@@ -180,7 +184,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     @Override public boolean contains(Object o) {
         onAccess();
 
-        final GridCacheSetItemKey key = itemKey(o);
+        final SetItemKey key = itemKey(o);
 
         return retry(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
@@ -193,7 +197,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     @Override public boolean add(T o) {
         onAccess();
 
-        final GridCacheSetItemKey key = itemKey(o);
+        final SetItemKey key = itemKey(o);
 
         return retry(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
@@ -206,7 +210,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     @Override public boolean remove(Object o) {
         onAccess();
 
-        final GridCacheSetItemKey key = itemKey(o);
+        final SetItemKey key = itemKey(o);
 
         return retry(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
@@ -231,7 +235,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
 
         boolean add = false;
 
-        Map<GridCacheSetItemKey, Boolean> addKeys = null;
+        Map<SetItemKey, Boolean> addKeys = null;
 
         for (T obj : c) {
             if (add) {
@@ -247,7 +251,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
                 }
             }
             else
-                add |= add(obj);
+                add = add(obj);
         }
 
         if (!F.isEmpty(addKeys))
@@ -262,7 +266,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
 
         boolean rmv = false;
 
-        Set<GridCacheSetItemKey> rmvKeys = null;
+        Set<SetItemKey> rmvKeys = null;
 
         for (Object obj : c) {
             if (rmv) {
@@ -278,7 +282,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
                 }
             }
             else
-                rmv |= remove(obj);
+                rmv = remove(obj);
         }
 
         if (!F.isEmpty(rmvKeys))
@@ -295,7 +299,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             try (GridCloseableIterator<T> iter = iterator0()) {
                 boolean rmv = false;
 
-                Set<GridCacheSetItemKey> rmvKeys = null;
+                Set<SetItemKey> rmvKeys = null;
 
                 for (T val : iter) {
                     if (!c.contains(val)) {
@@ -331,7 +335,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             onAccess();
 
             try (GridCloseableIterator<T> iter = iterator0()) {
-                Collection<GridCacheSetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE);
+                Collection<SetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE);
 
                 for (T val : iter) {
                     rmvKeys.add(itemKey(val));
@@ -425,7 +429,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /**
      * @param keys Keys to remove.
      */
-    private void retryRemoveAll(final Collection<GridCacheSetItemKey> keys) {
+    private void retryRemoveAll(final Collection<SetItemKey> keys) {
         retry(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 cache.removeAll(keys);
@@ -438,7 +442,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /**
      * @param keys Keys to remove.
      */
-    private void retryPutAll(final Map<GridCacheSetItemKey, Boolean> keys) {
+    private void retryPutAll(final Map<SetItemKey, Boolean> keys) {
         retry(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 cache.putAll(keys);
@@ -523,8 +527,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
      * @param item Set item.
      * @return Item key.
      */
-    private GridCacheSetItemKey itemKey(Object item) {
-        return collocated ? new CollocatedItemKey(name, id, item) : new GridCacheSetItemKey(id, item);
+    private SetItemKey itemKey(Object item) {
+        return collocated ? (binaryMarsh ? new CollocatedSetItemKey(name, id, item) : new CollocatedItemKey(name, id, item))
+            : new GridCacheSetItemKey(id, item);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
index d025dce..8b47b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -30,7 +29,7 @@ import org.apache.ignite.lang.IgniteUuid;
 /**
  * Set item key.
  */
-public class GridCacheSetItemKey implements GridCacheInternal, Externalizable {
+public class GridCacheSetItemKey implements SetItemKey, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -57,27 +56,23 @@ public class GridCacheSetItemKey implements GridCacheInternal, Externalizable {
         this.item = item;
     }
 
-    /**
-     * @return Set UUID.
-     */
-    public IgniteUuid setId() {
+    /** {@inheritDoc} */
+    @Override public IgniteUuid setId() {
         return setId;
     }
 
-    /**
-     * @return Set item.
-     */
-    public Object item() {
+    /** {@inheritDoc} */
+    @Override public Object item() {
         return item;
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int result = setId.hashCode();
+        int res = setId.hashCode();
 
-        result = 31 * result + item.hashCode();
+        res = 31 * res + item.hashCode();
 
-        return result;
+        return res;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 4880324..32e94d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -143,7 +143,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                         if (idx != null) {
                             checkRemoved(idx);
 
-                            Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+                            Map<QueueItemKey, T> putMap = new HashMap<>();
 
                             for (T item : items) {
                                 putMap.put(itemKey(idx), item);

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
new file mode 100644
index 0000000..fe0cef3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+
+/**
+ *
+ */
+public interface QueueItemKey extends GridCacheInternal {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
new file mode 100644
index 0000000..759945a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public interface SetItemKey extends GridCacheInternal {
+    /**
+     * @return Set UUID.
+     */
+    public IgniteUuid setId();
+
+    /**
+     * @return Set item.
+     */
+    public Object item();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index b881d90..51a70b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -139,6 +140,31 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
 
             tx = startTx(txMode);
 
+            TestValue testVal = cache.invoke(key, new UserClassValueProcessor());
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals("63", testVal.value());
+
+            checkValue(key, 63);
+
+            tx = startTx(txMode);
+
+            Collection<TestValue> testValCol = cache.invoke(key, new CollectionReturnProcessor());
+
+            if (tx != null)
+                tx.commit();
+
+            assertEquals(10, testValCol.size());
+
+            for (TestValue val : testValCol)
+                assertEquals("64", val.value());
+
+            checkValue(key, 63);
+
+            tx = startTx(txMode);
+
             GridTestUtils.assertThrows(log, new Callable<Void>() {
                 @Override public Void call() throws Exception {
                     cache.invoke(key, new ExceptionProcessor(63));
@@ -237,166 +263,226 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
 
         IncrementProcessor incProcessor = new IncrementProcessor();
 
-        Transaction tx = startTx(txMode);
+        {
+            Transaction tx = startTx(txMode);
 
-        Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
+            Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
 
-        Map<Object, Object> exp = new HashMap<>();
+            Map<Object, Object> exp = new HashMap<>();
 
-        for (Integer key : keys)
-            exp.put(key, -1);
+            for (Integer key : keys)
+                exp.put(key, -1);
 
-        checkResult(resMap, exp);
+            checkResult(resMap, exp);
 
-        for (Integer key : keys)
-            checkValue(key, 1);
+            for (Integer key : keys)
+                checkValue(key, 1);
+        }
 
-        tx = startTx(txMode);
+        {
+            Transaction tx = startTx(txMode);
 
-        resMap = cache.invokeAll(keys, incProcessor);
+            Map<Integer, EntryProcessorResult<TestValue>> resMap = cache.invokeAll(keys, new UserClassValueProcessor());
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
 
-        exp = new HashMap<>();
+            Map<Object, Object> exp = new HashMap<>();
 
-        for (Integer key : keys)
-            exp.put(key, 1);
+            for (Integer key : keys)
+                exp.put(key, new TestValue("1"));
 
-        checkResult(resMap, exp);
+            checkResult(resMap, exp);
 
-        for (Integer key : keys)
-            checkValue(key, 2);
+            for (Integer key : keys)
+                checkValue(key, 1);
+        }
 
-        tx = startTx(txMode);
+        {
+            Transaction tx = startTx(txMode);
 
-        resMap = cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
+            Map<Integer, EntryProcessorResult<Collection<TestValue>>> resMap =
+                cache.invokeAll(keys, new CollectionReturnProcessor());
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
 
-        for (Integer key : keys)
-            exp.put(key, 3);
+            Map<Object, Object> exp = new HashMap<>();
 
-        checkResult(resMap, exp);
+            for (Integer key : keys) {
+                List<TestValue> expCol = new ArrayList<>();
 
-        for (Integer key : keys)
-            checkValue(key, 62);
+                for (int i = 0; i < 10; i++)
+                    expCol.add(new TestValue("2"));
 
-        tx = startTx(txMode);
+                exp.put(key, expCol);
+            }
 
-        resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
+            checkResult(resMap, exp);
 
-        if (tx != null)
-            tx.commit();
+            for (Integer key : keys)
+                checkValue(key, 1);
+        }
 
-        for (Integer key : keys) {
-            final EntryProcessorResult<Integer> res = resMap.get(key);
+        {
+            Transaction tx = startTx(txMode);
 
-            assertNotNull("No result for " + key);
+            Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
 
-            GridTestUtils.assertThrows(log, new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    res.get();
+            if (tx != null)
+                tx.commit();
 
-                    return null;
-                }
-            }, EntryProcessorException.class, "Test processor exception.");
+            Map<Object, Object> exp = new HashMap<>();
+
+            for (Integer key : keys)
+                exp.put(key, 1);
+
+            checkResult(resMap, exp);
+
+            for (Integer key : keys)
+                checkValue(key, 2);
         }
 
-        for (Integer key : keys)
-            checkValue(key, 62);
+        {
+            Transaction tx = startTx(txMode);
 
-        tx = startTx(txMode);
+            Map<Integer, EntryProcessorResult<Integer>> resMap =
+                cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
 
-        Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
+            if (tx != null)
+                tx.commit();
+
+            Map<Object, Object> exp = new HashMap<>();
+
+            for (Integer key : keys)
+                exp.put(key, 3);
+
+            checkResult(resMap, exp);
+
+            for (Integer key : keys)
+                checkValue(key, 62);
+        }
+
+        {
+            Transaction tx = startTx(txMode);
 
-        for (Integer key : keys) {
-            switch (key % 4) {
-                case 0: invokeMap.put(key, new IncrementProcessor()); break;
+            Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
 
-                case 1: invokeMap.put(key, new RemoveProcessor(62)); break;
+            if (tx != null)
+                tx.commit();
 
-                case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break;
+            for (Integer key : keys) {
+                final EntryProcessorResult<Integer> res = resMap.get(key);
 
-                case 3: invokeMap.put(key, new ExceptionProcessor(62)); break;
+                assertNotNull("No result for " + key);
 
-                default:
-                    fail();
+                GridTestUtils.assertThrows(log, new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        res.get();
+
+                        return null;
+                    }
+                }, EntryProcessorException.class, "Test processor exception.");
             }
+
+            for (Integer key : keys)
+                checkValue(key, 62);
         }
 
-        resMap = cache.invokeAll(invokeMap, 10, 20, 30);
+        {
+            Transaction tx = startTx(txMode);
 
-        if (tx != null)
-            tx.commit();
+            Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
 
-        for (Integer key : keys) {
-            final EntryProcessorResult<Integer> res = resMap.get(key);
+            for (Integer key : keys) {
+                switch (key % 4) {
+                    case 0: invokeMap.put(key, new IncrementProcessor()); break;
 
-            switch (key % 4) {
-                case 0: {
-                    assertNotNull("No result for " + key, res);
+                    case 1: invokeMap.put(key, new RemoveProcessor(62)); break;
 
-                    assertEquals(62, (int)res.get());
+                    case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break;
 
-                    checkValue(key, 63);
+                    case 3: invokeMap.put(key, new ExceptionProcessor(62)); break;
 
-                    break;
+                    default:
+                        fail();
                 }
+            }
 
-                case 1: {
-                    assertNull(res);
+            Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(invokeMap, 10, 20, 30);
 
-                    checkValue(key, null);
+            if (tx != null)
+                tx.commit();
 
-                    break;
-                }
+            for (Integer key : keys) {
+                final EntryProcessorResult<Integer> res = resMap.get(key);
 
-                case 2: {
-                    assertNotNull("No result for " + key, res);
+                switch (key % 4) {
+                    case 0: {
+                        assertNotNull("No result for " + key, res);
 
-                    assertEquals(3, (int)res.get());
+                        assertEquals(62, (int)res.get());
 
-                    checkValue(key, 122);
+                        checkValue(key, 63);
 
-                    break;
-                }
+                        break;
+                    }
+
+                    case 1: {
+                        assertNull(res);
+
+                        checkValue(key, null);
+
+                        break;
+                    }
+
+                    case 2: {
+                        assertNotNull("No result for " + key, res);
+
+                        assertEquals(3, (int)res.get());
+
+                        checkValue(key, 122);
 
-                case 3: {
-                    assertNotNull("No result for " + key, res);
+                        break;
+                    }
 
-                    GridTestUtils.assertThrows(log, new Callable<Void>() {
-                        @Override public Void call() throws Exception {
-                            res.get();
+                    case 3: {
+                        assertNotNull("No result for " + key, res);
 
-                            return null;
-                        }
-                    }, EntryProcessorException.class, "Test processor exception.");
+                        GridTestUtils.assertThrows(log, new Callable<Void>() {
+                            @Override public Void call() throws Exception {
+                                res.get();
 
-                    checkValue(key, 62);
+                                return null;
+                            }
+                        }, EntryProcessorException.class, "Test processor exception.");
 
-                    break;
+                        checkValue(key, 62);
+
+                        break;
+                    }
                 }
             }
         }
 
         cache.invokeAll(keys, new IncrementProcessor());
 
-        tx = startTx(txMode);
+        {
+            Transaction tx = startTx(txMode);
 
-        resMap = cache.invokeAll(keys, new RemoveProcessor(null));
+            Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new RemoveProcessor(null));
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
 
-        assertEquals("Unexpected results: " + resMap, 0, resMap.size());
+            assertEquals("Unexpected results: " + resMap, 0, resMap.size());
 
-        for (Integer key : keys)
-            checkValue(key, null);
+            for (Integer key : keys)
+                checkValue(key, null);
+        }
 
         IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
 
@@ -406,9 +492,9 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
 
         IgniteFuture<Map<Integer, EntryProcessorResult<Integer>>> fut = asyncCache.future();
 
-        resMap = fut.get();
+        Map<Integer, EntryProcessorResult<Integer>> resMap = fut.get();
 
-        exp = new HashMap<>();
+        Map<Object, Object> exp = new HashMap<>();
 
         for (Integer key : keys)
             exp.put(key, -1);
@@ -418,7 +504,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
         for (Integer key : keys)
             checkValue(key, 1);
 
-        invokeMap = new HashMap<>();
+        Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
 
         for (Integer key : keys)
             invokeMap.put(key, incProcessor);
@@ -442,15 +528,16 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
      * @param resMap Result map.
      * @param exp Expected results.
      */
-    private void checkResult(Map<Integer, EntryProcessorResult<Integer>> resMap, Map<Object, Object> exp) {
+    @SuppressWarnings("unchecked")
+    private void checkResult(Map resMap, Map<Object, Object> exp) {
         assertNotNull(resMap);
 
         assertEquals(exp.size(), resMap.size());
 
         for (Map.Entry<Object, Object> expVal : exp.entrySet()) {
-            EntryProcessorResult<Integer> res = resMap.get(expVal.getKey());
+            EntryProcessorResult<?> res = (EntryProcessorResult)resMap.get(expVal.getKey());
 
-            assertNotNull("No result for " + expVal.getKey());
+            assertNotNull("No result for " + expVal.getKey(), res);
 
             assertEquals("Unexpected result for " + expVal.getKey(), res.get(), expVal.getValue());
         }
@@ -557,6 +644,44 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
     /**
      *
      */
+    protected static class UserClassValueProcessor implements EntryProcessor<Integer, Integer, TestValue> {
+        /** {@inheritDoc} */
+        @Override public TestValue process(MutableEntry<Integer, Integer> e, Object... arguments)
+            throws EntryProcessorException {
+            return new TestValue(String.valueOf(e.getValue()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(UserClassValueProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class CollectionReturnProcessor implements
+        EntryProcessor<Integer, Integer, Collection<TestValue>> {
+        /** {@inheritDoc} */
+        @Override public Collection<TestValue> process(MutableEntry<Integer, Integer> e, Object... arguments)
+            throws EntryProcessorException {
+            List<TestValue> vals = new ArrayList<>();
+
+            for (int i = 0; i < 10; i++)
+                vals.add(new TestValue(String.valueOf(e.getValue() + 1)));
+
+            return vals;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CollectionReturnProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
     protected static class IncrementProcessor implements EntryProcessor<Integer, Integer, Integer> {
         /** {@inheritDoc} */
         @Override public Integer process(MutableEntry<Integer, Integer> e,
@@ -656,4 +781,50 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
             return S.toString(ExceptionProcessor.class, this);
         }
     }
+
+    /**
+     *
+     */
+    static class TestValue {
+        /** */
+        private String val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(String val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public String value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue testVal = (TestValue) o;
+
+            return val.equals(testVal.val);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 2751de1..ef96d9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -170,11 +170,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             while (U.currentTimeMillis() < stopTime)
                 assertEquals(10, atomic.get());
         }
-        catch (IgniteException e) {
-            if (X.hasCause(e, ClusterTopologyServerNotFoundException.class))
-                return;
-
-            throw e;
+        catch (IgniteException ignore) {
+            return; // Test that client does not hang.
         }
 
         fail();

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index cf638df..6366f09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -244,15 +244,27 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
     }
 
     /**
-     * JUnit.
-     *
      * @throws Exception If failed.
      */
     public void testIterator() throws Exception {
+        checkIterator(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIteratorCollocated() throws Exception {
+        checkIterator(true);
+    }
+
+    /**
+     * @param collocated Collocated flag.
+     */
+    private void checkIterator(boolean collocated) {
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(collocated));
 
         for (int i = 0; i < 100; i++)
             assert queue.add(Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
index 74c9a4f..ca57205 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -183,8 +183,8 @@ public abstract class GridCacheSetFailoverAbstractSelfTest extends IgniteCollect
                     if (entry.hasValue()) {
                         cnt++;
 
-                        if (entry.key() instanceof GridCacheSetItemKey) {
-                            GridCacheSetItemKey setItem = (GridCacheSetItemKey)entry.key();
+                        if (entry.key() instanceof SetItemKey) {
+                            SetItemKey setItem = (SetItemKey)entry.key();
 
                             if (setIds.add(setItem.setId()))
                                 log.info("Unexpected set item [setId=" + setItem.setId() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
index 2420153..de2fa07 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
@@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
  */
 public class GridCachePartitionedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest {
     /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
     @Override protected CacheMode collectionCacheMode() {
         return PARTITIONED;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index 1d225a6..db11291 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
      * @throws Exception If failed.
      */
     public void testQueue() throws Exception {
-        final String queueName = "queue-test-name";
+        final String queueName = "q";
 
         System.out.println(U.filler(20, '\n'));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
new file mode 100644
index 0000000..880c638
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import java.util.Iterator;
+import java.util.UUID;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgnitePartitionedQueueNoBackupsTest extends GridCachePartitionedQueueApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode collectionCacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode collectionMemoryMode() {
+        return ONHEAP_TIERED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CollectionConfiguration collectionConfiguration() {
+        CollectionConfiguration colCfg = super.collectionConfiguration();
+
+        colCfg.setBackups(0);
+
+        return colCfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCollocation() throws Exception {
+        IgniteQueue<Integer> queue = grid(0).queue("queue", 0, config(true));
+
+        for (int i = 0; i < 1000; i++)
+            assertTrue(queue.add(i));
+
+        assertEquals(1000, queue.size());
+
+        GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx");
+
+        UUID setNodeId = null;
+
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal grid = (IgniteKernal)grid(i);
+
+            Iterator<GridCacheEntryEx> entries =
+                grid.context().cache().internalCache(cctx.name()).map().allEntries0().iterator();
+
+            if (entries.hasNext()) {
+                if (setNodeId == null)
+                    setNodeId = grid.localNode().id();
+                else
+                    fail("For collocated queue all items should be stored on single node.");
+            }
+        }
+    }}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
index 1aea6d9..bad37a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
@@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
  */
 public class GridCacheReplicatedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest {
     /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
     @Override protected CacheMode collectionCacheMode() {
         return REPLICATED;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
index 4a5141e..e9674f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -114,7 +114,7 @@ public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAb
 
         Map<Integer, String> map = store.getMap();
 
-        assert map.isEmpty();
+        assert map.isEmpty() : map;
 
         Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);
 


Mime
View raw message