ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/16] incubator-ignite git commit: # ignite-60
Date Mon, 26 Jan 2015 14:19:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java
deleted file mode 100644
index 92696fa..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueAdapter.java
+++ /dev/null
@@ -1,1007 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.datastructures.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.processor.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Common code for {@link org.apache.ignite.cache.datastructures.IgniteQueue} implementation.
- */
-public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> implements IgniteQueue<T> {
-    /** Value returned by closure updating queue header indicating that queue was removed. */
-    protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
-
-    /**
-     * Judging by the name, the maximum number of header update retries.
-     * Not referenced in this class - presumably used by subclasses (TODO confirm).
-     */
-    protected static final int MAX_UPDATE_RETRIES = 100;
-
-    /**
-     * Judging by the name, the delay between update retries. Not referenced in this class;
-     * the time unit is not stated here (TODO confirm against the code that uses it).
-     */
-    protected static final long RETRY_DELAY = 1;
-
-    /** Batch size used by the no-argument {@link #clear()}. */
-    private static final int DFLT_CLEAR_BATCH_SIZE = 100;
-
-    /** Logger, obtained from the cache context for the concrete queue class. */
-    protected final IgniteLogger log;
-
-    /** Cache context. */
-    protected final GridCacheContext<?, ?> cctx;
-
-    /** Cache (raw type) that stores both the queue header and the queue items. */
-    protected final IgniteCache cache;
-
-    /** Queue name. */
-    protected final String queueName;
-
-    /** Queue header key, built from the queue name. */
-    protected final GridCacheQueueHeaderKey queueKey;
-
-    /** Queue unique ID, taken from the header passed to the constructor. */
-    protected final IgniteUuid id;
-
-    /** Queue capacity; {@link Integer#MAX_VALUE} means the queue is not bounded (see {@link #bounded()}). */
-    private final int cap;
-
-    /** Collocation flag. */
-    private final boolean collocated;
-
-    /** Removed flag, set to {@code true} by {@link #onRemoved(boolean)} and never reset in this class. */
-    private volatile boolean rmvd;
-
-    /** Read blocking operations semaphore (used by {@link #take()} and timed {@code poll}). */
-    @GridToStringExclude
-    private final Semaphore readSem;
-
-    /** Write blocking operations semaphore; {@code null} when the queue is not bounded. */
-    @GridToStringExclude
-    private final Semaphore writeSem;
-
-    /**
-     * Creates the adapter from a queue header: copies the ID, capacity and collocation flag
-     * from the header, resolves the cache by the context's cache name and sizes the blocking
-     * semaphores from the header's current size.
-     *
-     * @param queueName Queue name.
-     * @param hdr Queue hdr.
-     * @param cctx Cache context.
-     */
-    @SuppressWarnings("unchecked")
-    protected GridCacheQueueAdapter(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) {
-        this.cctx = cctx;
-        this.queueName = queueName;
-        id = hdr.id();
-        cap = hdr.capacity();
-        collocated = hdr.collocated();
-        queueKey = new GridCacheQueueHeaderKey(queueName);
-        cache = cctx.kernalContext().cache().jcache(cctx.name());
-
-        log = cctx.logger(getClass());
-
-        readSem = new Semaphore(hdr.size(), true); // Fair semaphore with one permit per item currently in the queue.
-
-        writeSem = bounded() ? new Semaphore(hdr.capacity() - hdr.size(), true) : null; // bounded() reads 'cap', which is already assigned above.
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Queue name that was passed to the constructor.
-     */
-    @Override public String name() {
-        return queueName;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Validates that the item is not {@code null} and returns the result of {@code offer(item)};
-     * this method itself does not throw when the offer is rejected.
-     */
-    @Override public boolean add(T item) {
-        A.notNull(item, "item");
-
-        return offer(item);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return Collocation flag copied from the queue header at construction time.
-     */
-    @Override public boolean collocated() {
-        return collocated;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the capacity copied from the queue header at construction time. The body performs
-     * no cache access and does not itself throw the declared checked exception.
-     */
-    @Override public int capacity() throws IgniteCheckedException {
-        return cap;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code True} if the capacity is less than {@link Integer#MAX_VALUE}.
-     */
-    @Override public boolean bounded() {
-        return cap < Integer.MAX_VALUE;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the current queue header from the cache on every call and returns its size.
-     * If the header is missing or belongs to a queue with a different ID, the queue is marked
-     * as removed and {@link CacheDataStructureRemovedRuntimeException} is thrown
-     * (see {@link #checkRemoved(GridCacheQueueHeader)}).
-     */
-    @SuppressWarnings("unchecked")
-    @Override public int size() {
-        GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
-
-        checkRemoved(hdr);
-
-        return hdr.size();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the queue header from the cache, fails if the queue was removed, and then reads the
-     * item stored under the header's head index. Nothing is modified in the cache.
-     *
-     * @return Item at the head index, or {@code null} if the header reports an empty queue.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable @Override public T peek() throws IgniteException {
-        GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
-
-        checkRemoved(hdr);
-
-        if (hdr.empty())
-            return null;
-
-        return (T)cache.get(itemKey(hdr.head())); // Header and item are read by two separate cache gets.
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@code poll()} and converts a {@code null} result into an exception.
-     *
-     * @throws NoSuchElementException If {@code poll()} returned {@code null}.
-     */
-    @Override public T remove() {
-        T res = poll();
-
-        if (res == null)
-            throw new NoSuchElementException();
-
-        return res;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@link #peek()} and converts a {@code null} result into an exception.
-     *
-     * @throws NoSuchElementException If {@link #peek()} returned {@code null}.
-     */
-    @Override public T element() {
-        T el = peek();
-
-        if (el == null)
-            throw new NoSuchElementException();
-
-        return el;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the queue header from the cache, fails if the queue was removed, and creates an
-     * iterator over the head/tail range recorded in that header. A checked exception raised
-     * while creating the iterator is rethrown wrapped into {@link IgniteException}.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public Iterator<T> iterator() {
-        try {
-            GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
-
-            checkRemoved(hdr);
-
-            return new QueueIterator(hdr);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * For a queue that is not bounded the item is offered once and the offer is asserted to
-     * succeed. For a bounded queue the method loops: it blocks on the write semaphore, checks
-     * that the grid is not stopping and tries {@code offer(item)}; the loop repeats until the
-     * offer succeeds.
-     *
-     * @throws IgniteException If the thread is interrupted while waiting (the interrupt flag is
-     *      restored first) or if the grid is stopping.
-     */
-    @Override public void put(T item) throws IgniteException {
-        A.notNull(item, "item");
-
-        if (!bounded()) {
-            boolean offer = offer(item);
-
-            assert offer;
-
-            return;
-        }
-
-        while (true) {
-            try {
-                writeSem.acquire();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteException("Queue put interrupted.", e);
-            }
-
-            checkStopping();
-
-            if (offer(item)) // A permit does not guarantee success; on a rejected offer wait for the next permit.
-                return;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * For a queue that is not bounded the item is offered immediately and the timeout is not used.
-     * For a bounded queue a deadline is computed in milliseconds and, until it passes, the method
-     * waits for a write permit and tries {@code offer(item)} each time a permit is obtained.
-     * <p>
-     * NOTE(review): with {@code timeout == 0} the deadline is the current time, so the loop body
-     * is normally not entered and {@code false} is returned without attempting an offer. The
-     * deadline addition is also not guarded against {@code long} overflow for very large timeouts.
-     * Confirm whether both are intended.
-     *
-     * @return {@code True} if the item was offered successfully before the deadline.
-     * @throws IgniteException If the thread is interrupted while waiting (the interrupt flag is
-     *      restored first) or if the grid is stopping.
-     */
-    @Override public boolean offer(T item, long timeout, TimeUnit unit) throws IgniteException {
-        A.notNull(item, "item");
-        A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout);
-
-        if (!bounded()) {
-            boolean offer = offer(item);
-
-            assert offer;
-
-            return true;
-        }
-
-        long end = U.currentTimeMillis() + MILLISECONDS.convert(timeout, unit); // Absolute deadline in milliseconds.
-
-        while (U.currentTimeMillis() < end) {
-            boolean retVal = false;
-
-            try {
-                if (writeSem.tryAcquire(end - U.currentTimeMillis(), MILLISECONDS)) { // Wait only for the time left until the deadline.
-                    checkStopping();
-
-                    retVal = offer(item);
-                }
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteException("Queue put interrupted.", e);
-            }
-
-            if (retVal)
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Loops until an item is obtained: blocks on the read semaphore, checks that the grid is not
-     * stopping and calls {@code poll()}; a {@code null} poll result sends the thread back to wait
-     * for the next permit.
-     *
-     * @throws IgniteException If the thread is interrupted while waiting (the interrupt flag is
-     *      restored first) or if the grid is stopping.
-     */
-    @Nullable @Override public T take() throws IgniteException {
-        while (true) {
-            try {
-                readSem.acquire();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteException("Queue take interrupted.", e);
-            }
-
-            checkStopping();
-
-            T e = poll();
-
-            if (e != null) // The method only returns from here, i.e. with a non-null item.
-                return e;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Computes a deadline in milliseconds and, until it passes, waits for a read permit and calls
-     * {@code poll()} each time a permit is obtained.
-     * <p>
-     * NOTE(review): with {@code timeout == 0} the deadline is the current time, so the loop body
-     * is normally not entered and {@code null} is returned without polling. The deadline addition
-     * is also not guarded against {@code long} overflow for very large timeouts. Confirm whether
-     * both are intended.
-     *
-     * @return Polled item, or {@code null} if no item was obtained before the deadline.
-     * @throws IgniteException If the thread is interrupted while waiting (the interrupt flag is
-     *      restored first) or if the grid is stopping.
-     */
-    @Nullable @Override public T poll(long timeout, TimeUnit unit) throws IgniteException {
-        A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout);
-
-        long end = U.currentTimeMillis() + MILLISECONDS.convert(timeout, unit); // Absolute deadline in milliseconds.
-
-        while (U.currentTimeMillis() < end) {
-            T retVal = null;
-
-            try {
-                if (readSem.tryAcquire(end - U.currentTimeMillis(), MILLISECONDS)) { // Wait only for the time left until the deadline.
-                    checkStopping();
-
-                    retVal = poll();
-                }
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteException("Queue poll interrupted.", e);
-            }
-
-            if (retVal != null)
-                return retVal;
-        }
-
-        return null;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * For a bounded queue this calls {@link #size()}, which reads the header from the cache and
-     * throws if the queue was removed.
-     *
-     * @return {@link Integer#MAX_VALUE} if the queue is not bounded, otherwise capacity minus
-     *      current size, never less than zero.
-     */
-    @Override public int remainingCapacity() {
-        if (!bounded())
-            return Integer.MAX_VALUE;
-
-        int remaining = cap - size();
-
-        return remaining > 0 ? remaining : 0;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@link #clear(int)} with the default batch size of 100.
-     */
-    @Override public void clear() {
-        clear(DFLT_CLEAR_BATCH_SIZE);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Works in two steps. First a {@link ClearProcessor} is invoked on the queue header; it moves
-     * the head to the tail and returns the old head and tail indexes. Then the item keys in the
-     * range {@code [head, tail)} are removed from the cache in batches of {@code batchSize}
-     * (a batch size of {@code 0} removes all keys with one call, see {@code removeKeys}).
-     * A checked exception from key removal is rethrown wrapped into {@link IgniteException}.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public void clear(int batchSize) throws IgniteException {
-        A.ensure(batchSize >= 0, "Batch size cannot be negative: " + batchSize);
-
-        try {
-            IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, new ClearProcessor(id));
-
-            if (t == null) // The processor returns null when the queue is already empty.
-                return;
-
-            checkRemoved(t.get1()); // Throws if the processor reported QUEUE_REMOVED_IDX.
-
-            removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@link #drainTo(Collection, int)} with {@link Integer#MAX_VALUE} as the limit.
-     */
-    @Override public int drainTo(Collection<? super T> c) {
-        return drainTo(c, Integer.MAX_VALUE);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Polls at most {@code min(maxElements, size())} items one by one and adds each to {@code c}.
-     * The size is read once up front; if {@code poll()} returns {@code null} earlier, the number
-     * of items transferred so far is returned.
-     * <p>
-     * NOTE(review): a negative {@code maxElements} skips the loop and is returned as is
-     * (a negative count) - confirm whether callers can pass one.
-     *
-     * @return Number of items added to {@code c}.
-     */
-    @Override public int drainTo(Collection<? super T> c, int maxElements) {
-        int max = Math.min(maxElements, size());
-
-        for (int i = 0; i < max; i++) {
-            T el = poll();
-
-            if (el == null) // Queue ran out of items before the precomputed limit was reached.
-                return i;
-
-            c.add(el);
-        }
-
-        return max;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the local removed flag, which is set by {@link #onRemoved(boolean)}; no cache
-     * access is performed here.
-     */
-    @Override public boolean removed() {
-        return rmvd;
-    }
-
-    /**
-     * Removes from the cache the item keys for all indexes in {@code [startIdx, endIdx)}.
-     * Keys are accumulated in a set and removed with {@code removeAll} every time the set
-     * reaches {@code batchSize}; when {@code batchSize} is not positive all keys are accumulated
-     * and removed with a single call.
-     *
-     * @param cache Cache.
-     * @param id Queue unique ID.
-     * @param name Queue name.
-     * @param collocated Collocation flag.
-     * @param startIdx Start item index (inclusive).
-     * @param endIdx End item index (exclusive).
-     * @param batchSize Batch size.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    static void removeKeys(IgniteCache cache,
-        IgniteUuid id,
-        String name,
-        boolean collocated,
-        long startIdx,
-        long endIdx,
-        int batchSize)
-        throws IgniteCheckedException
-    {
-        Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); // Argument is only the initial capacity of the set.
-
-        for (long idx = startIdx; idx < endIdx; idx++) {
-            keys.add(itemKey(id, name, collocated, idx));
-
-            if (batchSize > 0 && keys.size() == batchSize) {
-                cache.removeAll(keys);
-
-                keys.clear();
-            }
-        }
-
-        if (!keys.isEmpty()) // Remove the last, possibly incomplete, batch.
-            cache.removeAll(keys);
-    }
-
-    /**
-     * Checks result of closure modifying queue header, throws {@link org.apache.ignite.cache.datastructures.CacheDataStructureRemovedRuntimeException}
-     * if queue was removed. The queue is treated as removed when the result equals
-     * {@link #QUEUE_REMOVED_IDX}; in that case the adapter is also marked as removed.
-     *
-     * @param idx Result of closure execution.
-     */
-    protected final void checkRemoved(Long idx) {
-        if (idx == QUEUE_REMOVED_IDX) // Numeric comparison: idx is unboxed, so a null idx would fail with NullPointerException here.
-            onRemoved(true);
-    }
-
-    /**
-     * Checks queue state, throws {@link org.apache.ignite.cache.datastructures.CacheDataStructureRemovedRuntimeException} if queue was removed.
-     * The queue is treated as removed when the header is {@code null} or carries an ID different
-     * from this queue's ID; in that case the adapter is also marked as removed.
-     *
-     * @param hdr Queue hdr.
-     */
-    protected final void checkRemoved(@Nullable GridCacheQueueHeader hdr) {
-        if (queueRemoved(hdr, id))
-            onRemoved(true);
-    }
-
-    /**
-     * Marks queue as removed: sets the removed flag and releases the blocking semaphores so
-     * that waiting threads can proceed.
-     *
-     * @param throw0 If {@code true} then throws {@link org.apache.ignite.cache.datastructures.CacheDataStructureRemovedRuntimeException}.
-     */
-    void onRemoved(boolean throw0) {
-        rmvd = true;
-
-        releaseSemaphores();
-
-        if (throw0)
-            throw new CacheDataStructureRemovedRuntimeException("Queue has been removed from cache: " + this);
-    }
-
-    /**
-     * Release all semaphores used in blocking operations (used in case queue was removed or grid is stopping).
-     * Each semaphore is first drained and then given 1,000,000 permits. The write semaphore is
-     * only touched for a bounded queue, because it is {@code null} otherwise.
-     */
-    private void releaseSemaphores() {
-        if (bounded()) {
-            writeSem.drainPermits();
-            writeSem.release(1_000_000); // Let all blocked threads to proceed (operation will fail with exception).
-        }
-
-        readSem.drainPermits();
-        readSem.release(1_000_000); // Let all blocked threads to proceed (operation will fail with exception).
-    }
-
-    /**
-     * Re-synchronizes the blocking semaphores with the given queue header. The caller is not
-     * visible in this class; presumably invoked when a header update is observed (TODO confirm).
-     * <p>
-     * If the header is not empty, the read semaphore is reset to {@code hdr.size()} permits;
-     * if it is empty, the read semaphore is left untouched (not drained). For a bounded queue the
-     * write semaphore is drained and, unless the header is full, given
-     * {@code hdr.capacity() - hdr.size()} permits.
-     *
-     * @param hdr Queue header.
-     */
-    void onHeaderChanged(GridCacheQueueHeader hdr) {
-        if (!hdr.empty()) {
-            readSem.drainPermits();
-            readSem.release(hdr.size());
-        }
-
-        if (bounded()) {
-            writeSem.drainPermits();
-
-            if (!hdr.full())
-                writeSem.release(hdr.capacity() - hdr.size());
-        }
-    }
-
-    /**
-     * Grid stop callback. Releases the blocking semaphores so that threads waiting in blocking
-     * operations wake up.
-     */
-    void onKernalStop() {
-        releaseSemaphores();
-    }
-
-    /**
-     * Throws {@link IgniteException} in case if grid is stopping. Called by the blocking
-     * operations right after a semaphore permit has been obtained.
-     */
-    private void checkStopping() {
-        if (cctx.kernalContext().isStopping())
-            throw new IgniteException("Grid is stopping");
-    }
-
-    /**
-     * Returns the ID copied from the queue header at construction time.
-     *
-     * @return Queue unique ID.
-     */
-    IgniteUuid id() {
-        return id;
-    }
-
-    /**
-     * Removes item with given index from queue. Within this class it is called by the queue
-     * iterator's {@code remove()} with the index of the item last returned by {@code next()}.
-     *
-     * @param rmvIdx Index of item to be removed.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException;
-
-
-    /**
-     * Creates the cache key of the item with the given index, using this queue's ID, name and
-     * collocation flag.
-     *
-     * @param idx Item index.
-     * @return Item key.
-     */
-    protected GridCacheQueueItemKey itemKey(Long idx) {
-        return itemKey(id, queueName, collocated(), idx); // idx is unboxed to the long parameter of the static overload.
-    }
-
-    /**
-     * Creates the cache key of a queue item: a {@link CollocatedItemKey} for a collocated queue,
-     * a plain {@link GridCacheQueueItemKey} otherwise.
-     *
-     * @param id Queue unique ID.
-     * @param queueName Queue name.
-     * @param collocated Collocation flag.
-     * @param idx Item index.
-     * @return Item key.
-     */
-    private static GridCacheQueueItemKey itemKey(IgniteUuid id, String queueName, boolean collocated, long idx) {
-        return collocated ? new CollocatedItemKey(id, queueName, idx) :
-            new GridCacheQueueItemKey(id, queueName, idx);
-    }
-
-    /**
-     * Tells whether the given header indicates that the queue with the expected ID no longer
-     * exists: either there is no header at all or the header carries a different ID.
-     *
-     * @param hdr Queue header.
-     * @param id Expected queue unique ID.
-     * @return {@code True} if queue was removed.
-     */
-    private static boolean queueRemoved(@Nullable GridCacheQueueHeader hdr, IgniteUuid id) {
-        return hdr == null || !id.equals(hdr.id());
-    }
-
-    /**
-     * Iterator over queue items. It walks the index range taken from the queue header passed to
-     * the constructor, skipping indexes listed in that header's removed set, and reads each item
-     * from the cache by its item key. The header is not re-read during iteration.
-     * <p>
-     * NOTE(review): iteration is driven by {@code next != null}, so a {@code null} returned by
-     * the cache for an index inside the range ends the iteration early - confirm this is intended.
-     */
-    private class QueueIterator implements Iterator<T> {
-        /** Item that the following call to {@link #next()} will return; {@code null} when there is none. */
-        private T next;
-
-        /** Item returned by the last call to {@link #next()}; reset to {@code null} by {@link #remove()}. */
-        private T cur;
-
-        /** Index of {@link #cur}. */
-        private long curIdx;
-
-        /** Index of {@link #next}. */
-        private long idx;
-
-        /** Exclusive end index: the header tail at the time the iterator was created. */
-        private long endIdx;
-
-        /** Removed indexes taken from the header; may be {@code null}. */
-        private Set<Long> rmvIdxs;
-
-        /**
-         * Captures head, tail and removed indexes from the header and pre-loads the first item
-         * if the range is not empty.
-         *
-         * @param hdr Queue header.
-         * @throws IgniteCheckedException If failed.
-         */
-        @SuppressWarnings("unchecked")
-        private QueueIterator(GridCacheQueueHeader hdr) throws IgniteCheckedException {
-            idx = hdr.head();
-            endIdx = hdr.tail();
-            rmvIdxs = hdr.removedIndexes();
-
-            assert !F.contains(rmvIdxs, idx) : idx; // The head index is expected not to be marked as removed.
-
-            if (idx < endIdx)
-                next = (T)cache.get(itemKey(idx));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != null;
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Returns the pre-loaded item, then advances to the following index that is not in the
-         * removed set and pre-loads its item (or {@code null} once the end index is reached).
-         *
-         * @throws NoSuchElementException If there is no pre-loaded item.
-         */
-        @SuppressWarnings("unchecked")
-        @Override public T next() {
-            if (next == null)
-                throw new NoSuchElementException();
-
-            cur = next;
-            curIdx = idx;
-
-            idx++;
-
-            if (rmvIdxs != null) {
-                while (F.contains(rmvIdxs, idx) && idx < endIdx) // Skip indexes marked as removed in the header.
-                    idx++;
-            }
-
-            next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null;
-
-            return cur;
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Removes the item last returned by {@link #next()} through the enclosing queue's
-         * {@code removeItem}. A checked exception is rethrown wrapped into {@link IgniteException}.
-         *
-         * @throws IllegalStateException If there is no current item (before the first
-         *      {@link #next()} or after a successful {@code remove()}).
-         */
-        @Override public void remove() {
-            if (cur == null)
-                throw new IllegalStateException();
-
-            try {
-                removeItem(curIdx);
-
-                cur = null; // Only reached when removeItem did not throw.
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-
-    /**
-     * Item key for collocated queue. Differs from the base item key only by exposing an
-     * affinity key method annotated with {@code CacheAffinityKeyMapped}.
-     */
-    private static class CollocatedItemKey extends GridCacheQueueItemKey {
-        /** Serialization version. */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public CollocatedItemKey() {
-            // No-op.
-        }
-
-        /**
-         * @param id Queue unique ID.
-         * @param queueName Queue name.
-         * @param idx Item index.
-         */
-        private CollocatedItemKey(IgniteUuid id, String queueName, long idx) {
-            super(id, queueName, idx);
-        }
-
-        /**
-         * Returns the result of the inherited {@code queueName()}, so keys created for the same
-         * queue name yield the same affinity key. Presumably this is what places all items of a
-         * collocated queue together - TODO confirm against the annotation's documentation.
-         *
-         * @return Item affinity key.
-         */
-        @CacheAffinityKeyMapped
-        public Object affinityKey() {
-            return queueName();
-        }
-    }
-
-    /**
-     * Entry processor applied to the queue header to clear the queue. It moves the head to the
-     * tail, drops the removed indexes and reports the old {@code (head, tail)} range so that the
-     * caller can delete the item entries itself. Only the queue ID is serialized.
-     */
-    protected static class ClearProcessor implements
-        EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, IgniteBiTuple<Long, Long>>, Externalizable {
-        /** Serialization version. */
-        private static final long serialVersionUID = 0L;
-
-        /** Expected queue unique ID. */
-        private IgniteUuid id;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public ClearProcessor() {
-            // No-op.
-        }
-
-        /**
-         * @param id Queue unique ID.
-         */
-        public ClearProcessor(IgniteUuid id) {
-            this.id = id;
-        }
-
-        /**
-         * {@inheritDoc}
-         *
-         * @return Tuple of {@code QUEUE_REMOVED_IDX} values if the header is missing or has a
-         *      different ID; {@code null} if the queue is empty (header left unchanged);
-         *      otherwise the old head and old tail indexes.
-         */
-        @Override public IgniteBiTuple<Long, Long> process(
-            MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) {
-            GridCacheQueueHeader hdr = e.getValue();
-
-            boolean rmvd = queueRemoved(hdr, id);
-
-            if (rmvd)
-                return new IgniteBiTuple<>(QUEUE_REMOVED_IDX, QUEUE_REMOVED_IDX);
-            else if (hdr.empty())
-                return null;
-
-            GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                hdr.capacity(),
-                hdr.collocated(),
-                hdr.tail(), // New head: the old tail.
-                hdr.tail(),
-                null); // Removed indexes are dropped.
-
-            e.setValue(newHdr);
-
-            return new IgniteBiTuple<>(hdr.head(), hdr.tail());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeGridUuid(out, id);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            id = U.readGridUuid(in);
-        }
-    }
-
-    /**
-     * Entry processor applied to the queue header to poll one item index. It advances the head
-     * in the header and returns the index whose item the caller should take. Only the queue ID
-     * is serialized.
-     */
-    protected static class PollProcessor implements
-        EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable {
-        /** Serialization version. */
-        private static final long serialVersionUID = 0L;
-
-        /** Expected queue unique ID. */
-        private IgniteUuid id;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public PollProcessor() {
-            // No-op.
-        }
-
-        /**
-         * @param id Queue unique ID.
-         */
-        public PollProcessor(IgniteUuid id) {
-            this.id = id;
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Without removed indexes the head is advanced by one and the old head is returned.
-         * With removed indexes a copy of the set is scanned from {@code head + 1} up to the tail:
-         * removed indexes are dropped from the copy and the first index not in the set is
-         * returned, with the new head placed right after it. If the scan reaches the tail the
-         * head is moved there and {@code null} is returned.
-         * <p>
-         * NOTE(review): in the removed-indexes branch the scan starts at {@code head + 1}, so the
-         * index {@code hdr.head()} itself is never returned by that branch, and the do-while body
-         * runs once before the tail comparison. Confirm against the invariants maintained by
-         * {@code RemoveProcessor} that this is intended.
-         *
-         * @return {@code QUEUE_REMOVED_IDX} if the header is missing or has a different ID;
-         *      {@code null} if the queue is empty or the scan reached the tail; otherwise the
-         *      polled index.
-         */
-        @Override public Long process(
-            MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) {
-            GridCacheQueueHeader hdr = e.getValue();
-
-            boolean rmvd = queueRemoved(hdr, id);
-
-            if (rmvd || hdr.empty())
-                return rmvd ? QUEUE_REMOVED_IDX : null;
-
-            Set<Long> rmvdIdxs = hdr.removedIndexes();
-
-            if (rmvdIdxs == null) {
-                GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                    hdr.capacity(),
-                    hdr.collocated(),
-                    hdr.head() + 1,
-                    hdr.tail(),
-                    rmvdIdxs);
-
-                e.setValue(newHdr);
-
-                return hdr.head();
-            }
-
-            long next = hdr.head() + 1;
-
-            rmvdIdxs = new HashSet<>(rmvdIdxs); // Work on a copy; the set held by the current header is not modified.
-
-            do {
-                if (!rmvdIdxs.remove(next)) { // remove() returns false when 'next' was not marked as removed.
-                    GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                        hdr.capacity(),
-                        hdr.collocated(),
-                        next + 1,
-                        hdr.tail(),
-                        rmvdIdxs.isEmpty() ? null : rmvdIdxs);
-
-                    e.setValue(newHdr);
-
-                    return next;
-                }
-
-                next++;
-            } while (next != hdr.tail());
-
-            GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                hdr.capacity(),
-                hdr.collocated(),
-                next, // Equals hdr.tail() here, i.e. the queue becomes empty.
-                hdr.tail(),
-                rmvdIdxs.isEmpty() ? null : rmvdIdxs);
-
-            e.setValue(newHdr);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeGridUuid(out, id);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            id = U.readGridUuid(in);
-        }
-    }
-
-    /**
-     * Entry processor applied to the queue header to reserve indexes for new items. It advances
-     * the tail by the requested number of elements and returns the old tail, i.e. the first
-     * reserved index. The queue ID and the element count are serialized.
-     */
-    protected static class AddProcessor implements
-        EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable {
-        /** Serialization version. */
-        private static final long serialVersionUID = 0L;
-
-        /** Expected queue unique ID. */
-        private IgniteUuid id;
-
-        /** Number of elements to add. */
-        private int size;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public AddProcessor() {
-            // No-op.
-        }
-
-        /**
-         * @param id Queue unique ID.
-         * @param size Number of elements to add.
-         */
-        public AddProcessor(IgniteUuid id, int size) {
-            this.id = id;
-            this.size = size;
-        }
-
-        /**
-         * {@inheritDoc}
-         *
-         * @return {@code QUEUE_REMOVED_IDX} if the header is missing or has a different ID;
-         *      {@code null} if there is not enough space for {@code size} elements (header left
-         *      unchanged); otherwise the old tail index.
-         */
-        @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) {
-            GridCacheQueueHeader hdr = e.getValue();
-
-            boolean rmvd = queueRemoved(hdr, id);
-
-            if (rmvd || !spaceAvailable(hdr, size)) // Short-circuit keeps spaceAvailable() from seeing a null header.
-                return rmvd ? QUEUE_REMOVED_IDX : null;
-
-            GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                hdr.capacity(),
-                hdr.collocated(),
-                hdr.head(),
-                hdr.tail() + size,
-                hdr.removedIndexes());
-
-            e.setValue(newHdr);
-
-            return hdr.tail();
-        }
-
-        /**
-         * Checks whether the header describes a queue that is not bounded or whose size plus
-         * the requested number of elements does not exceed its capacity.
-         *
-         * @param hdr Queue header.
-         * @param size Number of elements to add.
-         * @return {@code True} if new elements can be added.
-         */
-        private boolean spaceAvailable(GridCacheQueueHeader hdr, int size) {
-            return !hdr.bounded() || (hdr.size() + size) <= hdr.capacity();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeGridUuid(out, id);
-            out.writeInt(size);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            id = U.readGridUuid(in);
-            size = in.readInt();
-        }
-    }
-
-    /**
-     * Entry processor applied to the queue header to remove the item with a given index.
-     * Removing the head item advances the head (also past any directly following indexes already
-     * marked as removed); removing any other item records its index in the header's removed set.
-     * The queue ID and the index are serialized.
-     * <p>
-     * NOTE(review): {@code process} may set {@code idx} to {@code null}, while
-     * {@code writeExternal} unboxes it for {@code writeLong}; serializing the processor after
-     * such a call would fail with {@code NullPointerException}. Also, {@code idx} is only checked
-     * against the head, not against the tail. Confirm whether either can occur in practice.
-     */
-    protected static class RemoveProcessor implements
-        EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable {
-        /** Serialization version. */
-        private static final long serialVersionUID = 0L;
-
-        /** Expected queue unique ID. */
-        private IgniteUuid id;
-
-        /** Index of item to be removed; set to {@code null} by {@code process} when the index is already marked as removed. */
-        private Long idx;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public RemoveProcessor() {
-            // No-op.
-        }
-
-        /**
-         * @param id Queue UUID.
-         * @param idx Index of item to be removed.
-         */
-        public RemoveProcessor(IgniteUuid id, Long idx) {
-            this.id = id;
-            this.idx = idx;
-        }
-
-        /**
-         * {@inheritDoc}
-         *
-         * @return {@code QUEUE_REMOVED_IDX} if the header is missing or has a different ID;
-         *      {@code null} if the queue is empty, the index is below the head, the index is
-         *      already marked as removed, or the head was removed together with a following run
-         *      of removed indexes; otherwise the removed index.
-         */
-        @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) {
-            GridCacheQueueHeader hdr = e.getValue();
-
-            boolean rmvd = queueRemoved(hdr, id);
-
-            if (rmvd || hdr.empty() || idx < hdr.head())
-                return rmvd ? QUEUE_REMOVED_IDX : null;
-
-            if (idx == hdr.head()) { // Numeric comparison: idx is unboxed.
-                Set<Long> rmvIdxs = hdr.removedIndexes();
-
-                long head = hdr.head() + 1;
-
-                if (!F.contains(rmvIdxs, head)) { // The index right after the head is not removed: just advance the head by one.
-                    GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                        hdr.capacity(),
-                        hdr.collocated(),
-                        head,
-                        hdr.tail(),
-                        hdr.removedIndexes());
-
-                    e.setValue(newHdr);
-
-                    return idx;
-                }
-
-                rmvIdxs = new HashSet<>(rmvIdxs); // Work on a copy; the set held by the current header is not modified.
-
-                while (rmvIdxs.remove(head)) // Advance the head past the run of indexes already marked as removed.
-                    head++;
-
-                GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                    hdr.capacity(),
-                    hdr.collocated(),
-                    head,
-                    hdr.tail(),
-                    rmvIdxs.isEmpty() ? null : rmvIdxs);
-
-                e.setValue(newHdr);
-
-                return null;
-            }
-
-            Set<Long> rmvIdxs = hdr.removedIndexes();
-
-            if (rmvIdxs == null) {
-                rmvIdxs = new HashSet<>();
-
-                rmvIdxs.add(idx);
-            }
-            else {
-                if (!rmvIdxs.contains(idx)) {
-                    rmvIdxs = new HashSet<>(rmvIdxs);
-
-                    rmvIdxs.add(idx);
-                }
-                else
-                    idx = null; // Already marked as removed: report null to the caller.
-            }
-
-            GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
-                hdr.capacity(),
-                hdr.collocated(),
-                hdr.head(),
-                hdr.tail(),
-                rmvIdxs);
-
-            e.setValue(newHdr);
-
-            return idx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeGridUuid(out, id);
-            out.writeLong(idx); // Unboxes idx.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            id = U.readGridUuid(in);
-            idx = in.readLong();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Two queue adapters are equal when they are of exactly the same class and have equal
-     * queue IDs; no other state is compared.
-     */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheQueueAdapter that = (GridCacheQueueAdapter) o;
-
-        return id.equals(that.id);
-
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Based on the queue ID only, consistently with {@link #equals(Object)}.
-     */
-    @Override public int hashCode() {
-        return id.hashCode();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@code S.toString} for this adapter class; the two semaphore fields are
-     * annotated with {@code GridToStringExclude}.
-     */
-    @Override public String toString() {
-        return S.toString(GridCacheQueueAdapter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeader.java
deleted file mode 100644
index 9adf695..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeader.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Queue header.
- */
-public class GridCacheQueueHeader implements GridCacheInternal, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteUuid id;
-
-    /** */
-    private long head;
-
-    /** */
-    private long tail;
-
-    /** */
-    private int cap;
-
-    /** */
-    private boolean collocated;
-
-    /** */
-    @GridToStringInclude
-    private Set<Long> rmvIdxs;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheQueueHeader() {
-        // No-op.
-    }
-
-    /**
-     * @param id Queue unique ID.
-     * @param cap Capacity.
-     * @param collocated Collocation flag.
-     * @param head Queue head index.
-     * @param tail Queue tail index.
-     * @param rmvIdxs Indexes of removed items.
-     */
-    public GridCacheQueueHeader(IgniteUuid id, int cap, boolean collocated, long head, long tail,
-        @Nullable Set<Long> rmvIdxs) {
-        assert id != null;
-        assert head <= tail;
-
-        this.id = id;
-        this.cap = cap;
-        this.collocated = collocated;
-        this.head = head;
-        this.tail = tail;
-        this.rmvIdxs = rmvIdxs;
-    }
-
-    /**
-     * @return Queue unique ID.
-     */
-    public IgniteUuid id() {
-        return id;
-    }
-
-    /**
-     * @return Capacity.
-     */
-    public int capacity() {
-        return cap;
-    }
-
-    /**
-     * @return Queue collocation flag.
-     */
-    public boolean collocated() {
-        return collocated;
-    }
-
-    /**
-     * @return Head index.
-     */
-    public long head() {
-        return head;
-    }
-
-    /**
-     * @return Tail index.
-     */
-    public long tail() {
-        return tail;
-    }
-
-    /**
-     * @return {@code True} if queue is bounded.
-     */
-    public boolean bounded() {
-        return cap < Integer.MAX_VALUE;
-    }
-
-    /**
-     * @return {@code True} if queue is empty.
-     */
-    public boolean empty() {
-        return head == tail;
-    }
-
-    /**
-     * @return {@code True} if queue is full.
-     */
-    public boolean full() {
-        return bounded() && size() == capacity();
-    }
-
-    /**
-     * @return Queue size.
-     */
-    public int size() {
-        int rmvSize = F.isEmpty(removedIndexes()) ? 0 : removedIndexes().size();
-
-        int size = (int)(tail() - head() - rmvSize);
-
-        assert size >= 0 : size;
-
-        return size;
-    }
-
-    /**
-     * @return Indexes of removed items.
-     */
-    @Nullable public Set<Long> removedIndexes() {
-        return rmvIdxs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, id);
-        out.writeInt(cap);
-        out.writeBoolean(collocated);
-        out.writeLong(head);
-        out.writeLong(tail);
-        out.writeBoolean(rmvIdxs != null);
-
-        if (rmvIdxs != null) {
-            out.writeInt(rmvIdxs.size());
-
-            for (Long idx : rmvIdxs)
-                out.writeLong(idx);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        id = U.readGridUuid(in);
-        cap = in.readInt();
-        collocated = in.readBoolean();
-        head = in.readLong();
-        tail = in.readLong();
-
-        if (in.readBoolean()) {
-            int size = in.readInt();
-
-            rmvIdxs = new HashSet<>();
-
-            for (int i = 0; i < size; i++)
-                rmvIdxs.add(in.readLong());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheQueueHeader.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeaderKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeaderKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeaderKey.java
deleted file mode 100644
index 74fac46..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueHeaderKey.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Queue header key.
- */
-public class GridCacheQueueHeaderKey implements Externalizable, GridCacheInternal {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private String name;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheQueueHeaderKey() {
-        // No-op.
-    }
-
-    /**
-     * @param name Queue name.
-     */
-    public GridCacheQueueHeaderKey(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Queue name.
-     */
-    public String queueName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        name = U.readString(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheQueueHeaderKey queueKey = (GridCacheQueueHeaderKey)o;
-
-        return name.equals(queueKey.name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return name.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheQueueHeaderKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueItemKey.java
deleted file mode 100644
index a23228d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueItemKey.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Queue item key.
- */
-class GridCacheQueueItemKey implements Externalizable, GridCacheInternal {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteUuid queueId;
-
-    /** */
-    private String queueName;
-
-    /** */
-    private long idx;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheQueueItemKey() {
-        // No-op.
-    }
-
-    /**
-     * @param queueId Queue unique ID.
-     * @param queueName Queue name.
-     * @param idx Item index.
-     */
-    GridCacheQueueItemKey(IgniteUuid queueId, String queueName, long idx) {
-        this.queueId = queueId;
-        this.queueName = queueName;
-        this.idx = idx;
-    }
-
-    /**
-     * @return Item index.
-     */
-    public Long index() {
-        return idx;
-    }
-
-    /**
-     * @return Queue UUID.
-     */
-    public IgniteUuid queueId() {
-        return queueId;
-    }
-
-    /**
-     * @return Queue name.
-     */
-    public String queueName() {
-        return queueName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, queueId);
-        U.writeString(out, queueName);
-        out.writeLong(idx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        queueId = U.readGridUuid(in);
-        queueName = U.readString(in);
-        idx = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheQueueItemKey itemKey = (GridCacheQueueItemKey)o;
-
-        return idx == itemKey.idx && queueId.equals(itemKey.queueId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int result = queueId.hashCode();
-
-        result = 31 * result + (int)(idx ^ (idx >>> 32));
-
-        return result;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheQueueItemKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueProxy.java
deleted file mode 100644
index 03415a7..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueProxy.java
+++ /dev/null
@@ -1,746 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.datastructures.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Cache queue proxy.
- */
-public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Deserialization stash. */
-    private static final ThreadLocal<IgniteBiTuple<GridCacheContext, String>> stash =
-        new ThreadLocal<IgniteBiTuple<GridCacheContext, String>>() {
-            @Override protected IgniteBiTuple<GridCacheContext, String> initialValue() {
-                return F.t2();
-            }
-        };
-
-    /** Delegate queue. */
-    private GridCacheQueueAdapter<T> delegate;
-
-    /** Cache context. */
-    private GridCacheContext cctx;
-
-    /** Cache gateway. */
-    private GridCacheGateway gate;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheQueueProxy() {
-        // No-op.
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param delegate Delegate queue.
-     */
-    public GridCacheQueueProxy(GridCacheContext cctx, GridCacheQueueAdapter<T> delegate) {
-        this.cctx = cctx;
-        this.delegate = delegate;
-
-        gate = cctx.gate();
-    }
-
-    /**
-     * @return Delegate queue.
-     */
-    public GridCacheQueueAdapter<T> delegate() {
-        return delegate;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean add(final T item) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.add(item);
-                    }
-                }, cctx);
-
-            return delegate.add(item);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean offer(final T item) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.offer(item);
-                    }
-                }, cctx);
-
-            return delegate.offer(item);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addAll(final Collection<? extends T> items) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.addAll(items);
-                    }
-                }, cctx);
-
-            return delegate.addAll(items);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("SuspiciousMethodCalls")
-    @Override public boolean contains(final Object item) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.contains(item);
-                    }
-                }, cctx);
-
-            return delegate.contains(item);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean containsAll(final Collection<?> items) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.containsAll(items);
-                    }
-                }, cctx);
-
-            return delegate.containsAll(items);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clear() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional()) {
-                CU.outTx(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        delegate.clear();
-
-                        return null;
-                    }
-                }, cctx);
-            }
-            else
-                delegate.clear();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("SuspiciousMethodCalls")
-    @Override public boolean remove(final Object item) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.remove(item);
-                    }
-                }, cctx);
-
-            return delegate.remove(item);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removeAll(final Collection<?> items) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.removeAll(items);
-                    }
-                }, cctx);
-
-            return delegate.removeAll(items);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isEmpty() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.isEmpty();
-                    }
-                }, cctx);
-
-            return delegate.isEmpty();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<T> iterator() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Iterator<T>>() {
-                    @Override public Iterator<T> call() throws Exception {
-                        return delegate.iterator();
-                    }
-                }, cctx);
-
-            return delegate.iterator();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object[] toArray() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Object[]>() {
-                    @Override public Object[] call() throws Exception {
-                        return delegate.toArray();
-                    }
-                }, cctx);
-
-            return delegate.toArray();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("SuspiciousToArrayCall")
-    @Override public <T1> T1[] toArray(final T1[] a) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T1[]>() {
-                    @Override public T1[] call() throws Exception {
-                        return delegate.toArray(a);
-                    }
-                }, cctx);
-
-            return delegate.toArray(a);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean retainAll(final Collection<?> items) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.retainAll(items);
-                    }
-                }, cctx);
-
-            return delegate.retainAll(items);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Integer>() {
-                    @Override public Integer call() throws Exception {
-                        return delegate.size();
-                    }
-                }, cctx);
-
-            return delegate.size();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public T poll() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.poll();
-                    }
-                }, cctx);
-
-            return delegate.poll();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public T peek() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.peek();
-                    }
-                }, cctx);
-
-            return delegate.peek();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clear(final int batchSize) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional()) {
-                CU.outTx(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        delegate.clear(batchSize);
-
-                        return null;
-                    }
-                }, cctx);
-            }
-            else
-                delegate.clear(batchSize);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int remainingCapacity() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Integer>() {
-                    @Override public Integer call() throws Exception {
-                        return delegate.remainingCapacity();
-                    }
-                }, cctx);
-
-            return delegate.remainingCapacity();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int drainTo(final Collection<? super T> c) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Integer>() {
-                    @Override public Integer call() throws Exception {
-                        return delegate.drainTo(c);
-                    }
-                }, cctx);
-
-            return delegate.drainTo(c);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int drainTo(final Collection<? super T> c, final int maxElements) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Integer>() {
-                    @Override public Integer call() throws Exception {
-                        return delegate.drainTo(c, maxElements);
-                    }
-                }, cctx);
-
-            return delegate.drainTo(c, maxElements);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public T remove() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.remove();
-                    }
-                }, cctx);
-
-            return delegate.remove();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public T element() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.element();
-                    }
-                }, cctx);
-
-            return delegate.element();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void put(final T item) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional()) {
-                CU.outTx(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        delegate.put(item);
-
-                        return null;
-                    }
-                }, cctx);
-            }
-            else
-                delegate.put(item);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean offer(final T item, final long timeout, final TimeUnit unit) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        return delegate.offer(item, timeout, unit);
-                    }
-                }, cctx);
-
-            return delegate.offer(item, timeout, unit);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public T take() {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.take();
-                    }
-                }, cctx);
-
-            return delegate.take();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public T poll(final long timeout, final TimeUnit unit) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<T>() {
-                    @Override public T call() throws Exception {
-                        return delegate.poll(timeout, unit);
-                    }
-                }, cctx);
-
-            return delegate.poll(timeout, unit);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return delegate.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int capacity() throws IgniteCheckedException {
-        return delegate.capacity();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean bounded() throws IgniteCheckedException {
-        return delegate.bounded();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean collocated() throws IgniteCheckedException {
-        return delegate.collocated();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removed() {
-        return delegate.removed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return delegate.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheQueueProxy that = (GridCacheQueueProxy)o;
-
-        return delegate.equals(that.delegate);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(cctx);
-        U.writeString(out, name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        IgniteBiTuple<GridCacheContext, String> t = stash.get();
-
-        t.set1((GridCacheContext)in.readObject());
-        t.set2(U.readString(in));
-    }
-
-    /**
-     * Reconstructs object on unmarshalling.
-     *
-     * @return Reconstructed object.
-     * @throws ObjectStreamException Thrown in case of unmarshalling error.
-     */
-    protected Object readResolve() throws ObjectStreamException {
-        try {
-            IgniteBiTuple<GridCacheContext, String> t = stash.get();
-
-            return t.get1().dataStructures().queue(t.get2(), 0, false, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
-        }
-        finally {
-            stash.remove();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return delegate.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheRemovable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheRemovable.java
deleted file mode 100644
index e43e71e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheRemovable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.jetbrains.annotations.*;
-
-/**
- * Callbacks through which a data structure is told that it was removed or became invalid.
- */
-public interface GridCacheRemovable {
-    /**
-     * Marks the data structure as removed.
-     *
-     * @return Current status.
-     */
-    boolean onRemoved();
-
-    /**
-     * @param err Error that made the data structure invalid (may be {@code null}).
-     */
-    void onInvalid(@Nullable Exception err);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeader.java
deleted file mode 100644
index 71aaf53..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Cache set header: carries the set's unique ID and its collocation flag.
- */
-public class GridCacheSetHeader implements GridCacheInternal, Externalizable {
-    /** Serialization version. */
-    private static final long serialVersionUID = 0L;
-
-    /** Set unique ID. */
-    private IgniteUuid id;
-
-    /** Collocation flag. */
-    private boolean collocated;
-
-    /**
-     * Required by {@link Externalizable}; fields are filled in by {@link #readExternal(ObjectInput)}.
-     */
-    public GridCacheSetHeader() {
-        // No-op.
-    }
-
-    /**
-     * @param id Set UUID.
-     * @param collocated Collocation flag.
-     */
-    public GridCacheSetHeader(IgniteUuid id, boolean collocated) {
-        this.id = id;
-        this.collocated = collocated;
-    }
-
-    /**
-     * @return Set unique ID.
-     */
-    public IgniteUuid id() {
-        return id;
-    }
-
-    /**
-     * @return Collocation flag.
-     */
-    public boolean collocated() {
-        return collocated;
-    }
-
-    /** {@inheritDoc} Writes the set ID, then the collocation flag. */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, id);
-        out.writeBoolean(collocated);
-    }
-
-    /** {@inheritDoc} Reads the fields in the order {@link #writeExternal(ObjectOutput)} wrote them. */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        id = U.readGridUuid(in);
-        collocated = in.readBoolean();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheSetHeader.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeaderKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeaderKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeaderKey.java
deleted file mode 100644
index eea5e42..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetHeaderKey.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Set header key: a cache key that wraps the set name. Equality and hash code are based on the name only.
- */
-public class GridCacheSetHeaderKey implements Externalizable, GridCacheInternal {
-    /** Serialization version. */
-    private static final long serialVersionUID = 0L;
-
-    /** Set name; {@code null} on an instance created by the no-arg constructor until {@code readExternal} runs. */
-    private String name;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheSetHeaderKey() {
-        // No-op.
-    }
-
-    /**
-     * @param name Set name.
-     */
-    public GridCacheSetHeaderKey(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Set name.
-     */
-    public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} Writes the set name. */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, name);
-    }
-
-    /** {@inheritDoc} Reads the set name. */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        name = U.readString(in);
-    }
-
-    /** {@inheritDoc} Same class and same name; null-safe, so two keys whose names are both {@code null} are equal. */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheSetHeaderKey setKey = (GridCacheSetHeaderKey)o;
-
-        return name == null ? setKey.name == null : name.equals(setKey.name);
-    }
-
-    /** {@inheritDoc} Hash of the name, or {@code 0} when the name is {@code null} (consistent with {@code equals}). */
-    @Override public int hashCode() {
-        return name != null ? name.hashCode() : 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheSetHeaderKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
deleted file mode 100644
index b3f8f19..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.datastructures.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
-
-/**
- * Cache set implementation: each element is kept in the cache as a {@link GridCacheSetItemKey} mapped to {@code true}.
- */
-public class GridCacheSetImpl<T> extends AbstractCollection<T> implements IgniteSet<T> {
-    /** Number of keys accumulated before a bulk put/remove is sent to the cache. */
-    private static final int BATCH_SIZE = 100;
-
-    /** Cache context. */
-    private final GridCacheContext ctx;
-
-    /** Cache holding the set items (obtained from {@code ctx.cache()}). */
-    private final GridCache<GridCacheSetItemKey, Boolean> cache;
-
-    /** Set name. */
-    private final String name;
-
-    /** Set unique ID. */
-    private final IgniteUuid id;
-
-    /** Collocation flag. */
-    private final boolean collocated;
-
-    /** Partition of the set header key; used to pick the node queried for a collocated set. */
-    private final int hdrPart;
-
-    /** Removed flag. */
-    private volatile boolean rmvd;
-
-    /**
-     * @param ctx Cache context.
-     * @param name Set name.
-     * @param hdr Set header supplying the set ID and the collocation flag.
-     */
-    @SuppressWarnings("unchecked")
-    public GridCacheSetImpl(GridCacheContext ctx, String name, GridCacheSetHeader hdr) {
-        this.ctx = ctx;
-        this.name = name;
-        id = hdr.id();
-        collocated = hdr.collocated();
-
-        cache = ctx.cache();
-
-        hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean collocated() {
-        return collocated;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removed() {
-        return rmvd;
-    }
-
-    /** {@inheritDoc} Local/replicated caches use the size of the local set data; otherwise counts from a SET query are summed. */
-    @SuppressWarnings("unchecked")
-    @Override public int size() {
-        try {
-            onAccess();
-
-            if (ctx.isLocal() || ctx.isReplicated()) {
-                GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
-
-                return set != null ? set.size() : 0;
-            }
-
-            CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
-
-            Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
-
-            qry.projection(ctx.grid().forNodes(nodes));
-
-            Iterable<Integer> col = (Iterable<Integer>) qry.execute(new SumReducer()).get();
-
-            int sum = 0;
-
-            for (Integer val : col)
-                sum += val;
-
-            return sum;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} True only when the local set data is absent or empty and {@link #size()} is zero. */
-    @SuppressWarnings("unchecked")
-    @Override public boolean isEmpty() {
-        onAccess();
-
-        GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
-
-        return (set == null || set.isEmpty()) && size() == 0;
-    }
-
-    /** {@inheritDoc} Checks whether the cache holds a value for the item key. */
-    @Override public boolean contains(Object o) {
-        onAccess();
-
-        final GridCacheSetItemKey key = itemKey(o);
-
-        return retry(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                return cache.get(key) != null;
-            }
-        });
-    }
-
-    /** {@inheritDoc} Returns the result of {@code putxIfAbsent} for the item key. */
-    @Override public boolean add(T o) {
-        onAccess();
-
-        final GridCacheSetItemKey key = itemKey(o);
-
-        return retry(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                return cache.putxIfAbsent(key, true);
-            }
-        });
-    }
-
-    /** {@inheritDoc} Returns the result of {@code removex} for the item key. */
-    @Override public boolean remove(Object o) {
-        onAccess();
-
-        final GridCacheSetItemKey key = itemKey(o);
-
-        return retry(new Callable<Boolean>() {
-            @Override public Boolean call() throws Exception {
-                return cache.removex(key);
-            }
-        });
-    }
-
-    /** {@inheritDoc} Calls {@link #contains(Object)} per element and stops at the first miss. */
-    @Override public boolean containsAll(Collection<?> c) {
-        for (Object obj : c) {
-            if (!contains(obj))
-                return false;
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} Items are added one by one until an add succeeds; the rest are then written in batches. */
-    @Override public boolean addAll(Collection<? extends T> c) {
-        onAccess();
-
-        boolean add = false;
-
-        Map<GridCacheSetItemKey, Boolean> addKeys = null;
-
-        for (T obj : c) {
-            if (add) { // Result is already known to be true: switch to batched puts.
-                if (addKeys == null)
-                    addKeys = U.newHashMap(BATCH_SIZE);
-
-                addKeys.put(itemKey(obj), true);
-
-                if (addKeys.size() == BATCH_SIZE) {
-                    retryPutAll(addKeys);
-
-                    addKeys.clear();
-                }
-            }
-            else
-                add |= add(obj);
-        }
-
-        if (!F.isEmpty(addKeys))
-            retryPutAll(addKeys);
-
-        return add;
-    }
-
-    /** {@inheritDoc} Items are removed one by one until a remove succeeds; the rest are then removed in batches. */
-    @Override public boolean removeAll(Collection<?> c) {
-        onAccess();
-
-        boolean rmv = false;
-
-        Set<GridCacheSetItemKey> rmvKeys = null;
-
-        for (Object obj : c) {
-            if (rmv) { // Result is already known to be true: switch to batched removes.
-                if (rmvKeys == null)
-                    rmvKeys = U.newHashSet(BATCH_SIZE);
-
-                rmvKeys.add(itemKey(obj));
-
-                if (rmvKeys.size() == BATCH_SIZE) {
-                    retryRemoveAll(rmvKeys);
-
-                    rmvKeys.clear();
-                }
-            }
-            else
-                rmv |= remove(obj);
-        }
-
-        if (!F.isEmpty(rmvKeys))
-            retryRemoveAll(rmvKeys);
-
-        return rmv;
-    }
-
-    /** {@inheritDoc} Iterates the set and removes, in batches, every item not contained in {@code c}. */
-    @Override public boolean retainAll(Collection<?> c) {
-        try {
-            onAccess();
-
-            try (GridCloseableIterator<T> iter = iterator0()) {
-                boolean rmv = false;
-
-                Set<GridCacheSetItemKey> rmvKeys = null;
-
-                for (T val : iter) {
-                    if (!c.contains(val)) {
-                        rmv = true;
-
-                        if (rmvKeys == null)
-                            rmvKeys = U.newHashSet(BATCH_SIZE);
-
-                        rmvKeys.add(itemKey(val));
-
-                        if (rmvKeys.size() == BATCH_SIZE) {
-                            retryRemoveAll(rmvKeys);
-
-                            rmvKeys.clear();
-                        }
-                    }
-                }
-
-                if (!F.isEmpty(rmvKeys))
-                    retryRemoveAll(rmvKeys);
-
-                return rmv;
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} Iterates the set and removes all returned items in batches. */
-    @Override public void clear() {
-        try {
-            onAccess();
-
-            try (GridCloseableIterator<T> iter = iterator0()) {
-                Collection<GridCacheSetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE);
-
-                for (T val : iter) {
-                    rmvKeys.add(itemKey(val));
-
-                    if (rmvKeys.size() == BATCH_SIZE) {
-                        retryRemoveAll(rmvKeys);
-
-                        rmvKeys.clear();
-                    }
-                }
-
-                if (!rmvKeys.isEmpty())
-                    retryRemoveAll(rmvKeys);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<T> iterator() {
-        onAccess();
-
-        return iterator0();
-    }
-
-    /** @return Closeable iterator over the set items, backed by a SET query; fails if the set has been removed. */
-    @SuppressWarnings("unchecked")
-    private GridCloseableIterator<T> iterator0() {
-        try {
-            CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
-
-            Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
-
-            qry.projection(ctx.grid().forNodes(nodes));
-
-            CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
-
-            CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it =
-                ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() {
-                    @Override protected T convert(Map.Entry<T, ?> e) {
-                        return e.getKey();
-                    }
-
-                    @Override protected void remove(T item) {
-                        GridCacheSetImpl.this.remove(item);
-                    }
-                });
-
-            if (rmvd) { // Set was marked removed meanwhile: unregister the new iterator and fail.
-                ctx.itHolder().removeIterator(it);
-
-                checkRemoved();
-            }
-
-            return it;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-    /**
-     * @param call Callable executed through {@code ctx.dataStructures().retry(...)}.
-     * @return Callable result; an {@link IgniteCheckedException} is rethrown wrapped in {@link IgniteException}.
-     */
-    private <R> R retry(Callable<R> call) {
-        try {
-            return (R)ctx.dataStructures().retry(call);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * @param keys Keys to remove from the cache with a single {@code removeAll} call.
-     */
-    private void retryRemoveAll(final Collection<GridCacheSetItemKey> keys) {
-        retry(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                cache.removeAll(keys);
-
-                return null;
-            }
-        });
-    }
-
-    /**
-     * @param keys Entries to put into the cache with a single {@code putAll} call.
-     */
-    private void retryPutAll(final Map<GridCacheSetItemKey, Boolean> keys) {
-        retry(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                cache.putAll(keys);
-
-                return null;
-            }
-        });
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @return Nodes where set data request should be sent.
-     * @throws IgniteCheckedException If all cache nodes left grid.
-     */
-    @SuppressWarnings("unchecked")
-    private Collection<ClusterNode> dataNodes(long topVer) throws IgniteCheckedException {
-        if (ctx.isLocal() || ctx.isReplicated())
-            return Collections.singleton(ctx.localNode()); // Query the local node only.
-
-        Collection<ClusterNode> nodes;
-
-        if (collocated) { // One node for the header partition: the local node if listed, else the first one.
-            List<ClusterNode> nodes0 = ctx.affinity().nodes(hdrPart, topVer);
-
-            nodes = !nodes0.isEmpty() ?
-                Collections.singleton(nodes0.contains(ctx.localNode()) ? ctx.localNode() : F.first(nodes0)) : nodes0;
-        }
-        else
-            nodes = CU.affinityNodes(ctx, topVer);
-
-        if (nodes.isEmpty())
-            throw new IgniteCheckedException("Failed to get set data, all cache nodes left grid.");
-
-        return nodes;
-    }
-
-    /**
-     * @param rmvd Removed flag; ignored once the set is already marked removed. {@code true} also clears iterator queries.
-     */
-    void removed(boolean rmvd) {
-        if (this.rmvd)
-            return;
-
-        this.rmvd = rmvd;
-
-        if (rmvd)
-            ctx.itHolder().clearQueries();
-    }
-
-    /**
-     * Throws {@link org.apache.ignite.cache.datastructures.CacheDataStructureRemovedRuntimeException} if set was removed.
-     */
-    private void checkRemoved() {
-        if (rmvd)
-            throw new CacheDataStructureRemovedRuntimeException("Set has been removed from cache: " + this);
-    }
-
-    /**
-     * Checks if set was removed and handles iterators weak reference queue.
-     */
-    private void onAccess() {
-        ctx.itHolder().checkWeakQueue();
-
-        checkRemoved();
-    }
-
-    /**
-     * @return Set ID.
-     */
-    IgniteUuid id() {
-        return id;
-    }
-
-    /**
-     * @return Cache context.
-     */
-    GridCacheContext context() {
-        return ctx;
-    }
-
-    /**
-     * @param item Set item.
-     * @return Item key: a {@link CollocatedItemKey} for a collocated set, otherwise a plain {@link GridCacheSetItemKey}.
-     */
-    private GridCacheSetItemKey itemKey(Object item) {
-        return collocated ? new CollocatedItemKey(name, id, item) : new GridCacheSetItemKey(id, item);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheSetImpl.class, this);
-    }
-
-    /**
-     * Reducer that counts the objects it collects; used by {@link GridCacheSetImpl#size()}.
-     */
-    private static class SumReducer implements IgniteReducer<Object, Integer>, Externalizable {
-        /** Serialization version. */
-        private static final long serialVersionUID = -3436987759126521204L;
-
-        /** Number of objects collected so far. */
-        private int cntr;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public SumReducer() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} Increments the counter whatever the object is and always returns {@code true}. */
-        @Override public boolean collect(@Nullable Object o) {
-            cntr++;
-
-            return true;
-        }
-
-        /** {@inheritDoc} Returns the number of collected objects. */
-        @Override public Integer reduce() {
-            return cntr;
-        }
-
-        /** {@inheritDoc} Nothing is written: the counter is not part of the serialized form. */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} Nothing is read; the counter keeps its initial value. */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            // No-op.
-        }
-    }
-
-    /**
-     * Item key for collocated set; exposes the set name as the affinity key of every item.
-     */
-    private static class CollocatedItemKey extends GridCacheSetItemKey {
-        /** Serialization version. */
-        private static final long serialVersionUID = -1400701398705953750L;
-
-        /** Set name, returned as the affinity key. */
-        private String setName;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public CollocatedItemKey() {
-            // No-op.
-        }
-
-        /**
-         * @param setName Set name.
-         * @param setId Set unique ID.
-         * @param item Set item.
-         */
-        private CollocatedItemKey(String setName, IgniteUuid setId, Object item) {
-            super(setId, item);
-
-            this.setName = setName;
-        }
-
-        /**
-         * @return Item affinity key.
-         */
-        @CacheAffinityKeyMapped
-        public Object affinityKey() {
-            return setName;
-        }
-
-        /** {@inheritDoc} Writes the superclass state, then the set name. */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            super.writeExternal(out);
-
-            U.writeString(out, setName);
-        }
-
-        /** {@inheritDoc} Reads the superclass state, then the set name. */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            super.readExternal(in);
-
-            setName = U.readString(in);
-        }
-    }
-}


Mime
View raw message