ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [34/50] [abbrv] incubator-ignite git commit: # ignite-6
Date Thu, 05 Feb 2015 16:25:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
new file mode 100644
index 0000000..305decc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -0,0 +1,1662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Manager of data structures.
+ */
+public final class DataStructuresProcessor extends GridProcessorAdapter {
+    /** Key under which the utility cache keeps the map of data structure name to {@code DataStructureInfo}. */
+    public static final CacheDataStructuresConfigurationKey DATA_STRUCTURES_KEY =
+        new CacheDataStructuresConfigurationKey();
+
+    /** Initial capacity of the local data structures map ({@code dsMap}). */
+    private static final int INITIAL_CAPACITY = 10;
+
+    /** Presumably the maximum number of update retries; NOTE(review): usage is not visible in this part of the file. */
+    private static final int MAX_UPDATE_RETRIES = 100;
+
+    /** Presumably the delay between retries; NOTE(review): unit and usage are not visible in this part of the file. */
+    private static final long RETRY_DELAY = 1;
+
+    /** Atomics cache projection with both key and value typed as {@code GridCacheInternal}. */
+    private CacheProjection<GridCacheInternal, GridCacheInternal> dsView;
+
+    /** Local storage of created data structure instances (sequence, atomic long etc.), keyed by internal key. */
+    private final ConcurrentMap<GridCacheInternal, GridCacheRemovable> dsMap;
+
+    /** Atomics cache projection containing only {@code GridCacheAtomicLongValue}. */
+    private CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicLongView;
+
+    /** Atomics cache projection containing only {@code GridCacheCountDownLatchValue}. */
+    private CacheProjection<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
+
+    /** Atomics cache projection containing only {@code GridCacheAtomicReferenceValue}. */
+    private CacheProjection<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
+
+    /** Atomics cache projection containing only {@code GridCacheAtomicStampedValue}. */
+    private CacheProjection<GridCacheInternalKey, GridCacheAtomicStampedValue> atomicStampedView;
+
+    /** Atomics cache projection containing only {@code GridCacheAtomicSequenceValue}. */
+    private CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView;
+
+    /** Cache context for atomic data structures; assigned in {@code start()} only when atomic configuration is set. */
+    private GridCacheContext dsCacheCtx;
+
+    /** Atomic data structures configuration taken from the node configuration; may be {@code null}. */
+    private final IgniteAtomicConfiguration atomicCfg;
+
+    /** Utility cache projection holding the data structures map under {@code DATA_STRUCTURES_KEY}. */
+    private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache;
+
+    /**
+     * Creates the processor. Only the atomic configuration and the local map of
+     * data structure instances are set up here; cache views are resolved in {@code start()}.
+     *
+     * @param ctx Kernal context.
+     */
+    public DataStructuresProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        atomicCfg = ctx.config().getAtomicConfiguration();
+
+        dsMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void start() throws IgniteCheckedException {
+        // Nothing is initialized on daemon nodes.
+        if (ctx.config().isDaemon())
+            return;
+
+        utilityCache = (GridCacheProjectionEx)ctx.cache().utilityCache();
+
+        assert utilityCache != null;
+
+        // Without atomic configuration the atomics views and cache context stay unset.
+        if (atomicCfg == null)
+            return;
+
+        GridCache atomics = ctx.cache().atomicsCache();
+
+        assert atomics != null;
+
+        final Class<GridCacheInternalKey> keyCls = GridCacheInternalKey.class;
+
+        // Every view is a typed projection over the same atomics cache with the CLONE flag switched on.
+        dsView = atomics.projection(GridCacheInternal.class, GridCacheInternal.class).flagsOn(CLONE);
+
+        cntDownLatchView = atomics.projection(keyCls, GridCacheCountDownLatchValue.class).flagsOn(CLONE);
+
+        atomicLongView = atomics.projection(keyCls, GridCacheAtomicLongValue.class).flagsOn(CLONE);
+
+        atomicRefView = atomics.projection(keyCls, GridCacheAtomicReferenceValue.class).flagsOn(CLONE);
+
+        atomicStampedView = atomics.projection(keyCls, GridCacheAtomicStampedValue.class).flagsOn(CLONE);
+
+        seqView = atomics.projection(keyCls, GridCacheAtomicSequenceValue.class).flagsOn(CLONE);
+
+        dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+    }
+
+    /**
+     * Returns the atomic sequence with the given name, optionally creating it.
+     * <p>
+     * The lookup/creation runs inside the atomics cache gate and a pessimistic,
+     * repeatable-read internal transaction. When a new local instance is built, the
+     * global counter stored in cache is moved past the range reserved for that instance.
+     *
+     * @param name Sequence name.
+     * @param initVal Initial value for sequence. Ignored if the sequence value is already in cache.
+     * @param create If {@code true} sequence will be created in case it is not in cache.
+     * @return Sequence, or {@code null} if it is not found and {@code create} is {@code false}.
+     * @throws IgniteCheckedException If loading failed.
+     */
+    public final IgniteAtomicSequence sequence(final String name,
+        final long initVal,
+        final boolean create)
+        throws IgniteCheckedException
+    {
+        A.notNull(name, "name");
+
+        checkAtomicsConfiguration();
+
+        IgniteOutClosureX<IgniteAtomicSequence> factory = new IgniteOutClosureX<IgniteAtomicSequence>() {
+            @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicSequenceValue stored =
+                        cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
+
+                    // Another thread may already have registered a local instance.
+                    GridCacheAtomicSequenceEx cached = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class);
+
+                    if (cached != null) {
+                        assert stored != null;
+
+                        return cached;
+                    }
+
+                    final boolean absent = stored == null;
+
+                    if (absent && !create)
+                        return null;
+
+                    // We should use offset because we already reserved left side of range.
+                    long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ?
+                        atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;
+
+                    // Local range starts either at the initial value or at the current global counter.
+                    long locCntr = absent ? initVal : stored.get();
+                    long upBound = locCntr + off;
+
+                    // Global counter must be more than reserved region.
+                    if (absent)
+                        stored = new GridCacheAtomicSequenceValue(upBound + 1);
+                    else
+                        stored.set(upBound + 1);
+
+                    // Update global counter.
+                    dsView.putx(key, stored);
+
+                    // Only one thread can be in the transaction scope and create sequence.
+                    GridCacheAtomicSequenceEx created = new GridCacheAtomicSequenceImpl(name,
+                        key,
+                        seqView,
+                        dsCacheCtx,
+                        atomicCfg.getAtomicSequenceReserveSize(),
+                        locCntr,
+                        upBound);
+
+                    dsMap.put(key, created);
+
+                    tx.commit();
+
+                    return created;
+                }
+                catch (Error | Exception e) {
+                    // Drop the local entry for this key before propagating the failure.
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to make atomic sequence: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        return getAtomic(factory, new DataStructureInfo(name, ATOMIC_SEQ, null), create, IgniteAtomicSequence.class);
+    }
+
+    /**
+     * Removes the atomic sequence with the given name from cache.
+     * <p>
+     * The cache removal is delegated to {@code removeInternal} and executed inside the
+     * atomics cache gate; any exception it raises is wrapped into {@link IgniteCheckedException}.
+     *
+     * @param name Sequence name.
+     * @throws IgniteCheckedException If removing failed.
+     */
+    public final void removeSequence(final String name) throws IgniteCheckedException {
+        assert name != null;
+
+        checkAtomicsConfiguration();
+
+        IgniteCallable<Void> rmv = new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
+
+                try {
+                    GridCacheInternal seqKey = new GridCacheInternalKeyImpl(name);
+
+                    removeInternal(seqKey, GridCacheAtomicSequenceValue.class);
+
+                    return null;
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove sequence by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        removeDataStructure(rmv, name, ATOMIC_SEQ, null);
+    }
+
+    /**
+     * Returns the atomic long with the given name, optionally creating it.
+     * <p>
+     * The lookup/creation runs inside the atomics cache gate and a pessimistic,
+     * repeatable-read internal transaction.
+     *
+     * @param name Name of atomic long.
+     * @param initVal Initial value for atomic long. Ignored if the value is already in cache.
+     * @param create If {@code true} atomic long will be created in case it is not in cache.
+     * @return Atomic long, or {@code null} if it is not found and {@code create} is {@code false}.
+     * @throws IgniteCheckedException If loading failed.
+     */
+    public final IgniteAtomicLong atomicLong(final String name,
+        final long initVal,
+        final boolean create) throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        checkAtomicsConfiguration();
+
+        IgniteOutClosureX<IgniteAtomicLong> factory = new IgniteOutClosureX<IgniteAtomicLong>() {
+            @Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
+                final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicLongValue stored = cast(dsView.get(key), GridCacheAtomicLongValue.class);
+
+                    // Another thread may already have registered a local instance.
+                    GridCacheAtomicLongEx cached = cast(dsMap.get(key), GridCacheAtomicLongEx.class);
+
+                    if (cached != null) {
+                        assert stored != null;
+
+                        return cached;
+                    }
+
+                    if (stored == null) {
+                        if (!create)
+                            return null;
+
+                        // First creation: persist the initial value.
+                        stored = new GridCacheAtomicLongValue(initVal);
+
+                        dsView.putx(key, stored);
+                    }
+
+                    GridCacheAtomicLongEx created = new GridCacheAtomicLongImpl(name, key, atomicLongView, dsCacheCtx);
+
+                    dsMap.put(key, created);
+
+                    tx.commit();
+
+                    return created;
+                }
+                catch (Error | Exception e) {
+                    // Drop the local entry for this key before propagating the failure.
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to make atomic long: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        return getAtomic(factory, new DataStructureInfo(name, ATOMIC_LONG, null), create, IgniteAtomicLong.class);
+    }
+
+    /**
+     * Common lookup/creation path for atomic data structures.
+     * <p>
+     * The data structure is first validated against the descriptors stored in the utility cache,
+     * then looked up in the local instance map. Only if it is not found locally is the closure invoked:
+     * directly when {@code create} is {@code false}, otherwise after registering the descriptor in the
+     * utility cache within a pessimistic, repeatable-read transaction.
+     *
+     * @param c Closure creating data structure instance.
+     * @param dsInfo Data structure info.
+     * @param create Create flag.
+     * @param cls Expected data structure class.
+     * @return Data structure instance, or {@code null} if {@code create} is {@code false} and
+     *      no descriptor with this name is registered (or the closure itself returned {@code null}).
+     * @throws IgniteCheckedException If validation or creation failed.
+     */
+    @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c,
+        DataStructureInfo dsInfo,
+        boolean create,
+        Class<? extends T> cls)
+        throws IgniteCheckedException
+    {
+        Map<String, DataStructureInfo> registered = utilityCache.get(DATA_STRUCTURES_KEY);
+
+        if (!create) {
+            boolean known = registered != null && registered.containsKey(dsInfo.name);
+
+            if (!known)
+                return null;
+        }
+
+        IgniteCheckedException err = validateDataStructure(registered, dsInfo, create);
+
+        if (err != null)
+            throw err;
+
+        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(dsInfo.name);
+
+        // Check type of structure received by key from local cache.
+        T cached = cast(dsMap.get(key), cls);
+
+        if (cached != null)
+            return cached;
+
+        // Plain lookup: no descriptor registration is needed.
+        if (!create)
+            return c.applyx();
+
+        T created;
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+            IgniteCheckedException addErr =
+                utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+
+            if (addErr != null)
+                throw addErr;
+
+            Callable<T> call = new Callable<T>() {
+                @Override public T call() throws Exception {
+                    return c.applyx();
+                }
+            };
+
+            created = ctx.closure().callLocalSafe(call, false).get();
+
+            tx.commit();
+        }
+
+        return created;
+    }
+
+    /**
+     * Removes atomic long from cache.
+     * <p>
+     * The cache removal is delegated to {@code removeInternal} and executed inside the
+     * atomics cache gate; any exception it raises is wrapped into {@link IgniteCheckedException}.
+     *
+     * @param name Atomic long name.
+     * @throws IgniteCheckedException If removing failed.
+     */
+    public final void removeAtomicLong(final String name) throws IgniteCheckedException {
+        assert name != null;
+
+        // Same precondition call as in removeSequence(): unlike a bare assert, this method call
+        // is still executed when assertions are disabled.
+        checkAtomicsConfiguration();
+
+        assert dsCacheCtx != null;
+
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
+
+                try {
+                    removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic long by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, ATOMIC_LONG, null);
+    }
+
+    /**
+     * Common removal path for data structures.
+     * <p>
+     * Does nothing if no descriptor with the given name is registered in the utility cache.
+     * Otherwise the descriptor is validated and removed through {@code RemoveDataStructureProcessor}
+     * within a pessimistic, repeatable-read transaction, and the removal closure is executed
+     * before the transaction is committed.
+     *
+     * @param c Closure performing the actual removal.
+     * @param name Data structure name.
+     * @param type Data structure type.
+     * @param afterRmv Optional closure to run after data structure removed; it is only invoked
+     *      when {@code c} produced a non-null result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <T> void removeDataStructure(IgniteCallable<T> c,
+        String name,
+        DataStructureType type,
+        @Nullable IgniteInClosureX<T> afterRmv)
+        throws IgniteCheckedException
+    {
+        Map<String, DataStructureInfo> registered = utilityCache.get(DATA_STRUCTURES_KEY);
+
+        boolean known = registered != null && registered.containsKey(name);
+
+        if (!known)
+            return;
+
+        DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
+
+        IgniteCheckedException err = validateDataStructure(registered, dsInfo, false);
+
+        if (err != null)
+            throw err;
+
+        T rmvInfo;
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+            T2<Boolean, IgniteCheckedException> res =
+                utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
+
+            IgniteCheckedException rmvErr = res.get2();
+
+            if (rmvErr != null)
+                throw rmvErr;
+
+            assert res.get1() != null;
+
+            // The processor reports whether the descriptor was still present.
+            if (!res.get1())
+                return;
+
+            rmvInfo = ctx.closure().callLocalSafe(c, false).get();
+
+            tx.commit();
+        }
+
+        if (rmvInfo != null && afterRmv != null)
+            afterRmv.applyx(rmvInfo);
+    }
+
+    /**
+     * Returns the atomic reference with the given name, optionally creating it.
+     * <p>
+     * The lookup/creation runs inside the atomics cache gate and a pessimistic,
+     * repeatable-read internal transaction.
+     *
+     * @param name Name of atomic reference.
+     * @param initVal Initial value for atomic reference. Ignored if the value is already in cache.
+     * @param create If {@code true} atomic reference will be created in case it is not in cache.
+     * @return Atomic reference, or {@code null} if it is not found and {@code create} is {@code false}.
+     * @throws IgniteCheckedException If loading failed.
+     */
+    @SuppressWarnings("unchecked")
+    public final <T> IgniteAtomicReference<T> atomicReference(final String name,
+        final T initVal,
+        final boolean create)
+        throws IgniteCheckedException
+    {
+        A.notNull(name, "name");
+
+        checkAtomicsConfiguration();
+
+        IgniteOutClosureX<IgniteAtomicReference> factory = new IgniteOutClosureX<IgniteAtomicReference>() {
+            @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicReferenceValue stored =
+                        cast(dsView.get(key), GridCacheAtomicReferenceValue.class);
+
+                    // Another thread may already have registered a local instance.
+                    GridCacheAtomicReferenceEx cached =
+                        cast(dsMap.get(key), GridCacheAtomicReferenceEx.class);
+
+                    if (cached != null) {
+                        assert stored != null;
+
+                        return cached;
+                    }
+
+                    if (stored == null) {
+                        if (!create)
+                            return null;
+
+                        // First creation: persist the initial value.
+                        stored = new GridCacheAtomicReferenceValue(initVal);
+
+                        dsView.putx(key, stored);
+                    }
+
+                    GridCacheAtomicReferenceEx created =
+                        new GridCacheAtomicReferenceImpl(name, key, atomicRefView, dsCacheCtx);
+
+                    dsMap.put(key, created);
+
+                    tx.commit();
+
+                    return created;
+                }
+                catch (Error | Exception e) {
+                    // Drop the local entry for this key before propagating the failure.
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to make atomic reference: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        return getAtomic(factory, new DataStructureInfo(name, ATOMIC_REF, null), create, IgniteAtomicReference.class);
+    }
+
+    /**
+     * Removes atomic reference from cache.
+     * <p>
+     * The cache removal is delegated to {@code removeInternal} and executed inside the
+     * atomics cache gate; any exception it raises is wrapped into {@link IgniteCheckedException}.
+     *
+     * @param name Atomic reference name.
+     * @throws IgniteCheckedException If removing failed.
+     */
+    public final void removeAtomicReference(final String name) throws IgniteCheckedException {
+        assert name != null;
+
+        // Same precondition call as in removeSequence(): unlike a bare assert, this method call
+        // is still executed when assertions are disabled.
+        checkAtomicsConfiguration();
+
+        assert dsCacheCtx != null;
+
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
+
+                try {
+                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                    removeInternal(key, GridCacheAtomicReferenceValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic reference by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, ATOMIC_REF, null);
+    }
+
+    /**
+     * Returns the atomic stamped with the given name, optionally creating it.
+     * <p>
+     * The lookup/creation runs inside the atomics cache gate and a pessimistic,
+     * repeatable-read internal transaction.
+     *
+     * @param name Name of atomic stamped.
+     * @param initVal Initial value for atomic stamped. Ignored if the value is already in cache.
+     * @param initStamp Initial stamp for atomic stamped. Ignored if the value is already in cache.
+     * @param create If {@code true} atomic stamped will be created in case it is not in cache.
+     * @return Atomic stamped, or {@code null} if it is not found and {@code create} is {@code false}.
+     * @throws IgniteCheckedException If loading failed.
+     */
+    @SuppressWarnings("unchecked")
+    public final <T, S> IgniteAtomicStamped<T, S> atomicStamped(final String name, final T initVal,
+        final S initStamp, final boolean create) throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        checkAtomicsConfiguration();
+
+        IgniteOutClosureX<IgniteAtomicStamped> factory = new IgniteOutClosureX<IgniteAtomicStamped>() {
+            @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException {
+                GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicStampedValue stored =
+                        cast(dsView.get(key), GridCacheAtomicStampedValue.class);
+
+                    // Another thread may already have registered a local instance.
+                    GridCacheAtomicStampedEx cached =
+                        cast(dsMap.get(key), GridCacheAtomicStampedEx.class);
+
+                    if (cached != null) {
+                        assert stored != null;
+
+                        return cached;
+                    }
+
+                    if (stored == null) {
+                        if (!create)
+                            return null;
+
+                        // First creation: persist the initial value and stamp.
+                        stored = new GridCacheAtomicStampedValue(initVal, initStamp);
+
+                        dsView.putx(key, stored);
+                    }
+
+                    GridCacheAtomicStampedEx created =
+                        new GridCacheAtomicStampedImpl(name, key, atomicStampedView, dsCacheCtx);
+
+                    dsMap.put(key, created);
+
+                    tx.commit();
+
+                    return created;
+                }
+                catch (Error | Exception e) {
+                    // Drop the local entry for this key before propagating the failure.
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to make atomic stamped: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        return getAtomic(factory, new DataStructureInfo(name, ATOMIC_STAMPED, null), create, IgniteAtomicStamped.class);
+    }
+
+    /**
+     * Removes atomic stamped from cache.
+     * <p>
+     * The cache removal is delegated to {@code removeInternal} and executed inside the
+     * atomics cache gate; any exception it raises is wrapped into {@link IgniteCheckedException}.
+     *
+     * @param name Atomic stamped name.
+     * @throws IgniteCheckedException If removing failed.
+     */
+    public final void removeAtomicStamped(final String name) throws IgniteCheckedException {
+        assert name != null;
+
+        // Same precondition call as in removeSequence(): unlike a bare assert, this method call
+        // is still executed when assertions are disabled.
+        checkAtomicsConfiguration();
+
+        assert dsCacheCtx != null;
+
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
+
+                try {
+                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                    removeInternal(key, GridCacheAtomicStampedValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic stamped by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, ATOMIC_STAMPED, null);
+    }
+
+    /**
+     * Gets a queue from cache or creates one if it's not cached.
+     *
+     * @param name Name of queue.
+     * @param cap Max size of queue.
+     * @param cfg Non-null queue configuration if new queue should be created.
+     * @return Instance of queue.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public final <T> IgniteQueue<T> queue(final String name,
+        int cap,
+        @Nullable final IgniteCollectionConfiguration cfg)
+        throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        if (cfg != null) {
+            if (cap <= 0)
+                cap = Integer.MAX_VALUE;
+
+            if (ctx.cache().publicCache(cfg.getCacheName()) == null)
+                throw new IgniteCheckedException("Cache for collection is not configured: " + cfg.getCacheName());
+        }
+
+        DataStructureInfo dsInfo = new DataStructureInfo(name,
+            QUEUE,
+            cfg != null ? new QueueInfo(cfg.getCacheName(), cfg.isCollocated(), cap) : null);
+
+        final int cap0 = cap;
+
+        final boolean create = cfg != null;
+
+        return getCollection(new IgniteClosureX<GridCacheContext, IgniteQueue<T>>() {
+            @Override public IgniteQueue<T> applyx(GridCacheContext ctx) throws IgniteCheckedException {
+                return ctx.dataStructures().queue(name, cap0, create && cfg.isCollocated(), create);
+            }
+        }, dsInfo, create);
+    }
+
+    /**
+     * Removes the queue with the given name.
+     * <p>
+     * The queue header is removed first (via {@code retryRemove}); if a header was returned and it is
+     * not empty, the queue item keys between its head and tail are removed afterwards.
+     *
+     * @param name Queue name.
+     * @param cctx Queue cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeQueue(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
+        assert name != null;
+        assert cctx != null;
+
+        IgniteCallable<GridCacheQueueHeader> hdrRmv = new IgniteCallable<GridCacheQueueHeader>() {
+            @Override public GridCacheQueueHeader call() throws Exception {
+                GridCacheQueueHeaderKey hdrKey = new GridCacheQueueHeaderKey(name);
+
+                return (GridCacheQueueHeader)retryRemove(cctx.cache(), hdrKey);
+            }
+        };
+
+        CIX1<GridCacheQueueHeader> itemsRmv = new CIX1<GridCacheQueueHeader>() {
+            @Override public void applyx(GridCacheQueueHeader hdr) throws IgniteCheckedException {
+                // Nothing to clean up for an empty queue.
+                if (!hdr.empty()) {
+                    GridCacheQueueAdapter.removeKeys(cctx.cache(),
+                        hdr.id(),
+                        name,
+                        hdr.collocated(),
+                        hdr.head(),
+                        hdr.tail(),
+                        0);
+                }
+            }
+        };
+
+        removeDataStructure(hdrRmv, name, QUEUE, itemsRmv);
+    }
+
+    /**
+     * Common lookup/creation path for collection data structures.
+     * <p>
+     * On lookup ({@code create} is {@code false}) the cache name is taken from the registered descriptor.
+     * On creation the descriptor is registered through {@code AddCollectionProcessor} within a
+     * pessimistic, repeatable-read utility cache transaction, and the cache name returned by that
+     * processor is used.
+     *
+     * @param c Closure creating collection.
+     * @param dsInfo Data structure info.
+     * @param create Create flag.
+     * @return Collection instance, or {@code null} if {@code create} is {@code false} and
+     *      no descriptor with this name is registered.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
+        DataStructureInfo dsInfo,
+        boolean create)
+        throws IgniteCheckedException
+    {
+        Map<String, DataStructureInfo> registered = utilityCache.get(DATA_STRUCTURES_KEY);
+
+        if (!create) {
+            boolean known = registered != null && registered.containsKey(dsInfo.name);
+
+            if (!known)
+                return null;
+        }
+
+        IgniteCheckedException err = validateDataStructure(registered, dsInfo, create);
+
+        if (err != null)
+            throw err;
+
+        if (!create) {
+            DataStructureInfo oldInfo = registered.get(dsInfo.name);
+
+            assert oldInfo.info instanceof CollectionInfo : oldInfo.info;
+
+            CollectionInfo colInfo = (CollectionInfo)oldInfo.info;
+
+            GridCacheContext lookupCtx = ctx.cache().internalCache(colInfo.cacheName).context();
+
+            return c.applyx(lookupCtx);
+        }
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+            T2<String, IgniteCheckedException> res =
+                utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
+
+            IgniteCheckedException addErr = res.get2();
+
+            if (addErr != null)
+                throw addErr;
+
+            final GridCacheContext cacheCtx = ctx.cache().internalCache(res.get1()).context();
+
+            Callable<T> call = new Callable<T>() {
+                @Override public T call() throws Exception {
+                    return c.applyx(cacheCtx);
+                }
+            };
+
+            T col = ctx.closure().callLocalSafe(call, false).get();
+
+            tx.commit();
+
+            return col;
+        }
+    }
+
+    /**
+     * Validates new data structure information against the descriptor already registered
+     * under the same name, if any.
+     *
+     * @param dsMap Map with data structure information, may be {@code null}.
+     * @param info New data structure information.
+     * @param create Create flag.
+     * @return {@link IgniteCheckedException} produced by the registered descriptor's validation,
+     *      or {@code null} if there is no map, no descriptor with this name, or validation passed.
+     */
+    @Nullable private static IgniteCheckedException validateDataStructure(
+        @Nullable Map<String, DataStructureInfo> dsMap,
+        DataStructureInfo info,
+        boolean create)
+    {
+        DataStructureInfo oldInfo = dsMap == null ? null : dsMap.get(info.name);
+
+        return oldInfo == null ? null : oldInfo.validate(info, create);
+    }
+
+    /**
+     * Gets or creates count down latch. If count down latch is not found in cache,
+     * it is created using provided name and count parameter.
+     * <p>
+     * The lookup/creation runs inside the atomics cache gate and a pessimistic,
+     * repeatable-read internal transaction. The local latch instance is built from the
+     * value held in cache, so for an existing latch {@code cnt} and {@code autoDel} are not used.
+     *
+     * @param name Name of the latch.
+     * @param cnt Initial count; must not be negative when {@code create} is {@code true}.
+     * @param autoDel {@code True} to automatically delete latch from cache when
+     *      its count reaches zero.
+     * @param create If {@code true} latch will be created in case it is not in cache,
+     *      if it is {@code false} all parameters except {@code name} are ignored.
+     * @return Count down latch for the given name or {@code null} if it is not found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteCountDownLatch countDownLatch(final String name,
+        final int cnt,
+        final boolean autoDel,
+        final boolean create)
+        throws IgniteCheckedException
+    {
+        A.notNull(name, "name");
+
+        if (create)
+            A.ensure(cnt >= 0, "count can not be negative");
+
+        checkAtomicsConfiguration();
+
+        IgniteOutClosureX<IgniteCountDownLatch> factory = new IgniteOutClosureX<IgniteCountDownLatch>() {
+            @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheCountDownLatchValue stored =
+                        cast(dsView.get(key), GridCacheCountDownLatchValue.class);
+
+                    // Another thread may already have registered a local instance.
+                    GridCacheCountDownLatchEx cached = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
+
+                    if (cached != null) {
+                        assert stored != null;
+
+                        return cached;
+                    }
+
+                    if (stored == null) {
+                        if (!create)
+                            return null;
+
+                        // First creation: persist the initial count and auto-delete flag.
+                        stored = new GridCacheCountDownLatchValue(cnt, autoDel);
+
+                        dsView.putx(key, stored);
+                    }
+
+                    GridCacheCountDownLatchEx created = new GridCacheCountDownLatchImpl(name, stored.get(),
+                        stored.initialCount(), stored.autoDelete(), key, cntDownLatchView, dsCacheCtx);
+
+                    dsMap.put(key, created);
+
+                    tx.commit();
+
+                    return created;
+                }
+                catch (Error | Exception e) {
+                    // Drop the local entry for this key before propagating the failure.
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create count down latch: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        return getAtomic(factory,
+            new DataStructureInfo(name, COUNT_DOWN_LATCH, null),
+            create,
+            GridCacheCountDownLatchEx.class);
+    }
+
+    /**
+     * Removes count down latch from cache.
+     * <p>
+     * The removal callable works inside a {@code PESSIMISTIC}, {@code REPEATABLE_READ} internal
+     * transaction. If no latch value is stored under the name, the transaction is marked
+     * rollback-only and nothing is removed. A latch whose current count is greater than zero
+     * is not removed: the callable fails with {@link IgniteCheckedException} instead.
+     *
+     * @param name Name of the latch.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public void removeCountDownLatch(final String name) throws IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        IgniteCallable<Void> rmvCall = new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // The cast also verifies that the object stored under the key is a latch value.
+                    GridCacheCountDownLatchValue latchVal =
+                        cast(dsView.get(key), GridCacheCountDownLatchValue.class);
+
+                    if (latchVal == null) {
+                        tx.setRollbackOnly();
+
+                        return null;
+                    }
+
+                    if (latchVal.get() > 0) {
+                        throw new IgniteCheckedException("Failed to remove count down latch " +
+                            "with non-zero count: " + latchVal.get());
+                    }
+
+                    dsView.removex(key);
+
+                    tx.commit();
+
+                    return null;
+                }
+                catch (Error | Exception e) {
+                    U.error(log, "Failed to remove data structure: " + key, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        };
+
+        removeDataStructure(rmvCall, name, COUNT_DOWN_LATCH, null);
+    }
+
+    /**
+     * Removes an internal entry from the data structures cache by its key.
+     * <p>
+     * The stored object is first read and checked against {@code cls}; it is removed and the
+     * transaction committed only if an object is present. Otherwise the transaction is marked
+     * rollback-only.
+     *
+     * @param key Internal entry key.
+     * @param cls Expected class of the stored object. If the stored object has a different type
+     *      an exception is thrown.
+     * @return {@code True} if an object was found and removed, {@code false} if nothing is stored
+     *      under the key.
+     * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
+     */
+    private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
+        Callable<Boolean> rmvCall = new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Type check of the stored object happens before anything is removed.
+                    R stored = cast(dsView.get(key), cls);
+
+                    if (stored == null) {
+                        tx.setRollbackOnly();
+
+                        return false;
+                    }
+
+                    dsView.removex(key);
+
+                    tx.commit();
+
+                    return true;
+                }
+                catch (Error | Exception e) {
+                    U.error(log, "Failed to remove data structure: " + key, e);
+
+                    throw e;
+                }
+            }
+        };
+
+        return CU.outTx(rmvCall, dsCacheCtx);
+    }
+
+    /**
+     * Transaction committed callback for transaction manager.
+     * <p>
+     * Does nothing if the data structures cache context is not set. Write entries are processed
+     * only for internal transactions and only when the cache context is not DHT and is either
+     * not colocated or replicated (see the condition below).
+     * <ul>
+     * <li>For created or updated entries whose value is a {@code GridCacheCountDownLatchValue}, the
+     *     latch registered in {@code dsMap} under the same key is notified with the new count. If the
+     *     count is zero and the value is auto-delete, the cache entry is marked obsolete, the latch is
+     *     removed from {@code dsMap} and its {@code onRemoved()} callback is invoked.</li>
+     * <li>For deleted entries with a {@code GridCacheInternal} key, the object registered in
+     *     {@code dsMap} is removed and its {@code onRemoved()} callback is invoked.</li>
+     * </ul>
+     *
+     * @param tx Committed transaction.
+     */
+    public <K, V> void onTxCommitted(IgniteInternalTx<K, V> tx) {
+        if (dsCacheCtx == null)
+            return;
+
+        if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
+            Collection<IgniteTxEntry<K, V>> entries = tx.writeEntries();
+
+            if (log.isDebugEnabled())
+                log.debug("Committed entries: " + entries);
+
+            for (IgniteTxEntry<K, V> entry : entries) {
+                // Check updated or created GridCacheInternalKey keys (only latch values are handled here).
+                if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key() instanceof GridCacheInternalKey) {
+                    GridCacheInternal key = (GridCacheInternal)entry.key();
+
+                    if (entry.value() instanceof GridCacheCountDownLatchValue) {
+                        // Notify the locally registered latch (if any) about the committed count.
+                        GridCacheRemovable latch = dsMap.get(key);
+
+                        GridCacheCountDownLatchValue val = (GridCacheCountDownLatchValue)entry.value();
+
+                        if (latch instanceof GridCacheCountDownLatchEx) {
+                            GridCacheCountDownLatchEx latch0 = (GridCacheCountDownLatchEx)latch;
+
+                            latch0.onUpdate(val.get());
+
+                            if (val.get() == 0 && val.autoDelete()) { // Auto-delete latch has reached zero.
+                                entry.cached().markObsolete(dsCacheCtx.versions().next());
+
+                                dsMap.remove(key);
+
+                                latch.onRemoved();
+                            }
+                        }
+                        else if (latch != null) { // Some other data structure is registered under this key: only logged.
+                            U.error(log, "Failed to cast object " +
+                                "[expected=" + IgniteCountDownLatch.class.getSimpleName() +
+                                ", actual=" + latch.getClass() + ", value=" + latch + ']');
+                        }
+                    }
+                }
+
+                // Check deleted GridCacheInternal keys.
+                if (entry.op() == DELETE && entry.key() instanceof GridCacheInternal) {
+                    GridCacheInternal key = (GridCacheInternal)entry.key();
+
+                    // Entry's val is null if entry deleted, so the object is looked up by key only.
+                    GridCacheRemovable obj = dsMap.remove(key);
+
+                    if (obj != null)
+                        obj.onRemoved();
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets a set from cache or creates one if it's not cached.
+     * <p>
+     * A non-{@code null} configuration means "create if absent"; with a {@code null}
+     * configuration the set is only looked up ({@code create} is {@code false}).
+     *
+     * @param name Set name, must not be {@code null}.
+     * @param cfg Set configuration if new set should be created, {@code null} to only look the set up.
+     * @return Set instance.
+     * @throws IgniteCheckedException If failed, in particular if the cache named in the
+     *      configuration is not configured.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <T> IgniteSet<T> set(final String name,
+        @Nullable final IgniteCollectionConfiguration cfg)
+        throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        final boolean create = cfg != null;
+
+        if (create && ctx.cache().publicCache(cfg.getCacheName()) == null)
+            throw new IgniteCheckedException("Cache for collection is not configured: " + cfg.getCacheName());
+
+        CollectionInfo colInfo = create ? new CollectionInfo(cfg.getCacheName(), cfg.isCollocated()) : null;
+
+        DataStructureInfo dsInfo = new DataStructureInfo(name, SET, colInfo);
+
+        return getCollection(new CX1<GridCacheContext, IgniteSet<T>>() {
+            @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws IgniteCheckedException {
+                // Collocated flag is meaningful only when a configuration was supplied.
+                return cctx.dataStructures().set(name, create && cfg.isCollocated(), create);
+            }
+        }, dsInfo, create);
+    }
+
+    /**
+     * Removes the set with the given name.
+     * <p>
+     * Builds two callbacks and hands them to {@code removeDataStructure}: one removes the set
+     * header stored under {@code GridCacheSetHeaderKey(name)} from the given cache (with retries),
+     * the other removes the set data by the header id.
+     *
+     * @param name Set name.
+     * @param cctx Set cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeSet(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
+        assert name != null;
+        assert cctx != null;
+
+        // Removes the set header and returns it.
+        IgniteCallable<GridCacheSetHeader> rmvHdr = new IgniteCallable<GridCacheSetHeader>() {
+            @Override public GridCacheSetHeader call() throws Exception {
+                return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name));
+            }
+        };
+
+        // Cleans up the set data that belongs to the removed header.
+        CIX1<GridCacheSetHeader> clearData = new CIX1<GridCacheSetHeader>() {
+            @Override public void applyx(GridCacheSetHeader hdr) throws IgniteCheckedException {
+                cctx.dataStructures().removeSetData(hdr.id());
+            }
+        };
+
+        removeDataStructure(rmvHdr, name, SET, clearData);
+    }
+
+    /**
+     * Removes a key from the given cache, going through {@link #retry(IgniteLogger, Callable)}.
+     *
+     * @param cache Cache.
+     * @param key Key to remove.
+     * @throws IgniteCheckedException If failed.
+     * @return Removed value.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException {
+        Callable<T> rmvCall = new Callable<T>() {
+            @Nullable @Override public T call() throws Exception {
+                return (T)cache.remove(key);
+            }
+        };
+
+        return retry(log, rmvCall);
+    }
+
+    /**
+     * Executes the callable, retrying it when it fails with a transaction rollback, a cache
+     * partial update or a cluster topology exception. Before each retry a warning is logged and
+     * the thread sleeps for {@code RETRY_DELAY}. After {@code MAX_UPDATE_RETRIES} retries the
+     * last such exception is rethrown.
+     * <p>
+     * {@code ClusterGroupEmptyCheckedException} is not retried: it is wrapped into
+     * {@link IgniteCheckedException} and thrown at once. Any other non-Ignite exception is
+     * wrapped into {@link IgniteCheckedException} as well.
+     *
+     * @param log Logger.
+     * @param call Callable.
+     * @return Callable result.
+     * @throws IgniteCheckedException If all retries failed.
+     */
+    public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
+        try {
+            for (int attempt = 0; ; attempt++) {
+                try {
+                    return call.call();
+                }
+                catch (ClusterGroupEmptyCheckedException e) {
+                    throw new IgniteCheckedException(e);
+                }
+                catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
+                    // Retry budget is exhausted: give up with the last error.
+                    if (attempt == MAX_UPDATE_RETRIES)
+                        throw e;
+
+                    U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
+
+                    U.sleep(RETRY_DELAY);
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Casts the object to the expected type, failing with a checked exception on mismatch.
+     *
+     * @param obj Object to cast, may be {@code null}.
+     * @param cls Expected class.
+     * @param <R> Type of expected result.
+     * @return The same object typed as {@code R}, or {@code null} if {@code obj} is {@code null}.
+     * @throws IgniteCheckedException If {@code obj} is not an instance of {@code cls}.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable private <R> R cast(@Nullable Object obj, Class<R> cls) throws IgniteCheckedException {
+        if (obj == null)
+            return null;
+
+        if (!cls.isInstance(obj)) {
+            throw new IgniteCheckedException("Failed to cast object [expected=" + cls +
+                ", actual=" + obj.getClass() + ']');
+        }
+
+        return (R)obj;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>> ");
+
+        // Cache name is printed as null when the data structures cache context is not set.
+        String hdr = ">>> Data structure processor memory stats [grid=" + ctx.gridName() +
+            ", cache=" + (dsCacheCtx != null ? dsCacheCtx.name() : null) + ']';
+
+        X.println(hdr);
+
+        String mapSize = ">>>   dsMapSize: " + dsMap.size();
+
+        X.println(mapSize);
+    }
+
+    /**
+     * Makes sure that atomics configuration is available.
+     *
+     * @throws IgniteException If atomics configuration is not provided.
+     */
+    private void checkAtomicsConfiguration() throws IgniteException {
+        if (atomicCfg != null)
+            return;
+
+        throw new IgniteException("Atomic data structure can not be created, " +
+            "need to provide IgniteAtomicConfiguration.");
+    }
+
+    /**
+     * Kinds of data structures. Each constant carries the simple name of the corresponding
+     * public interface.
+     */
+    static enum DataStructureType {
+        /** Atomic long. */
+        ATOMIC_LONG(IgniteAtomicLong.class.getSimpleName()),
+
+        /** Atomic reference. */
+        ATOMIC_REF(IgniteAtomicReference.class.getSimpleName()),
+
+        /** Atomic sequence. */
+        ATOMIC_SEQ(IgniteAtomicSequence.class.getSimpleName()),
+
+        /** Atomic stamped. */
+        ATOMIC_STAMPED(IgniteAtomicStamped.class.getSimpleName()),
+
+        /** Count down latch. */
+        COUNT_DOWN_LATCH(IgniteCountDownLatch.class.getSimpleName()),
+
+        /** Queue. */
+        QUEUE(IgniteQueue.class.getSimpleName()),
+
+        /** Set. */
+        SET(IgniteSet.class.getSimpleName());
+
+        /** Enumerated values, indexed by ordinal. */
+        private static final DataStructureType[] VALS = values();
+
+        /** Simple name of the public data structure class. */
+        private String name;
+
+        /**
+         * @param name Simple name of the public data structure class.
+         */
+        DataStructureType(String name) {
+            this.name = name;
+        }
+
+        /**
+         * @return Data structure public class name.
+         */
+        public String className() {
+            return name;
+        }
+
+        /**
+         * Efficiently gets enumerated value from its ordinal.
+         *
+         * @param ord Ordinal value.
+         * @return Enumerated value or {@code null} if ordinal out of range.
+         */
+        @Nullable public static DataStructureType fromOrdinal(int ord) {
+            if (ord < 0 || ord >= VALS.length)
+                return null;
+
+            return VALS[ord];
+        }
+    }
+
+    /**
+     * Externalizable description of a distributed collection: the name of the cache that holds
+     * it and its collocated flag.
+     */
+    static class CollectionInfo implements Externalizable {
+        /** Explicit version ID, as declared by the other externalizable classes of this file. */
+        private static final long serialVersionUID = 0L;
+
+        /** Collocated flag. */
+        private boolean collocated;
+
+        /** Collection cache name. */
+        private String cacheName;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public CollectionInfo() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Collection cache name.
+         * @param collocated Collocated flag.
+         */
+        public CollectionInfo(String cacheName, boolean collocated) {
+            this.cacheName = cacheName;
+            this.collocated = collocated;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // Field order must mirror writeExternal().
+            collocated = in.readBoolean();
+            cacheName = U.readString(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeBoolean(collocated);
+            U.writeString(out, cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CollectionInfo.class, this);
+        }
+    }
+
+    /**
+     * Externalizable description of a distributed queue: collection information plus the
+     * queue capacity.
+     */
+    static class QueueInfo extends CollectionInfo {
+        /** Explicit version ID, as declared by the other externalizable classes of this file. */
+        private static final long serialVersionUID = 0L;
+
+        /** Queue capacity. */
+        private int cap;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public QueueInfo() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param collocated Collocated flag.
+         * @param cap Queue capacity.
+         */
+        public QueueInfo(String cacheName, boolean collocated, int cap) {
+            super(cacheName, collocated);
+
+            this.cap = cap;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // Parent state first, then capacity - mirrors writeExternal().
+            super.readExternal(in);
+
+            cap = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            super.writeExternal(out);
+
+            out.writeInt(cap);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueueInfo.class, this, "super", super.toString());
+        }
+    }
+
+    /**
+     * Externalizable descriptor of a data structure: its name, its type and optional
+     * type-specific information (collection or queue information for sets and queues).
+     */
+    static class DataStructureInfo implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Data structure name. */
+        private String name;
+
+        /** Data structure type. */
+        private DataStructureType type;
+
+        /** Type-specific data structure information. */
+        private Object info;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public DataStructureInfo() {
+            // No-op.
+        }
+
+        /**
+         * @param name Data structure name.
+         * @param type Data structure type.
+         * @param info Data structure information.
+         */
+        DataStructureInfo(String name, DataStructureType type, Externalizable info) {
+            this.name = name;
+            this.type = type;
+            this.info = info;
+        }
+
+        /**
+         * Checks that a requested data structure is compatible with this (already registered) one.
+         * The types must always match. When {@code create} is {@code true}, queues and sets must
+         * also have the same collocated flag, and queues the same capacity.
+         *
+         * @param dsInfo New data structure info.
+         * @param create Create flag.
+         * @return Exception if validation failed, {@code null} otherwise.
+         */
+        @Nullable IgniteCheckedException validate(DataStructureInfo dsInfo, boolean create) {
+            if (type != dsInfo.type) {
+                return new IgniteCheckedException("Another data structure with the same name already created " +
+                    "[name=" + name +
+                    ", newType=" + dsInfo.type.className() +
+                    ", existingType=" + type.className() + ']');
+            }
+
+            if (create) {
+                if (type == QUEUE || type == SET) {
+                    CollectionInfo oldInfo = (CollectionInfo)info;
+                    CollectionInfo newInfo = (CollectionInfo)dsInfo.info;
+
+                    if (oldInfo.collocated != newInfo.collocated) {
+                        // The existing flag comes from this (registered) descriptor, not from the new one.
+                        return new IgniteCheckedException("Another collection with the same name but different " +
+                            "configuration already created [name=" + name +
+                            ", newCollocated=" + newInfo.collocated +
+                            ", existingCollocated=" + oldInfo.collocated + ']');
+                    }
+
+                    if (type == QUEUE) {
+                        if (((QueueInfo)oldInfo).cap != ((QueueInfo)newInfo).cap) {
+                            return new IgniteCheckedException("Another queue with the same name but different " +
+                                "configuration already created [name=" + name +
+                                ", newCapacity=" + ((QueueInfo)newInfo).cap +
+                                ", existingCapacity=" + ((QueueInfo)oldInfo).cap + ']');
+                        }
+                    }
+                }
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, name);
+            U.writeEnum(out, type);
+            out.writeObject(info);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // Field order must mirror writeExternal(); the type is read back by its ordinal.
+            name = U.readString(in);
+            type = DataStructureType.fromOrdinal(in.readByte());
+            info = in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructureInfo.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that registers an atomic data structure descriptor in the map of
+     * descriptors, or validates the request against a descriptor that is already registered
+     * under the same name.
+     */
+    static class AddAtomicProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, IgniteCheckedException>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Descriptor of the data structure to register. */
+        private DataStructureInfo info;
+
+        /**
+         * @param info Data structure information.
+         */
+        AddAtomicProcessor(DataStructureInfo info) {
+            assert info != null;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public AddAtomicProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteCheckedException process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
+            Object... args)
+            throws EntryProcessorException
+        {
+            Map<String, DataStructureInfo> cur = entry.getValue();
+
+            DataStructureInfo registered = cur != null ? cur.get(info.name) : null;
+
+            // Name is already taken: nothing is stored, only compatibility is checked.
+            if (registered != null)
+                return registered.validate(info, true);
+
+            // The map read from the entry is never modified; an updated copy is stored instead.
+            Map<String, DataStructureInfo> updated;
+
+            if (cur != null)
+                updated = new HashMap<>(cur);
+            else
+                updated = new HashMap<>();
+
+            updated.put(info.name, info);
+
+            entry.setValue(updated);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            info.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            DataStructureInfo read = new DataStructureInfo();
+
+            info = read;
+
+            read.readExternal(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AddAtomicProcessor.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that registers a collection (queue or set) descriptor in the map of
+     * descriptors, or validates the request against a descriptor that is already registered
+     * under the same name. The result always carries the cache name of the requested collection.
+     */
+    static class AddCollectionProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>,
+            T2<String, IgniteCheckedException>>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Descriptor of the collection to register. */
+        private DataStructureInfo info;
+
+        /**
+         * @param info Data structure information, must carry {@link CollectionInfo}.
+         */
+        AddCollectionProcessor(DataStructureInfo info) {
+            assert info != null;
+            assert info.info instanceof CollectionInfo;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public AddCollectionProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public T2<String, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
+            Object... args)
+        {
+            Map<String, DataStructureInfo> cur = entry.getValue();
+
+            CollectionInfo colInfo = (CollectionInfo)info.info;
+
+            DataStructureInfo registered = cur != null ? cur.get(info.name) : null;
+
+            // Name is already taken: nothing is stored, only compatibility is checked.
+            if (registered != null)
+                return new T2<>(colInfo.cacheName, registered.validate(info, true));
+
+            // The map read from the entry is never modified; an updated copy is stored instead.
+            Map<String, DataStructureInfo> updated;
+
+            if (cur != null)
+                updated = new HashMap<>(cur);
+            else
+                updated = new HashMap<>();
+
+            updated.put(info.name, info);
+
+            entry.setValue(updated);
+
+            return new T2<>(colInfo.cacheName, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            info.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            DataStructureInfo read = new DataStructureInfo();
+
+            info = read;
+
+            read.readExternal(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AddCollectionProcessor.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that unregisters a data structure descriptor from the map of descriptors.
+     * The first component of the result tells whether a descriptor with the requested name was
+     * registered; the second one is the validation error, if any. The descriptor is removed only
+     * when validation passed.
+     */
+    static class RemoveDataStructureProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>,
+            T2<Boolean, IgniteCheckedException>>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Descriptor of the data structure to remove. */
+        private DataStructureInfo info;
+
+        /**
+         * @param info Data structure information.
+         */
+        RemoveDataStructureProcessor(DataStructureInfo info) {
+            assert info != null;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public RemoveDataStructureProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public T2<Boolean, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry,
+            Object... args)
+        {
+            Map<String, DataStructureInfo> cur = entry.getValue();
+
+            DataStructureInfo registered = cur != null ? cur.get(info.name) : null;
+
+            // Nothing is registered under this name.
+            if (registered == null)
+                return new T2<>(false, null);
+
+            IgniteCheckedException err = registered.validate(info, false);
+
+            if (err == null) {
+                // The map read from the entry is never modified; an updated copy is stored instead.
+                Map<String, DataStructureInfo> updated = new HashMap<>(cur);
+
+                updated.remove(info.name);
+
+                entry.setValue(updated);
+            }
+
+            return new T2<>(true, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            info.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            DataStructureInfo read = new DataStructureInfo();
+
+            info = read;
+
+            read.readExternal(in);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoveDataStructureProcessor.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index 6751884..8172dcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -754,7 +754,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
         try {
             IgniteBiTuple<GridKernalContext, String> t = stash.get();
 
-            return t.get1().dataStructures().queue(t.get2(), null, 0, false);
+            return t.get1().dataStructures().queue(t.get2(), 0, null);
         }
         catch (IgniteCheckedException e) {
             throw U.withCause(new InvalidObjectException(e.getMessage()), e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index feecc97..f5470e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -385,7 +385,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
      */
     private <R> R retry(Callable<R> call) {
         try {
-            return CacheDataStructuresProcessor.retry(log, call);
+            return DataStructuresProcessor.retry(log, call);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 3a60ace..e29de99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -546,7 +546,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
         try {
             IgniteBiTuple<GridKernalContext, String> t = stash.get();
 
-            return t.get1().dataStructures().set(t.get2(), null, false);
+            return t.get1().dataStructures().set(t.get2(), null);
         }
         catch (IgniteCheckedException e) {
             throw U.withCause(new InvalidObjectException(e.getMessage()), e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 3d5a882..f1f92d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -562,13 +562,6 @@ public class GridRestProcessor extends GridProcessorAdapter {
 
                 break;
 
-            // TODO IGNITE-6.
-            case ATOMIC_INCREMENT:
-            case ATOMIC_DECREMENT:
-                perm = GridSecurityPermission.CACHE_PUT;
-
-                break;
-
             case CACHE_PUT:
             case CACHE_ADD:
             case CACHE_PUT_ALL:

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index d6f84cd..c04e6ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -613,20 +613,20 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      */
     public void testFifoQueueTopologyChange() throws Exception {
         try {
-            grid(0).queue(STRUCTURE_NAME, config(false), 0, true).put(10);
+            grid(0).queue(STRUCTURE_NAME, 0, config(false)).put(10);
 
             Ignite g = startGrid(NEW_GRID_NAME);
 
-            assert g.<Integer>queue(STRUCTURE_NAME, null, 0, false).poll() == 10;
+            assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10;
 
-            g.queue(STRUCTURE_NAME, null, 0, false).put(20);
+            g.queue(STRUCTURE_NAME, 0, null).put(20);
 
             stopGrid(NEW_GRID_NAME);
 
-            assert grid(0).<Integer>queue(STRUCTURE_NAME, null, 0, false).peek() == 20;
+            assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20;
         }
         finally {
-            grid(0).<Integer>queue(STRUCTURE_NAME, null, 0, false).close();
+            grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close();
         }
     }
 
@@ -634,7 +634,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testQueueConstantTopologyChange() throws Exception {
-        try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, config(false), 0, true)) {
+        try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
             s.put(1);
 
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -646,7 +646,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
                             try {
                                 Ignite g = startGrid(name);
 
-                                assert g.<Integer>queue(STRUCTURE_NAME, null, 0, false).peek() > 0;
+                                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
                             }
                             finally {
                                 if (i != TOP_CHANGE_CNT - 1)
@@ -670,7 +670,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assert g.<Integer>queue(STRUCTURE_NAME, null, 0, false).peek() == origVal;
+                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
         }
     }
 
@@ -678,7 +678,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
      * @throws Exception If failed.
      */
     public void testQueueConstantMultipleTopologyChange() throws Exception {
-        try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, config(false), 0, true)) {
+        try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
             s.put(1);
 
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -695,7 +695,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
 
                                     Ignite g = startGrid(name);
 
-                                    assert g.<Integer>queue(STRUCTURE_NAME, null, 0, false).peek() > 0;
+                                    assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
                                 }
                             }
                             finally {
@@ -721,7 +721,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
             fut.get();
 
             for (Ignite g : G.allGrids())
-                assert g.<Integer>queue(STRUCTURE_NAME, null, 0, false).peek() == origVal;
+                assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
index 4f6436a..f758a12 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java
@@ -114,7 +114,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte
     private void testAddFailover(boolean collocated) throws Exception {
         IgniteCollectionConfiguration colCfg = config(collocated);
 
-        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME, colCfg, 0,true);
+        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME, 0, colCfg);
 
         assertNotNull(queue);
         assertEquals(0, queue.size());
@@ -131,7 +131,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte
         log.info("Test node: " + testNodeIdx) ;
         log.info("Header primary node: " + primaryNode) ;
 
-        queue = grid(testNodeIdx).queue(QUEUE_NAME, null, 0, false);
+        queue = grid(testNodeIdx).queue(QUEUE_NAME, 0, null);
 
         assertNotNull(queue);
 
@@ -209,7 +209,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte
     private void testPollFailover(boolean collocated) throws Exception {
         IgniteCollectionConfiguration colCfg = config(collocated);
 
-        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME, colCfg, 0, true);
+        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME, 0, colCfg);
 
         assertNotNull(queue);
         assertEquals(0, queue.size());
@@ -226,7 +226,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte
         log.info("Test node: " + testNodeIdx) ;
         log.info("Primary node: " + primaryNode) ;
 
-        queue = grid(testNodeIdx).queue(QUEUE_NAME, null, 0, false);
+        queue = grid(testNodeIdx).queue(QUEUE_NAME, 0, null);
 
         assertNotNull(queue);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index de33315..6d08f0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -52,9 +52,9 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
 
         IgniteCollectionConfiguration colCfg = config(false);
 
-        IgniteQueue queue1 = grid(0).queue(queueName1, colCfg, 0, true);
-        IgniteQueue queue2 = grid(0).queue(queueName2, colCfg, 0, true);
-        IgniteQueue queue3 = grid(0).queue(queueName1, colCfg, 0, true);
+        IgniteQueue queue1 = grid(0).queue(queueName1, 0, colCfg);
+        IgniteQueue queue2 = grid(0).queue(queueName2, 0, colCfg);
+        IgniteQueue queue3 = grid(0).queue(queueName1, 0, colCfg);
 
         assertNotNull(queue1);
         assertNotNull(queue2);
@@ -67,8 +67,8 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         queue2.close();
         queue3.close();
 
-        assertNull(grid(0).queue(queueName1, null, 0, false));
-        assertNull(grid(0).queue(queueName2, null, 0, false));
+        assertNull(grid(0).queue(queueName1, 0, null));
+        assertNull(grid(0).queue(queueName2, 0, null));
     }
 
     /**
@@ -82,7 +82,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
 
         String val = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         assert queue.add(val);
 
@@ -100,7 +100,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
 
         String val = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         assert queue.add(val);
 
@@ -118,7 +118,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<SameHashItem> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<SameHashItem> queue = grid(0).queue(queueName, 0, config(false));
 
         int retries = 100;
 
@@ -194,7 +194,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         assert queue.add("1");
 
@@ -216,7 +216,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         String item1 = "1";
         assert queue.add(item1);
@@ -241,7 +241,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         for (int i = 0; i < 100; i++)
             assert queue.add(Integer.toString(i));
@@ -297,7 +297,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), QUEUE_CAPACITY, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
 
         String thName = Thread.currentThread().getName();
 
@@ -319,7 +319,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        final IgniteQueue<String> queue = grid(0).queue(queueName, config(false), QUEUE_CAPACITY, true);
+        final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
 
         multithreaded(new Callable<Void>() {
                 @Override public Void call() throws Exception {
@@ -347,7 +347,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        final IgniteQueue<String> queue = grid(0).queue(queueName, config(false), QUEUE_CAPACITY, true);
+        final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
 
         multithreaded(new Callable<String>() {
                 @Override public String call() throws Exception {
@@ -374,7 +374,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         final String queueName = UUID.randomUUID().toString();
 
-        final IgniteQueue<String> queue = grid(0).queue(queueName, config(false), QUEUE_CAPACITY, true);
+        final IgniteQueue<String> queue = grid(0).queue(queueName, QUEUE_CAPACITY, config(false));
 
         final CountDownLatch putLatch = new CountDownLatch(THREAD_NUM);
 
@@ -413,7 +413,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
             Thread th = new Thread(new Runnable() {
                 @Override public void run() {
                     try {
-                        IgniteQueue<String> queue = grid(0).queue(queueName, null, 0, false);
+                        IgniteQueue<String> queue = grid(0).queue(queueName, 0, null);
 
                         if (queue != null)
                             queue.close();
@@ -454,7 +454,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         String thread = Thread.currentThread().getName();
 
@@ -479,7 +479,7 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         // Random queue name.
         String queueName = UUID.randomUUID().toString();
 
-        final IgniteQueue<String> queue = grid(0).queue(queueName, config(false), 0, true);
+        final IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
 
         multithreaded(
             new Callable<String>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java
index f772bf3..d70d978 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java
@@ -72,7 +72,7 @@ public class GridCacheQueueCleanupSelfTest extends IgniteCollectionAbstractTest
      * @throws Exception If failed.
      */
     public void testCleanup() throws Exception {
-        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME1, config(false), 0, true);
+        IgniteQueue<Integer> queue = grid(0).queue(QUEUE_NAME1, 0, config(false));
 
         GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx");
 
@@ -161,7 +161,7 @@ public class GridCacheQueueCleanupSelfTest extends IgniteCollectionAbstractTest
         // Remove queue and create queue with the same name.
         queue.close();
 
-        queue = ignite.queue(QUEUE_NAME1, config(false), 0, true);
+        queue = ignite.queue(QUEUE_NAME1, 0, config(false));
 
         assertEquals(0, queue.size());
 
@@ -212,7 +212,7 @@ public class GridCacheQueueCleanupSelfTest extends IgniteCollectionAbstractTest
     private IgniteInternalFuture<?> startAddPollThread(final Ignite ignite, final AtomicBoolean stop, final String queueName) {
         return GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                IgniteQueue<Integer> queue = ignite.queue(queueName, config(false), 0, true);
+                IgniteQueue<Integer> queue = ignite.queue(queueName, 0, config(false));
 
                 assertEquals(0, queue.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aaaebbd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
index 1c3dc50..d1c37d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
@@ -58,7 +58,7 @@ public abstract class GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCol
     public void testTakeFromJoined() throws Exception {
         String queueName = UUID.randomUUID().toString();
 
-        IgniteQueue<Integer> queue = grid(0).queue(queueName, config(true), 0, true);
+        IgniteQueue<Integer> queue = grid(0).queue(queueName, 0, config(true));
 
         assertNotNull(queue);
 
@@ -156,7 +156,7 @@ public abstract class GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCol
                 ", job=" + getClass().getSimpleName() + "]");
 
             try {
-                IgniteQueue<Integer> queue = ignite.queue(queueName, null, 0, false);
+                IgniteQueue<Integer> queue = ignite.queue(queueName, 0, null);
 
                 assertNotNull(queue);
 
@@ -250,7 +250,7 @@ public abstract class GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCol
             Integer lastPolled = null;
 
             try {
-                IgniteQueue<Integer> queue = ignite.queue(queueName, null, 0, false);
+                IgniteQueue<Integer> queue = ignite.queue(queueName, 0, null);
 
                 assertNotNull(queue);
 


Mime
View raw message