ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [3/9] incubator-ignite git commit: sp-2 streaming cleanup
Date Thu, 19 Mar 2015 16:50:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
deleted file mode 100644
index 9132972..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
+++ /dev/null
@@ -1,804 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that is bounded by size and accumulates events to batches.
- */
-public class StreamerBoundedSizeBatchWindow<E> extends StreamerWindowAdapter<E> {
-    /** Batch size. */
-    private int batchSize;
-
-    /** Maximum number of batches. */
-    private int maxBatches;
-
-    /** Reference for queue and size. */
-    private volatile QueueHolder holder;
-
-    /** Enqueue lock. */
-    private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
-    /**
-     * Gets maximum number of batches that can be stored in window.
-     *
-     * @return Maximum number of batches for window.
-     */
-    public int getMaximumBatches() {
-        return maxBatches;
-    }
-
-    /**
-     * Sets maximum number of batches that can be stored in window.
-     *
-     * @param maxBatches Maximum number of batches for window.
-     */
-    public void setMaximumBatches(int maxBatches) {
-        this.maxBatches = maxBatches;
-    }
-
-    /**
-     * Gets batch size.
-     *
-     * @return Batch size.
-     */
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    /**
-     * Sets batch size.
-     *
-     * @param batchSize Batch size.
-     */
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkConfiguration() {
-        if (batchSize <= 0)
-            throw new IgniteException("Failed to initialize window (batchSize size must be positive) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maximumBatches=" + maxBatches +
-                ", batchSize=" + batchSize + ']');
-
-        if (maxBatches < 0)
-            throw new IgniteException("Failed to initialize window (maximumBatches cannot be negative) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maximumBatches=" + maxBatches +
-                ", batchSize=" + batchSize + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void reset0() {
-        ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
-
-        Batch b = new Batch(batchSize);
-
-        ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
-
-        b.node(n);
-
-        holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger());
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return holder.totalQueueSize().get();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        final QueueHolder win = holder;
-
-        final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            /** Current batch iterator. */
-            private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
-            /** Next batch iterator. Will be null if no more batches available. */
-            private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
-            /** Last returned value. */
-            private E lastRet;
-
-            {
-                curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
-            }
-
-            /** {@inheritDoc} */
-            @SuppressWarnings("SimplifiableIfStatement")
-            @Override public boolean hasNext() {
-                if (curBatchIt != null) {
-                    if (curBatchIt.hasNext())
-                        return true;
-
-                    return nextBatchIt != null && nextBatchIt.hasNext();
-                }
-                else
-                    return false;
-            }
-
-            /** {@inheritDoc} */
-            @Override public E next() {
-                if (curBatchIt == null)
-                    throw new NoSuchElementException();
-
-                if (!curBatchIt.hasNext()) {
-                    if (nextBatchIt != null) {
-                        curBatchIt = nextBatchIt;
-
-                        nextBatchIt = null;
-
-                        lastRet = curBatchIt.next();
-                    }
-                    else
-                        throw new NoSuchElementException();
-                }
-                else {
-                    E next = curBatchIt.next();
-
-                    // Moved to last element in batch - check for next iterator.
-                    if (!curBatchIt.hasNext())
-                        advanceBatch();
-
-                    lastRet = next;
-                }
-
-                return lastRet;
-            }
-
-            /** {@inheritDoc} */
-            @Nullable @Override public E removex() {
-                if (curBatchIt == null)
-                    throw new NoSuchElementException();
-
-                if (curBatchIt.removex()) {
-                    // Decrement global size if deleted.
-                    win.totalQueueSize().decrementAndGet();
-
-                    return lastRet;
-                }
-                else
-                    return null;
-            }
-
-            /**
-             * Moves to the next batch.
-             */
-            private void advanceBatch() {
-                if (batchIt.hasNext()) {
-                    Batch batch = batchIt.next();
-
-                    nextBatchIt = batch.iterator();
-                }
-                else
-                    nextBatchIt = null;
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        QueueHolder win = holder;
-
-        int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches);
-
-        Iterator<Batch> it = win.batchQueue().iterator();
-
-        int size = 0;
-
-        int idx = 0;
-
-        while (it.hasNext()) {
-            Batch batch = it.next();
-
-            if (idx++ < oversizeCnt)
-                size += batch.size();
-        }
-
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean enqueue0(E evt) {
-        try {
-            return enqueueInternal(evt);
-        }
-        catch (IgniteInterruptedCheckedException ignored) {
-            return false;
-        }
-    }
-
-    /**
-     * Enqueue event to window.
-     *
-     * @param evt Event to add.
-     * @return {@code True} if event was added.
-     *
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread was interrupted.
-     */
-    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    private boolean enqueueInternal(E evt) throws IgniteInterruptedCheckedException {
-        QueueHolder tup = holder;
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        while (true) {
-            Batch last = evts.peekLast();
-
-            if (last == null || !last.add(evt)) {
-                // This call will ensure that last object is actually added to batch
-                // before we add new batch to events queue.
-                // If exception is thrown here, window will be left in consistent state.
-                if (last != null)
-                    last.finish();
-
-                // Add new batch to queue in write lock.
-                if (enqueueLock.writeLock().tryLock()) {
-                    try {
-                        Batch first0 = evts.peekLast();
-
-                        if (first0 == last) {
-                            Batch batch = new Batch(batchSize);
-
-                            ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
-                            batch.node(node);
-
-                            size.incrementAndGet();
-
-                            if (batch.removed() && evts.unlinkx(node))
-                                size.decrementAndGet();
-                        }
-                    }
-                    finally {
-                        enqueueLock.writeLock().unlock();
-                    }
-                }
-                else {
-                    // Acquire read lock to wait for batch enqueue.
-                    enqueueLock.readLock().lock();
-
-                    enqueueLock.readLock().unlock();
-                }
-            }
-            else {
-                // Event was added, global size increment.
-                tup.totalQueueSize().incrementAndGet();
-
-                return true;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        QueueHolder tup = holder;
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        Collection<E> res = new ArrayList<>(cnt);
-
-        while (true) {
-            int curSize = size.get();
-
-            if (curSize > maxBatches) {
-                // Just peek the first batch.
-                Batch first = evts.peekFirst();
-
-                if (first != null) {
-                    assert first.finished();
-
-                    Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
-                    if (!polled.isEmpty())
-                        res.addAll(polled);
-
-                    if (first.isEmpty()) {
-                        ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
-                        first.markRemoved();
-
-                        if (node != null && evts.unlinkx(node))
-                            size.decrementAndGet();
-                    }
-
-                    if (res.size() == cnt)
-                        break;
-                }
-                else
-                    break;
-            }
-            else
-                break;
-        }
-
-        // Removed entries, update global size.
-        tup.totalQueueSize().addAndGet(-res.size());
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        QueueHolder tup = holder;
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        while (true) {
-            int curSize = size.get();
-
-            if (curSize > maxBatches) {
-                if (size.compareAndSet(curSize, curSize - 1)) {
-                    Batch polled = evts.poll();
-
-                    if (polled != null) {
-                        assert polled.finished();
-
-                        // Mark batch deleted for consistency.
-                        polled.markRemoved();
-
-                        Collection<E> polled0 = polled.shrink();
-
-                        // Result of shrink is empty, must retry the poll.
-                        if (!polled0.isEmpty()) {
-                            // Update global size.
-                            tup.totalQueueSize().addAndGet(-polled0.size());
-
-                            return polled0;
-                        }
-                    }
-                    else {
-                        // Poll returned null, so we must restore counter and return.
-                        size.incrementAndGet();
-
-                        return Collections.emptyList();
-                    }
-                }
-            }
-            else
-                return Collections.emptyList();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        QueueHolder tup = holder;
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        Collection<E> res = new ArrayList<>(cnt);
-
-        while (true) {
-            // Just peek the first batch.
-            Batch first = evts.peekFirst();
-
-            if (first != null) {
-                Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
-                // We must check for finished before unlink as no elements
-                // can be added to batch after it is finished.
-                if (first.isEmpty() && first.emptyFinished()) {
-                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
-                    first.markRemoved();
-
-                    if (node != null && evts.unlinkx(node))
-                        size.decrementAndGet();
-
-                    assert first.isEmpty();
-                }
-                else if (polled.isEmpty())
-                    break;
-
-                res.addAll(polled);
-
-                if (res.size() == cnt)
-                    break;
-            }
-            else
-                break;
-        }
-
-        // Update global size.
-        tup.totalQueueSize().addAndGet(-res.size());
-
-        return res;
-    }
-
-    /**
-     * Consistency check, used for testing.
-     */
-    void consistencyCheck() {
-        QueueHolder win = holder;
-
-        Iterator<E> it = iterator();
-
-        int cnt = 0;
-
-        while (it.hasNext()) {
-            it.next();
-
-            cnt++;
-        }
-
-        int cnt0 = 0;
-
-        for (Batch batch : win.batchQueue())
-            cnt0 += batch.size();
-
-        int sz = size();
-
-        assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
-        assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
-        assert win.batchQueue().size() == win.batchQueueSize().get();
-    }
-
-    /**
-     * Window structure.
-     */
-    private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public QueueHolder() {
-            // No-op.
-        }
-
-        /**
-         * @param batchQueue Batch queue.
-         * @param batchQueueSize Batch queue size counter.
-         * @param globalSize Global size counter.
-         */
-        private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
-            AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
-            super(batchQueue, batchQueueSize, globalSize);
-
-            assert batchQueue.size() == 1;
-            assert batchQueueSize.get() == 1;
-        }
-
-        /**
-         * @return Events queue.
-         */
-        @SuppressWarnings("ConstantConditions")
-        public ConcurrentLinkedDeque8<Batch> batchQueue() {
-            return get1();
-        }
-
-        /**
-         * @return Batch queue size.
-         */
-        @SuppressWarnings("ConstantConditions")
-        public AtomicInteger batchQueueSize() {
-            return get2();
-        }
-
-        /**
-         * @return Global queue size.
-         */
-        @SuppressWarnings("ConstantConditions")
-        public AtomicInteger totalQueueSize() {
-            return get3();
-        }
-    }
-
-    /**
-     * Batch.
-     */
-    private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Batch events. */
-        private ConcurrentLinkedDeque8<E> evts;
-
-        /** Capacity. */
-        private AtomicInteger cap;
-
-        /** Finished. */
-        private volatile boolean finished;
-
-        /** Queue node. */
-        @GridToStringExclude
-        private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
-        /** Node removed flag. */
-        private volatile boolean rmvd;
-
-        /**
-         * @param batchSize Batch size.
-         */
-        private Batch(int batchSize) {
-            cap = new AtomicInteger(batchSize);
-
-            evts = new ConcurrentLinkedDeque8<>();
-        }
-
-        /**
-         * @return {@code True} if batch is removed.
-         */
-        public boolean removed() {
-            return rmvd;
-        }
-
-        /**
-         * Marks batch as removed.
-         */
-        public void markRemoved() {
-            rmvd = true;
-        }
-
-        /**
-         * Adds event to batch.
-         *
-         * @param evt Event to add.
-         * @return {@code True} if event was added, {@code false} if batch is full.
-         */
-        public boolean add(E evt) {
-            readLock().lock();
-
-            try {
-                if (finished)
-                    return false;
-
-                while (true) {
-                    int size = cap.get();
-
-                    if (size > 0) {
-                        if (cap.compareAndSet(size, size - 1)) {
-                            evts.add(evt);
-
-                            // Will go through write lock and finish batch.
-                            if (size == 1)
-                                finished = true;
-
-                            return true;
-                        }
-                    }
-                    else
-                        return false;
-                }
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * @return Queue node.
-         */
-        public ConcurrentLinkedDeque8.Node<Batch> node() {
-            return qNode;
-        }
-
-        /**
-         * @param qNode Queue node.
-         */
-        public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
-            this.qNode = qNode;
-        }
-
-        /**
-         * Passes through the write lock, waiting for concurrent {@code add} calls (which hold the read lock).
-         *
-         * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If wait was interrupted.
-         */
-        public void finish() throws IgniteInterruptedCheckedException {
-            writeLock().lock();
-
-            try {
-                // Safety.
-                assert cap.get() == 0;
-                assert finished;
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /**
-         * @return {@code True} if batch is finished and no more events will be added to it.
-         */
-        public boolean finished() {
-            readLock().lock();
-
-            try {
-                return finished;
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Gets batch size.
-         *
-         * @return Batch size.
-         */
-        public int size() {
-            readLock().lock();
-
-            try {
-                return evts == null ? 0 : evts.sizex();
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * @return {@code True} if batch is empty.
-         */
-        public boolean isEmpty() {
-            readLock().lock();
-
-            try {
-                return evts == null || evts.isEmpty();
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
-         * be added to batch and it can be safely unlinked from the queue.
-         *
-         * @return {@code True} if batch is empty and finished.
-         */
-        public boolean emptyFinished() {
-            writeLock().lock();
-
-            try {
-                return finished && (evts == null || evts.isEmpty());
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
-            readLock().lock();
-
-            try {
-                if (evts != null)
-                    return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
-                return new ConcurrentLinkedDeque8.IteratorEx<E>() {
-                    @Override public boolean removex() {
-                        throw new NoSuchElementException();
-                    }
-
-                    @Override public boolean hasNext() {
-                        return false;
-                    }
-
-                    @Override public E next() {
-                        throw new NoSuchElementException();
-                    }
-
-                    @Override public void remove() {
-                        throw new NoSuchElementException();
-                    }
-                };
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Polls up to {@code cnt} objects from batch in concurrent fashion.
-         *
-         * @param cnt Number of objects to poll.
-         * @return Collection of polled elements or empty collection if nothing to poll.
-         */
-        public Collection<E> pollNonBatch(int cnt) {
-            readLock().lock();
-
-            try {
-                if (evts == null)
-                    return Collections.emptyList();
-
-                Collection<E> res = new ArrayList<>(cnt);
-
-                for (int i = 0; i < cnt; i++) {
-                    E evt = evts.poll();
-
-                    if (evt != null)
-                        res.add(evt);
-                    else
-                        return res;
-                }
-
-                return res;
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Shrinks this batch. No events can be polled from it after this method.
-         *
-         * @return Collection of events contained in batch before shrink (empty collection in
-         *         case no events were present).
-         */
-        public Collection<E> shrink() {
-            writeLock().lock();
-
-            try {
-                if (evts == null)
-                    return Collections.emptyList();
-
-                // Since iterator can concurrently delete elements, we must poll here.
-                Collection<E> res = new ArrayList<>(evts.sizex());
-
-                E o;
-
-                while ((o = evts.poll()) != null)
-                    res.add(o);
-
-                // Nothing can be polled after shrink.
-                evts = null;
-
-                return res;
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            ConcurrentLinkedDeque8<E> evts0 = evts;
-
-            return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
deleted file mode 100644
index f2bfcf3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Size-bounded sorted window. Unlike {@link StreamerBoundedSizeWindow}, which limits
- * window only on size, this window also provides events in sorted order.
- */
-public class StreamerBoundedSizeSortedWindow<E>
-    extends StreamerBoundedSizeWindowAdapter<E, StreamerBoundedSizeSortedWindow.Holder<E>> {
-    /** Comparator. */
-    private Comparator<E> comp;
-
-    /** Order counter. */
-    private AtomicLong orderCnt = new AtomicLong();
-
-    /**
-     * Gets event comparator.
-     *
-     * @return Event comparator.
-     */
-    public Comparator<E> getComparator() {
-        return comp;
-    }
-
-    /**
-     * Sets event comparator.
-     *
-     * @param comp Comparator.
-     */
-    public void setComparator(Comparator<E> comp) {
-        this.comp = comp;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected Collection<Holder<E>> newCollection() {
-        final Comparator<E> comp0 = comp;
-
-        Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
-            @Override public int compare(Holder<E> h1, Holder<E> h2) {
-                if (h1 == h2)
-                    return 0;
-
-                int diff = comp0 == null ?
-                    ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val);
-
-                if (diff != 0)
-                    return diff;
-                else {
-                    assert h1.order != h2.order;
-
-                    return h1.order < h2.order ? -1 : 1;
-                }
-            }
-        });
-
-        return (Collection)col;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) {
-        if (comp == null) {
-            if (!(evt instanceof Comparable))
-                throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " +
-                    "comparator is specified: " + evt);
-        }
-
-        if (set != null) {
-            if (set.add(evt)) {
-                col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
-                return true;
-            }
-
-            return false;
-        }
-        else {
-            col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
-            return true;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) {
-        int cnt = 0;
-
-        for (E evt : evts) {
-            if (addInternal(evt, col, set))
-                cnt++;
-        }
-
-        return cnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) {
-        Holder<E> h = (Holder<E>)((NavigableSet<E>)col).pollLast();
-
-        if (set != null && h != null)
-            set.remove(h.val);
-
-        return h == null ? null : h.val;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col,
-        final Set<E> set, final AtomicInteger size) {
-        final Iterator<Holder<E>> it = col.iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            private Holder<E> lastRet;
-
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public E next() {
-                lastRet = it.next();
-
-                return lastRet.val;
-            }
-
-            @Override public E removex() {
-                if (lastRet == null)
-                    throw new IllegalStateException();
-
-                if (col.remove(lastRet)) {
-                    if (set != null)
-                        set.remove(lastRet.val);
-
-                    size.decrementAndGet();
-
-                    return lastRet.val;
-                }
-                else
-                    return null;
-            }
-        };
-    }
-
-    /**
-     * Value wrapper.
-     */
-    @SuppressWarnings("PackageVisibleInnerClass")
-    static class Holder<E> {
-        /** Value. */
-        private E val;
-
-        /** Order to distinguish between objects for which comparator returns 0. */
-        private long order;
-
-        /**
-         * @param val Value to hold.
-         * @param order Adding order.
-         */
-        private Holder(E val, long order) {
-            this.val = val;
-            this.order = order;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (this == obj)
-                return false;
-
-            if (!(obj instanceof Holder))
-                return false;
-
-            Holder h = (Holder)obj;
-
-            return F.eq(val, h.val) && order == h.order;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) {
-        assert col.size() == size.get();
-
-        if (set != null) {
-            // Check no duplicates in collection.
-
-            Collection<Object> vals = new HashSet<>();
-
-            for (Object evt : col)
-                assert vals.add(((Holder)evt).val);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
deleted file mode 100644
index 16033eb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.internal.processors.streamer.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Queue window bounded by the number of elements in the queue. After adding elements to this window the
- * caller must check for evicted events.
- * <p>
- * It is guaranteed that window size will never get less than maximum size when polling from this window
- * concurrently from different threads.
- *
- * @param <E> Event type.
- */
-public class StreamerBoundedSizeWindow<E> extends StreamerBoundedSizeWindowAdapter<E, E> {
-    /** {@inheritDoc} */
-    @Override protected Collection<E> newCollection() {
-        Collection<E> deque = new ConcurrentLinkedDeque8<E>();
-
-        return deque;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set,
-        final AtomicInteger size) {
-        final ConcurrentLinkedDeque8.IteratorEx<E> delegate =
-            (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            /** Element handed out by the most recent {@code next()} call. */
-            private E cur;
-
-            @Override public boolean hasNext() {
-                return delegate.hasNext();
-            }
-
-            @Override public E next() {
-                cur = delegate.next();
-
-                return cur;
-            }
-
-            @Override public E removex() {
-                // Underlying iterator did not remove anything - nothing to account for.
-                if (!delegate.removex())
-                    return null;
-
-                if (set != null)
-                    set.remove(cur);
-
-                size.decrementAndGet();
-
-                return cur;
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("IfMayBeConditional")
-    @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) {
-        assert col instanceof ConcurrentLinkedDeque8;
-
-        // Unique window: the set decides whether the event is new.
-        if (set != null && !set.add(evt))
-            return false;
-
-        col.add(evt);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) {
-        assert col instanceof ConcurrentLinkedDeque8;
-
-        // Non-unique window takes the whole batch as is.
-        if (set == null) {
-            col.addAll(evts);
-
-            return evts.size();
-        }
-
-        int added = 0;
-
-        for (E evt : evts) {
-            if (!set.add(evt))
-                continue;
-
-            col.add(evt);
-
-            added++;
-        }
-
-        return added;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) {
-        assert col instanceof ConcurrentLinkedDeque8;
-
-        E head = ((Queue<E>)col).poll();
-
-        // Keep the uniqueness set in sync with the queue.
-        if (head != null && set != null)
-            set.remove(head);
-
-        return head;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) {
-        assert col.size() == size.get();
-
-        // Duplicates are only forbidden for unique windows.
-        if (set == null)
-            return;
-
-        Collection<Object> seen = new HashSet<>();
-
-        for (Object evt : col)
-            assert seen.add(evt);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
deleted file mode 100644
index ff54ad0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Abstract non-public class for size-bound windows. Support reset.
- * <p>
- * The whole window state (backing collection, optional uniqueness set and element counter) is kept in
- * a single {@code WindowHolder} published through an {@link AtomicReference}; {@link #reset0()} swaps
- * in a fresh holder. Subclasses supply the concrete collection and the add / poll / iterate primitives.
- *
- * @param <E> Event type accepted and returned by the window.
- * @param <T> Element type stored in the backing collection.
- */
-abstract class StreamerBoundedSizeWindowAdapter<E, T> extends StreamerWindowAdapter<E> {
-    /** Reference to the current window state. Holds {@code null} until {@link #reset0()} is called. */
-    private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
-    /** If true, only unique elements will be accepted. */
-    private boolean unique;
-
-    /** Window maximum size. */
-    protected int maxSize;
-
-    /**
-     * Gets window maximum size.
-     *
-     * @return Maximum size.
-     */
-    public int getMaximumSize() {
-        return maxSize;
-    }
-
-    /**
-     * Sets window maximum size.
-     *
-     * @param maxSize Maximum size.
-     */
-    public void setMaximumSize(int maxSize) {
-        this.maxSize = maxSize;
-    }
-
-    /**
-     * @return True if only unique elements will be accepted.
-     */
-    public boolean isUnique() {
-        return unique;
-    }
-
-    /**
-     * @param unique If true, only unique elements will be accepted.
-     */
-    public void setUnique(boolean unique) {
-        this.unique = unique;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Only a negative maximum size is rejected; zero passes this check.
-     *
-     * @throws IgniteException If the configured maximum size is negative.
-     */
-    @Override public void checkConfiguration() {
-        if (maxSize < 0)
-            throw new IgniteException("Failed to initialize window (maximumSize cannot be negative) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maxSize=" + maxSize +
-                ", unique=" + unique + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0() {
-        // No-op.
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the element counter of the current holder and clamps negative values to zero. The counter
-     * is maintained separately from the backing collection.
-     */
-    @Override public int size() {
-        int size = ref.get().size().get();
-
-        return size > 0 ? size : 0;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Computed as the number of elements by which {@link #size()} exceeds the maximum size, never negative.
-     */
-    @Override public int evictionQueueSize() {
-        int evictSize = size() - maxSize;
-
-        return evictSize > 0 ? evictSize : 0;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Always returns {@code true}, including when {@code addInternal(...)} did not accept the event.
-     */
-    @Override protected boolean enqueue0(E evt) {
-        add(evt);
-
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Polls evicted elements one by one and stops early as soon as no further element can be evicted,
-     * so the result may hold fewer than {@code cnt} elements.
-     */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        Collection<E> res = new ArrayList<>(cnt);
-
-        for (int i = 0; i < cnt; i++) {
-            E evicted = pollEvictedInternal();
-
-            if (evicted == null)
-                return res;
-
-            res.add(evicted);
-        }
-
-        return res;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * For this window a batch is a single element: the result is either empty or a singleton.
-     */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        E res = pollEvictedInternal();
-
-        if (res == null)
-            return Collections.emptyList();
-
-        return Collections.singleton(res);
-    }
-
-    /**
-     * Poll evicted internal implementation.
-     * <p>
-     * While the counter is above the maximum size, first decrements the counter with a compare-and-set
-     * and only then polls the collection. If the poll yields nothing the counter is restored.
-     *
-     * @return Evicted element, or {@code null} if the window is not over its maximum size or the poll
-     *      found no element.
-     */
-    @Nullable private E pollEvictedInternal() {
-        WindowHolder tup = ref.get();
-
-        AtomicInteger size = tup.size();
-
-        while (true) {
-            int curSize = size.get();
-
-            if (curSize > maxSize) {
-                if (size.compareAndSet(curSize, curSize - 1)) { // CAS failure falls through and re-reads the counter.
-                    E evt = pollInternal(tup.collection(), tup.set());
-
-                    if (evt != null)
-                        return evt;
-                    else {
-                        // No actual events in queue, it means that other thread is just adding event.
-                        // return null as it is a concurrent add call.
-                        size.incrementAndGet();
-
-                        return null;
-                    }
-                }
-            }
-            else
-                return null;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Removes elements from the head of the window regardless of the maximum size, using the same
-     * decrement-then-poll protocol as {@code pollEvictedInternal()}.
-     * <p>
-     * NOTE(review): the {@code cnt} limit is tested only after an element has been added to the result,
-     * so a call with {@code cnt == 0} on a non-empty window still removes one element - confirm that
-     * callers never pass zero.
-     */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        WindowHolder tup = ref.get();
-
-        AtomicInteger size = tup.size();
-        Collection<T> evts = tup.collection();
-
-        Collection<E> resCol = new ArrayList<>(cnt);
-
-        while (true) {
-            int curSize = size.get();
-
-            if (curSize > 0) {
-                if (size.compareAndSet(curSize, curSize - 1)) { // CAS failure falls through and re-reads the counter.
-                    E res = pollInternal(evts, tup.set());
-
-                    if (res != null) {
-                        resCol.add(res);
-
-                        if (resCol.size() >= cnt)
-                            return resCol;
-                    }
-                    else {
-                        size.incrementAndGet(); // Nothing was polled: give the reserved slot back.
-
-                        return resCol;
-                    }
-                }
-            }
-            else
-                return resCol;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        WindowHolder win = ref.get();
-
-        return iteratorInternal(win.collection(), win.set(), win.size());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Publishes a new holder with a fresh collection, a zero counter and, only when the window is
-     * configured as unique, a fresh uniqueness set.
-     */
-    @Override protected void reset0() {
-        ref.set(new WindowHolder(newCollection(),
-            unique ? new GridConcurrentHashSet<E>() : null,
-            new AtomicInteger()));
-    }
-
-    /**
-     * Adds a single event and increments the counter if the implementation accepted it.
-     *
-     * @param evt Event to add.
-     */
-    private void add(E evt) {
-        WindowHolder tup = ref.get();
-
-        if (addInternal(evt, tup.collection(), tup.set()))
-            tup.size().incrementAndGet(); // Counter is bumped only after the element is in the collection.
-    }
-
-    /**
-     * Adds several events and increases the counter by the number the implementation reports as added.
-     * Not called from anywhere else in this class.
-     *
-     * @param evts Events to add.
-     */
-    private void addAll(Collection<E> evts) {
-        WindowHolder tup = ref.get();
-
-        int cnt = addAllInternal(evts, tup.collection(), tup.set());
-
-        tup.size().addAndGet(cnt);
-    }
-
-    /**
-     * Checks window consistency. Used for testing.
-     */
-    void consistencyCheck() {
-        WindowHolder win = ref.get();
-
-        consistencyCheck(win.collection(), win.set(), win.size());
-    }
-
-    /**
-     * Get underlying collection.
-     *
-     * @return Collection of the current window holder.
-     */
-    @SuppressWarnings("ConstantConditions")
-    protected Collection<T> collection() {
-        return ref.get().get1();
-    }
-
-    /**
-     * Creates new collection specific for window implementation. This collection will be subsequently passed
-     * to addInternal(...) and pollInternal() methods.
-     *
-     * @return Collection - holder.
-     */
-    protected abstract Collection<T> newCollection();
-
-    /**
-     * Adds event to queue implementation.
-     *
-     * @param evt Event to add.
-     * @param col Collection to add to.
-     * @param set Set to check, {@code null} if the window is not unique.
-     * @return {@code True} if event was added.
-     */
-    protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);
-
-    /**
-     * Adds all events to queue implementation.
-     *
-     * @param evts Events to add.
-     * @param col Collection to add to.
-     * @param set Set to check, {@code null} if the window is not unique.
-     * @return Added events number.
-     */
-    protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);
-
-    /**
-     * Polls one event from the queue implementation.
-     *
-     * @param col Collection to poll from.
-     * @param set Set to check, {@code null} if the window is not unique.
-     * @return Polled object, or {@code null} if nothing could be polled.
-     */
-    @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);
-
-    /**
-     * Creates iterator based on implementation collection type.
-     *
-     * @param col Collection.
-     * @param set Set to check, {@code null} if the window is not unique.
-     * @param size Size counter of the window being iterated.
-     * @return Iterator.
-     */
-    protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
-        AtomicInteger size);
-
-    /**
-     * Checks consistency. Used in tests.
-     *
-     * @param col Collection.
-     * @param set Set if unique.
-     * @param size Size holder.
-     */
-    protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);
-
-    /**
-     * Window holder: a tuple of the backing collection, the optional uniqueness set and the size counter.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public WindowHolder() {
-            // No-op.
-        }
-
-        /**
-         * @param col Collection.
-         * @param set Set if unique.
-         * @param size Window size counter.
-         */
-        WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
-            super(col, set, size);
-        }
-
-        /**
-         * @return Collection (first tuple component).
-         */
-        public Collection<T> collection() {
-            return get1();
-        }
-
-        /**
-         * @return Set (second tuple component).
-         */
-        public Set<E> set() {
-            return get2();
-        }
-
-        /**
-         * @return Size (third tuple component).
-         */
-        public AtomicInteger size() {
-            return get3();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
deleted file mode 100644
index a2aae67..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
+++ /dev/null
@@ -1,906 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
- */
-public class StreamerBoundedTimeBatchWindow<E> extends StreamerWindowAdapter<E> {
-    /** Batch size. */
-    private int batchSize;
-
-    /** Maximum batches. */
-    private int maxBatches;
-
-    /** */
-    private long batchTimeInterval;
-
-    /** Atomic reference for queue and size. */
-    private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
-    /** Enqueue lock. */
-    private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
-    /**
-     * Returns the configured limit on the number of batches kept in the window.
-     *
-     * @return Maximum number of batches for window.
-     */
-    public int getMaximumBatches() {
-        return this.maxBatches;
-    }
-
-    /**
-     * Configures the limit on the number of batches kept in the window.
-     *
-     * @param maxBatches Maximum number of batches for window.
-     */
-    public void setMaximumBatches(int maxBatches) {
-        this.maxBatches = maxBatches;
-    }
-
-    /**
-     * Returns the configured batch size.
-     *
-     * @return Batch size.
-     */
-    public int getBatchSize() {
-        return this.batchSize;
-    }
-
-    /**
-     * Configures the batch size.
-     *
-     * @param batchSize Batch size.
-     */
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    /**
-     * Returns the configured batch time interval.
-     *
-     * @return Batch time interval.
-     */
-    public long getBatchTimeInterval() {
-        return this.batchTimeInterval;
-    }
-
-    /**
-     * Configures the batch time interval.
-     *
-     * @param batchTimeInterval Batch time interval.
-     */
-    public void setBatchTimeInterval(long batchTimeInterval) {
-        this.batchTimeInterval = batchTimeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkConfiguration() {
-        if (maxBatches < 0)
-            throw new IgniteException("Failed to initialize window (maximumBatches cannot be negative) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maximumBatches=" + maxBatches +
-                ", batchSize=" + batchSize +
-                ", batchTimeInterval=" + batchTimeInterval + ']');
-
-        if (batchSize < 0)
-            throw new IgniteException("Failed to initialize window (batchSize cannot be negative) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maximumBatches=" + maxBatches +
-                ", batchSize=" + batchSize +
-                ", batchTimeInterval=" + batchTimeInterval + ']');
-        else if (batchSize == 0)
-            batchSize = Integer.MAX_VALUE;
-
-        if (batchTimeInterval <= 0)
-            throw new IgniteException("Failed to initialize window (batchTimeInterval must be positive) " +
-                "[windowClass=" + getClass().getSimpleName() +
-                ", maximumBatches=" + maxBatches +
-                ", batchSize=" + batchSize +
-                ", batchTimeInterval=" + batchTimeInterval + ']');
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This window does nothing on stop.
-     */
-    @Override protected void stop0() {
-        // Intentionally empty.
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Publishes a new window state whose batch queue holds one freshly created batch; the batch counter
-     * starts at one and the total element counter at zero.
-     */
-    @Override protected void reset0() {
-        Batch initial = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);
-
-        ConcurrentLinkedDeque8<Batch> batches = new ConcurrentLinkedDeque8<>();
-
-        // The batch remembers its own queue node.
-        initial.node(batches.offerLastx(initial));
-
-        AtomicInteger batchCnt = new AtomicInteger(1);
-        AtomicInteger totalCnt = new AtomicInteger();
-
-        ref.set(new WindowHolder(batches, batchCnt, totalCnt));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return ref.get().totalQueueSize().get();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The returned iterator walks the batches of the current window state in queue order and, inside
-     * each batch, the batch's own iterator. It looks one batch ahead: as soon as the current batch is
-     * exhausted the next non-empty batch (if any) is located, so that {@code hasNext()} can answer
-     * without consuming anything and {@code removex()} still targets the batch of the last returned element.
-     */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        final WindowHolder win = ref.get();
-
-        final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            /** Current batch iterator. */
-            private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
-            /** Next batch iterator. Will be null if no more non-empty batches available. */
-            private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
-            /** Last returned value. */
-            private E lastRet;
-
-            {
-                curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
-
-                // First batch may be empty while later ones are not - look ahead right away.
-                if (curBatchIt != null && !curBatchIt.hasNext())
-                    advanceBatch();
-            }
-
-            /** {@inheritDoc} */
-            @SuppressWarnings("SimplifiableIfStatement")
-            @Override public boolean hasNext() {
-                if (curBatchIt != null) {
-                    if (curBatchIt.hasNext())
-                        return true;
-
-                    return nextBatchIt != null && nextBatchIt.hasNext();
-                }
-                else
-                    return false;
-            }
-
-            /** {@inheritDoc} */
-            @Override public E next() {
-                if (curBatchIt == null)
-                    throw new NoSuchElementException();
-
-                if (!curBatchIt.hasNext()) {
-                    if (nextBatchIt == null)
-                        throw new NoSuchElementException();
-
-                    // Current batch is exhausted - switch to the batch found by look-ahead.
-                    curBatchIt = nextBatchIt;
-
-                    nextBatchIt = null;
-                }
-
-                lastRet = curBatchIt.next();
-
-                // Moved to last element in batch - check for next iterator. This must happen on every
-                // path, otherwise a batch holding a single element would end the iteration prematurely.
-                if (!curBatchIt.hasNext())
-                    advanceBatch();
-
-                return lastRet;
-            }
-
-            /** {@inheritDoc} */
-            @Override public E removex() {
-                if (curBatchIt == null)
-                    throw new NoSuchElementException();
-
-                if (curBatchIt.removex()) {
-                    // Decrement global size if deleted.
-                    win.totalQueueSize().decrementAndGet();
-
-                    return lastRet;
-                }
-                else
-                    return null;
-            }
-
-            /**
-             * Moves the look-ahead to the next batch that has at least one element, skipping empty
-             * batches. Leaves the look-ahead {@code null} when no such batch remains.
-             */
-            private void advanceBatch() {
-                nextBatchIt = null;
-
-                while (batchIt.hasNext()) {
-                    ConcurrentLinkedDeque8.IteratorEx<E> candidate = batchIt.next().iterator();
-
-                    if (candidate.hasNext()) {
-                        nextBatchIt = candidate;
-
-                        break;
-                    }
-                }
-            }
-        };
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Sums the sizes of the batches that are either among the leading batches exceeding the maximum
-     * batch count (only when that maximum is positive) or whose end timestamp is before the current time.
-     */
-    @Override public int evictionQueueSize() {
-        WindowHolder win = ref.get();
-
-        int overflow = 0;
-
-        if (maxBatches > 0)
-            overflow = Math.max(0, win.batchQueueSize().get() - maxBatches);
-
-        long now = U.currentTimeMillis();
-
-        int evictable = 0;
-
-        int pos = 0;
-
-        for (Batch batch : win.batchQueue()) {
-            // Position is advanced for every batch, whether or not it counts as oversize.
-            boolean oversize = pos++ < overflow;
-
-            if (oversize || batch.batchEndTs < now)
-                evictable += batch.size();
-        }
-
-        return evictable;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Stamps the event with the current time and delegates to the timestamped overload.
-     */
-    @Override protected boolean enqueue0(E evt) {
-        boolean added;
-
-        try {
-            added = enqueue0(evt, U.currentTimeMillis());
-        }
-        catch (IgniteInterruptedCheckedException ignored) {
-            // Interrupted while enqueueing - report the event as not added.
-            added = false;
-        }
-
-        return added;
-    }
-
-    /**
-     * Enqueue event to window.
-     * <p>
-     * Repeatedly tries to add the event to the last batch of the queue. When there is no last batch or
-     * it does not accept the event, the thread either appends a new batch under the write lock (if the
-     * tail has not changed meanwhile) or, when the write lock is taken, waits on the read lock; then it
-     * retries. The loop exits only by returning {@code true} or by propagating an exception.
-     *
-     * @param evt Event to add.
-     * @param ts Event timestamp.
-     * @return {@code True} if event was added.
-     *
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread was interrupted.
-     */
-    private boolean enqueue0(E evt, long ts) throws IgniteInterruptedCheckedException {
-        WindowHolder tup = ref.get();
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        while (true) {
-            Batch last = evts.peekLast();
-
-            if (last == null || !last.add(evt, ts)) {
-                // This call will ensure that last object is actually added to batch
-                // before we add new batch to events queue.
-                // If exception is thrown here, window will be left in consistent state.
-                if (last != null)
-                    last.finish();
-
-                // Add new batch to queue in write lock.
-                if (enqueueLock.writeLock().tryLock()) {
-                    try {
-                        Batch first0 = evts.peekLast(); // Despite the name this is the current tail of the queue.
-
-                        if (first0 == last) { // Tail unchanged: no other thread appended a batch meanwhile.
-                            Batch batch = new Batch(batchSize, ts + batchTimeInterval);
-
-                            ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
-                            batch.node(node);
-
-                            size.incrementAndGet();
-
-                            // If batch was removed in other thread.
-                            if (batch.removed() && evts.unlinkx(node))
-                                size.decrementAndGet();
-                        }
-                    }
-                    finally {
-                        enqueueLock.writeLock().unlock();
-                    }
-                }
-                else {
-                    // Acquire read lock to wait for batch enqueue.
-                    enqueueLock.readLock().lock();
-
-                    try {
-                        evts.peekLast(); // Result is discarded; the tail is re-read on the next loop pass.
-                    }
-                    finally {
-                        enqueueLock.readLock().unlock();
-                    }
-                }
-            }
-            else {
-                // Event was added, global size increment.
-                tup.totalQueueSize().incrementAndGet();
-
-                return true;
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Takes up to {@code cnt} events from the head batch for as long as the head batch is evictable,
-     * i.e. the batch counter exceeds a positive maximum batch count or {@code checkExpired()} of the
-     * head batch returns {@code true}. A head batch that became empty is marked removed and unlinked
-     * from the queue. The total element counter is reduced once, by the number of events returned.
-     */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        WindowHolder tup = ref.get();
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        Collection<E> res = new ArrayList<>(cnt);
-
-        while (true) {
-            int curSize = size.get();
-
-            // Just peek the first batch.
-            Batch first = evts.peekFirst();
-
-            if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
-                assert first.finished();
-
-                Collection<E> polled = first.pollNonBatch(cnt - res.size()); // Ask only for what is still missing.
-
-                if (!polled.isEmpty())
-                    res.addAll(polled);
-
-                if (first.isEmpty()) {
-                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
-                    first.markRemoved();
-
-                    if (node != null && evts.unlinkx(node)) // Batch counter drops only if this thread unlinked the node.
-                        size.decrementAndGet();
-                }
-
-                if (res.size() == cnt)
-                    break;
-            }
-            else
-                break;
-        }
-
-        // Removed entries, update global size.
-        tup.totalQueueSize().addAndGet(-res.size());
-
-        return res;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Two eviction paths are tried. While the batch counter exceeds a positive maximum batch count, the
-     * counter is decremented with a compare-and-set and the head batch is polled from the queue.
-     * Otherwise head batches for which {@code checkExpired()} returns {@code true} are unlinked. In both
-     * paths the contents obtained from {@code shrink()} are returned as soon as they are non-empty;
-     * an empty list is returned when nothing can be evicted.
-     */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        WindowHolder tup = ref.get();
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        while (true) {
-            int curSize = size.get();
-
-            if (maxBatches > 0 && curSize > maxBatches) {
-                if (size.compareAndSet(curSize, curSize - 1)) { // CAS failure re-reads the counter on the next pass.
-                    Batch polled = evts.poll();
-
-                    if (polled != null) {
-                        assert polled.finished();
-
-                        // Mark batch removed for consistency.
-                        polled.markRemoved();
-
-                        Collection<E> polled0 = polled.shrink();
-
-                        // Result of shrink is empty, must retry the poll.
-                        if (!polled0.isEmpty()) {
-                            // Update global size.
-                            tup.totalQueueSize().addAndGet(-polled0.size());
-
-                            return polled0;
-                        }
-                    }
-                    else {
-                        // Polled was zero, so we must restore counter and return.
-                        size.incrementAndGet();
-
-                        return Collections.emptyList();
-                    }
-                }
-            }
-            else {
-                while (true) {
-                    Batch batch = evts.peekFirst();
-
-                    // This call will finish batch and return true if batch is expired.
-                    if (batch != null && batch.checkExpired()) {
-                        assert batch.finished();
-
-                        ConcurrentLinkedDeque8.Node<Batch> node = batch.node();
-
-                        batch.markRemoved();
-
-                        if (node != null && evts.unlinkx(node)) // Batch counter drops only if this thread unlinked the node.
-                            size.decrementAndGet();
-
-                        Collection<E> col = batch.shrink();
-
-                        tup.totalQueueSize().addAndGet(-col.size());
-
-                        if (!col.isEmpty()) // An empty expired batch is skipped and the next head is examined.
-                            return col;
-                    }
-                    else
-                        return Collections.emptyList();
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Takes up to {@code cnt} events from the head of the window regardless of expiration or batch
-     * limits. A head batch that is empty and for which {@code emptyFinished()} returns {@code true} is
-     * marked removed and unlinked; the loop stops when the head batch yields nothing and cannot be
-     * unlinked, when the queue has no head batch, or when {@code cnt} events were collected. The total
-     * element counter is reduced once, by the number of events returned.
-     */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        WindowHolder tup = ref.get();
-
-        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
-        AtomicInteger size = tup.batchQueueSize();
-
-        Collection<E> res = new ArrayList<>(cnt);
-
-        while (true) {
-            // Just peek the first batch.
-            Batch first = evts.peekFirst();
-
-            if (first != null) {
-                Collection<E> polled = first.pollNonBatch(cnt - res.size()); // Ask only for what is still missing.
-
-                // We must check for finished before unlink as no elements
-                // can be added to batch after it is finished.
-                if (first.isEmpty() && first.emptyFinished()) {
-                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
-                    first.markRemoved();
-
-                    if (node != null && evts.unlinkx(node)) // Batch counter drops only if this thread unlinked the node.
-                        size.decrementAndGet();
-
-                    assert first.isEmpty();
-                }
-                else if (polled.isEmpty())
-                    break;
-
-                res.addAll(polled);
-
-                if (res.size() == cnt)
-                    break;
-            }
-            else
-                break;
-        }
-
-        // Update global size.
-        tup.totalQueueSize().addAndGet(-res.size());
-
-        return res;
-    }
-
-    /**
-     * Test-only check: asserts that iterated, per-batch and reported event counts agree, as do batch queue sizes.
-     */
-    void consistencyCheck() {
-        WindowHolder win = ref.get();
-
-        Iterator<E> it = iterator();
-
-        int cnt = 0;
-
-        while (it.hasNext()) {
-            it.next();
-
-            cnt++;
-        }
-
-        int cnt0 = 0;
-
-        for (Batch batch : win.batchQueue())
-            cnt0 += batch.size();
-
-        int sz = size();
-
-        assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
-        assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
-        assert win.batchQueue().size() == win.batchQueueSize().get();
-    }
-
-    /**
-     * Window structure.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public WindowHolder() {
-            // No-op.
-        }
-
-        /**
-         * @param batchQueue Batch queue.
-         * @param batchQueueSize Batch queue size counter.
-         * @param globalSize Global size counter.
-         */
-        private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
-            AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
-            super(batchQueue, batchQueueSize, globalSize);
-
-            assert batchQueue.size() == 1;
-            assert batchQueueSize.get() == 1;
-        }
-
-        /**
-         * @return Events queue.
-         */
-        public ConcurrentLinkedDeque8<Batch> batchQueue() {
-            return get1();
-        }
-
-        /**
-         * @return Batch queue size.
-         */
-        public AtomicInteger batchQueueSize() {
-            return get2();
-        }
-
-        /**
-         * @return Global queue size.
-         */
-        public AtomicInteger totalQueueSize() {
-            return get3();
-        }
-    }
-
-    /**
-     * Single batch of window events; extends the read-write lock that guards its own state.
-     */
-    private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Batch events. */
-        private ConcurrentLinkedDeque8<E> evts;
-
-        /** Capacity. */
-        private AtomicInteger cap;
-
-        /** Batch end timestamp. */
-        private final long batchEndTs;
-
-        /** Finished flag. */
-        private boolean finished;
-
-        /** Queue node. */
-        @GridToStringExclude
-        private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
-        /** Removed flag. */
-        private volatile boolean rmvd;
-
-        /**
-         * @param batchSize Batch size.
-         * @param batchEndTs Batch end timestamp.
-         */
-        private Batch(int batchSize, long batchEndTs) {
-            cap = new AtomicInteger(batchSize);
-            this.batchEndTs = batchEndTs;
-
-            evts = new ConcurrentLinkedDeque8<>();
-        }
-
-        /**
-         * @return {@code True} if removed.
-         */
-        public boolean removed() {
-            return rmvd;
-        }
-
-        /**
-         * Marks batch as removed.
-         */
-        public void markRemoved() {
-            rmvd = true;
-        }
-
-        /**
-         * Adds event to batch.
-         *
-         * @param evt Event to add.
-         * @param ts Event timestamp.
-         * @return {@code True} if event was added, {@code false} if batch is full, finished or {@code ts} is past its end.
-         */
-        public boolean add(E evt, long ts) {
-            if (ts <= batchEndTs) {
-                readLock().lock();
-
-                try {
-                    if (finished)
-                        // Batch was already finished (filled up or expired), nothing more can be added.
-                        return false;
-
-                    while (true) {
-                        int size = cap.get();
-
-                        if (size > 0) {
-                            if (cap.compareAndSet(size, size - 1)) {
-                                evts.add(evt);
-
-                                // Last free slot was taken: mark the batch finished (note: done under the read lock).
-                                if (size == 1)
-                                    finished = true;
-
-                                return true;
-                            }
-                        }
-                        else
-                            return false;
-                    }
-                }
-                finally {
-                    readLock().unlock();
-                }
-            }
-            else {
-                writeLock().lock();
-
-                try {
-                    // Timestamp is past the batch end: finish the batch so that nothing else is added to it.
-                    finished = true;
-
-                    return false;
-                }
-                finally {
-                    writeLock().unlock();
-                }
-            }
-        }
-
-        /**
-         * @return Queue node.
-         */
-        public ConcurrentLinkedDeque8.Node<Batch> node() {
-            return qNode;
-        }
-
-        /**
-         * @param qNode Queue node.
-         */
-        public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
-            this.qNode = qNode;
-        }
-
-        /**
-         * Takes and releases the write lock, asserting in between that the batch is full or marked finished.
-         *
-         * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException Never thrown by this body (declared only).
-         */
-        public void finish() throws IgniteInterruptedCheckedException {
-            writeLock().lock();
-
-            try {
-                // Sanity check: by now either all capacity is used up or the batch was finished.
-                assert cap.get() == 0 || finished;
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /**
-         * @return {@code True} if batch is finished and no more events will be added to it.
-         */
-        public boolean finished() {
-            readLock().lock();
-
-            try {
-                return finished;
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Gets batch size.
-         *
-         * @return Batch size.
-         */
-        public int size() {
-            readLock().lock();
-
-            try {
-                return evts == null ? 0 : evts.sizex();
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * @return {@code True} if batch is empty.
-         */
-        public boolean isEmpty() {
-            readLock().lock();
-
-            try {
-                return evts == null || evts.isEmpty();
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
-         * be added to batch and it can be safely unlinked from the queue.
-         *
-         * @return {@code True} if batch is empty and finished.
-         */
-        public boolean emptyFinished() {
-            writeLock().lock();
-
-            try {
-                return finished && (evts == null || evts.isEmpty());
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /**
-         * Checks whether current time is past the batch end timestamp and, if so, marks the batch finished.
-         *
-         * @return {@code True} if the batch has expired, {@code false} otherwise.
-         */
-        public boolean checkExpired() {
-            if (U.currentTimeMillis() > batchEndTs) {
-                writeLock().lock();
-
-                try {
-                    finished = true;
-
-                    return true;
-                }
-                finally {
-                    writeLock().unlock();
-                }
-            }
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
-            readLock().lock();
-
-            try {
-                if (evts != null)
-                    return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
-                return new ConcurrentLinkedDeque8.IteratorEx<E>() {
-                    @Override public boolean removex() {
-                        throw new NoSuchElementException();
-                    }
-
-                    @Override public boolean hasNext() {
-                        return false;
-                    }
-
-                    @Override public E next() {
-                        throw new NoSuchElementException();
-                    }
-
-                    @Override public void remove() {
-                        throw new NoSuchElementException();
-                    }
-                };
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Polls up to {@code cnt} objects from batch in concurrent fashion.
-         *
-         * @param cnt Number of objects to poll.
-         * @return Collection of polled elements (empty collection in case no events were
-         *         present).
-         */
-        public Collection<E> pollNonBatch(int cnt) {
-            readLock().lock();
-
-            try {
-                if (evts == null)
-                    return Collections.emptyList();
-
-                Collection<E> res = new ArrayList<>(cnt);
-
-                for (int i = 0; i < cnt; i++) {
-                    E evt = evts.poll();
-
-                    if (evt != null)
-                        res.add(evt);
-                    else
-                        return res;
-                }
-
-                return res;
-            }
-            finally {
-                readLock().unlock();
-            }
-        }
-
-        /**
-         * Shrinks this batch. No events can be polled from it after this method.
-         *
-         * @return Collection of events contained in batch before shrink (empty collection in
-         *         case no events were present).
-         */
-        public Collection<E> shrink() {
-            writeLock().lock();
-
-            try {
-                if (evts == null)
-                    return Collections.emptyList();
-
-                // Since iterator can concurrently delete elements, we must poll here.
-                Collection<E> res = new ArrayList<>(evts.sizex());
-
-                E o;
-
-                while ((o = evts.poll()) != null)
-                    res.add(o);
-
-                // Nothing can be polled after shrink: size(), isEmpty() and pollNonBatch() treat null as empty.
-                evts = null;
-
-                return res;
-            }
-            finally {
-                writeLock().unlock();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            ConcurrentLinkedDeque8<E> evts0 = evts;
-
-            return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
deleted file mode 100644
index 46c582c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Window which is bounded by size and time interval.
- */
-public class StreamerBoundedTimeWindow<E> extends StreamerWindowAdapter<E> {
-    /** Window structures holder. */
-    private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
-    /** Time interval. */
-    private long timeInterval;
-
-    /** Window maximum size. */
-    private int maxSize;
-
-    /** Unique flag. */
-    private boolean unique;
-
-    /** Event order counter. */
-    private AtomicLong orderCnt = new AtomicLong();
-
-    /**
-     * Gets window maximum size.
-     *
-     * @return Maximum size.
-     */
-    public int getMaximumSize() {
-        return maxSize;
-    }
-
-    /**
-     * Sets window maximum size.
-     *
-     * @param maxSize Max size.
-     */
-    public void setMaximumSize(int maxSize) {
-        this.maxSize = maxSize;
-    }
-
-    /**
-     * Gets window time interval.
-     *
-     * @return Time interval.
-     */
-    public long getTimeInterval() {
-        return timeInterval;
-    }
-
-    /**
-     * Sets window time interval.
-     *
-     * @param timeInterval Time interval.
-     */
-    public void setTimeInterval(long timeInterval) {
-        this.timeInterval = timeInterval;
-    }
-
-    /**
-     * Gets window unique flag.
-     *
-     * @return {@code True} if only unique events should be added to window.
-     */
-    public boolean isUnique() {
-        return unique;
-    }
-
-    /**
-     * Sets window unique flag.
-     *
-     * @param unique {@code True} if only unique events should be added to window.
-     */
-    public void setUnique(boolean unique) {
-        this.unique = unique;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkConfiguration() {
-        if (timeInterval <= 0)
-            throw new IgniteException("Failed to initialize window (timeInterval must be positive): [windowClass=" +
-                getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
-                unique + ']');
-
-        if (maxSize < 0)
-            throw new IgniteException("Failed to initialize window (maximumSize cannot be negative): [windowClass=" +
-                getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
-                unique + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("RedundantCast")
-    @Override protected void reset0() {
-        ref.set(new WindowHolder(newQueue(), unique ? (Set<Object>)new GridConcurrentHashSet<>() : null,
-            new AtomicInteger()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return ref.get().size().get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        // Get estimate for eviction queue size.
-        WindowHolder tup = ref.get();
-
-        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
-        boolean sizeCheck = maxSize != 0;
-
-        int overflow = tup.size().get() - maxSize;
-
-        long timeBound = U.currentTimeMillis() - timeInterval;
-
-        int idx = 0;
-        int cnt = 0;
-
-        for (Holder holder : evtsQueue) {
-            if ((idx < overflow && sizeCheck) || holder.ts < timeBound)
-                cnt++;
-            else if ((idx >= overflow && sizeCheck) && holder.ts >= timeBound)
-                break;
-            else if (!sizeCheck && holder.ts >= timeBound)
-                break;
-
-            idx++;
-        }
-
-        return cnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean enqueue0(E evt) {
-        add(evt, U.currentTimeMillis());
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        Collection<E> res = new ArrayList<>(cnt);
-
-        for (int i = 0; i < cnt; i++) {
-            E evicted = pollEvictedInternal();
-
-            if (evicted == null)
-                return res;
-
-            res.add(evicted);
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        E res = pollEvictedInternal();
-
-        if (res == null)
-            return Collections.emptyList();
-
-        return Collections.singleton(res);
-    }
-
-    /** Polls one event that overflows the size bound or is older than the time interval, or {@code null}. */
-    @Nullable private E pollEvictedInternal() {
-        WindowHolder tup = ref.get();
-
-        AtomicInteger size = tup.size();
-
-        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
-        long now = U.currentTimeMillis();
-
-        while (true) {
-            int curSize = size.get();
-
-            if (maxSize > 0 && curSize > maxSize) {
-                if (size.compareAndSet(curSize, curSize - 1)) {
-                    Holder<E> hldr = evtsQueue.pollFirst();
-
-                    if (hldr != null) {
-                        if (unique)
-                            tup.set().remove(hldr.val);
-
-                        return hldr.val;
-                    }
-                    else {
-                        // Queue turned out to be empty (concurrent modification): undo the size
-                        // reservation made by the CAS above and report that nothing was evicted.
-                        size.incrementAndGet();
-
-                        return null;
-                    }
-                }
-            }
-            else {
-                // Size bound is satisfied: evict the first entry only if it is older than the time interval.
-                Holder<E> first = evtsQueue.firstx();
-
-                if (first != null && first.ts < now - timeInterval) {
-                    if (evtsQueue.remove(first)) {
-                        if (unique)
-                            tup.set().remove(first.val);
-
-                        size.decrementAndGet();
-
-                        return first.val;
-                    }
-                }
-                else
-                    return null;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        WindowHolder tup = ref.get();
-
-        AtomicInteger size = tup.size();
-        GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
-        Collection<E> resCol = new ArrayList<>(cnt);
-
-        while (true) {
-            int curSize = size.get();
-            // Quota is checked before reserving a slot, so cnt == 0 removes nothing from the window.
-            if (curSize > 0 && resCol.size() < cnt) {
-                if (size.compareAndSet(curSize, curSize - 1)) {
-                    Holder<E> h = evtsQueue.pollLast();
-
-                    if (h != null) {
-                        resCol.add(h.val);
-
-                        if (unique)
-                            tup.set().remove(h.val);
-
-                        if (resCol.size() >= cnt)
-                            return resCol;
-                    }
-                    else {
-                        size.incrementAndGet();
-
-                        return resCol;
-                    }
-                }
-            }
-            else
-                return resCol;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        final WindowHolder win = ref.get();
-
-        final GridConcurrentSkipListSet<Holder<E>> col = win.collection();
-        final Set<Object> set = win.set();
-
-        final Iterator<Holder<E>> it = col.iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            private Holder<E> lastRet;
-
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public E next() {
-                lastRet = it.next();
-
-                return lastRet.val;
-            }
-
-            @Override public E removex() {
-                if (lastRet == null)
-                    throw new IllegalStateException();
-
-                if (col.remove(lastRet)) {
-                    if (set != null)
-                        set.remove(lastRet.val);
-
-                    win.size().decrementAndGet();
-
-                    return lastRet.val;
-                }
-                else
-                    return null;
-            }
-        };
-    }
-
-    /**
-     * Checks queue consistency. Used in tests.
-     */
-    void consistencyCheck() {
-        WindowHolder win = ref.get();
-
-        assert win.collection().size() == win.size().get();
-
-        if (win.set() != null) {
-            // Check no duplicates in collection.
-
-            Collection<Object> vals = new HashSet<>();
-
-            for (Object evt : win.collection())
-                assert vals.add(((Holder)evt).val);
-        }
-    }
-
-    /**
-     * @return New empty skip-list set that orders holders by timestamp and then by order counter.
-     */
-    private GridConcurrentSkipListSet<Holder<E>> newQueue() {
-        return new GridConcurrentSkipListSet<>(new Comparator<Holder>() {
-            @Override public int compare(Holder h1, Holder h2) {
-                if (h1 == h2)
-                    return 0;
-
-                if (h1.ts != h2.ts)
-                    return h1.ts < h2.ts ? -1 : 1;
-
-                return h1.order < h2.order ? -1 : 1;
-            }
-        });
-    }
-
-    /**
-     * Stores the event in the window; in unique mode an already present event is skipped.
-     *
-     * @param evt Event to add.
-     * @param ts Event timestamp.
-     */
-    private void add(E evt, long ts) {
-        WindowHolder win = ref.get();
-
-        // Uniqueness set rejects events it already contains.
-        if (unique && !win.set().add(evt))
-            return;
-
-        // Order counter disambiguates holders that share a timestamp.
-        Holder<E> holder = new Holder<>(evt, ts, orderCnt.incrementAndGet());
-
-        win.collection().add(holder);
-
-        win.size().incrementAndGet();
-    }
-
-    /**
-     * @param evts Events to add.
-     * @param ts Timestamp for added events.
-     */
-    private void addAll(Iterable<E> evts, long ts) {
-        for (E evt : evts)
-            add(evt, ts);
-    }
-
-    /**
-     * Holder that pairs an event with its timestamp and order number.
-     */
-    private static class Holder<E> {
-        /** Value. */
-        private E val;
-
-        /** Event timestamp. */
-        private long ts;
-
-        /** Event order. */
-        private long order;
-
-        /**
-         * @param val Event.
-         * @param ts Timestamp.
-         * @param order Order.
-         */
-        private Holder(E val, long ts, long order) {
-            this.val = val;
-            this.ts = ts;
-            this.order = order;
-        }
-    }
-
-    /**
-     * Window holder.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private class WindowHolder extends GridTuple3<GridConcurrentSkipListSet<Holder<E>>, Set<Object>, AtomicInteger> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public WindowHolder() {
-            // No-op.
-        }
-
-        /**
-         * @param col Collection.
-         * @param set Set if unique.
-         * @param size Size.
-         */
-        private WindowHolder(@Nullable GridConcurrentSkipListSet<Holder<E>> col,
-            @Nullable Set<Object> set, @Nullable AtomicInteger size) {
-            super(col, set, size);
-        }
-
-        /**
-         * @return Holders collection.
-         */
-        public GridConcurrentSkipListSet<Holder<E>> collection() {
-            return get1();
-        }
-
-        /**
-         * @return Uniqueness set.
-         */
-        public Set<Object> set() {
-            return get2();
-        }
-
-        /**
-         * @return Size counter.
-         */
-        public AtomicInteger size() {
-            return get3();
-        }
-    }
-}


Mime
View raw message