ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [27/32] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 10:03:28 GMT
# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4794dd4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4794dd4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4794dd4d

Branch: refs/heads/master
Commit: 4794dd4d2340f8c12585bc699a939e0f0a8490c4
Parents: 3299c52
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 5 12:53:17 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 5 12:53:17 2014 +0300

----------------------------------------------------------------------
 .../index/GridStreamerIndexSelfTest.java        | 683 ++++++++++++++
 .../window/GridStreamerWindowSelfTest.java      | 905 +++++++++++++++++++
 .../index/GridStreamerIndexSelfTest.java        | 682 --------------
 .../window/GridStreamerWindowSelfTest.java      | 905 -------------------
 .../testsuites/GridStreamerSelfTestSuite.java   |   4 +-
 5 files changed, 1590 insertions(+), 1589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4794dd4d/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
new file mode 100644
index 0000000..19e9cdd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
@@ -0,0 +1,683 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+*/
+
+package org.apache.ignite.streamer.index;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.window.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.index.*;
+import org.gridgain.grid.streamer.index.hash.*;
+import org.gridgain.grid.streamer.index.tree.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
+import static org.gridgain.testframework.GridTestUtils.*;
+
+/**
+ * Tests for Streamer window index.
+ */
+public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
+    /**
+     * Runs the unique and the non-unique index check on a tree index for each policy.
+     * @throws Exception If failed.
+     */
+    public void testTreeIndex() throws Exception {
+        for (StreamerIndexPolicy policy : StreamerIndexPolicy.values()) {
+            checkUniqueIndex(indexProvider(true, "idx", new UniqueStringIndexUpdater(), policy, true));
+            checkNonUniqueIndex(indexProvider(true, "idx", new IndexUpdater(), policy, false));
+        }
+    }
+
+    /**
+     * Runs the unique and the non-unique index check on a hash index for each policy.
+     * @throws Exception If failed.
+     */
+    public void testHashIndex() throws Exception {
+        for (StreamerIndexPolicy policy : StreamerIndexPolicy.values()) {
+            checkUniqueIndex(indexProvider(false, "idx", new UniqueStringIndexUpdater(), policy, true));
+            checkNonUniqueIndex(indexProvider(false, "idx", new IndexUpdater(), policy, false));
+        }
+    }
+
+    /**
+     * Checks that an event rejected with {@link GridException} leaves both window indexes as they were.
+     * @throws Exception If failed.
+     */
+    public void testMultipleIndexUpdate() throws Exception {
+        StreamerIndexProvider<String, String, Integer> cntProvider =
+            indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON, false);
+
+        StreamerIndexProvider<String, String, String> uniqueProvider =
+            indexProvider(true, "idx1", new UniqueStringIndexUpdater(), EVENT_TRACKING_ON, true);
+
+        StreamerBoundedSizeWindow<String> window = new StreamerBoundedSizeWindow<>();
+
+        window.setMaximumSize(5);
+        window.setIndexes(cntProvider, uniqueProvider);
+
+        window.start();
+
+        for (String evt : Arrays.asList("A", "B", "C", "D"))
+            window.enqueue(evt);
+
+        // Both indexes as obtained before the rejected enqueue.
+        StreamerIndex<String, String, Integer> cntBefore = window.index("idx");
+        StreamerIndex<String, String, String> uniqueBefore = window.index("idx1");
+
+        info("Idx: " + cntBefore.entries(0));
+        info("Idx1: " + uniqueBefore.entries(0));
+
+        try {
+            window.enqueue("A");
+
+            fail("Exception should have been thrown.");
+        }
+        catch (GridException e) {
+            info("Caught expected exception: " + e);
+        }
+
+        // Both indexes as obtained after the rejected enqueue.
+        StreamerIndex<String, String, Integer> cntAfter = window.index("idx");
+        StreamerIndex<String, String, String> uniqueAfter = window.index("idx1");
+
+        info("Idx (after): " + cntAfter.entries(0));
+        info("Idx1 (after): " + uniqueAfter.entries(0));
+
+        assertEquals(4, cntBefore.entries(0).size());
+        assertEquals(4, uniqueBefore.entries(0).size());
+
+        assertTrue(F.eqOrdered(cntBefore.entries(0), cntAfter.entries(0)));
+        assertTrue(F.eqOrdered(uniqueBefore.entries(0), uniqueAfter.entries(0)));
+
+        cntProvider.reset();
+
+        assertEquals(4, cntBefore.entries(0).size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSortedIndexMultithreaded() throws Exception {
+        checkSortedIndexMultithreaded(32, 500, false); // threadCnt = 32, iters = 500, pollEvicted = false.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSortedIndexMultithreadedWithConcurrentPollEvicted() throws Exception {
+        checkSortedIndexMultithreaded(32, 500, true); // threadCnt = 32, iters = 500, pollEvicted = true.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUniqueHashIndexMultithreaded() throws Exception {
+        checkUniqueHashIndexMultithreaded(32, 500); // threadCnt = 32, iters = 500.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdaterIndexKeyNull() throws Exception {
+        checkIndexUpdater(new IndexUpdater() {
+            @Nullable @Override public String indexKey(String evt) {
+                return !"A".equals(evt) ? evt : null; // Null index key for event "A" only.
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdaterInitialValueNull() throws Exception {
+        checkIndexUpdater(new IndexUpdater() {
+            @Nullable @Override public Integer initialValue(String evt, String key) {
+                return !"A".equals(evt) ? 1 : null; // Null initial value for event "A" only.
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdaterOnAddedNull() throws Exception {
+        checkIndexUpdater(new IndexUpdater() {
+            @Nullable @Override
+            public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
+                return !"A".equals(evt) ? entry.value() + 1 : null; // Null updated value for event "A" only.
+            }
+        });
+    }
+
+    /**
+     * Feeds events "A", "A" and "B" through a tree and a hash index that use the given updater
+     * and verifies that only "B" gets an index entry.
+     *
+     * @param updater Index updater expected to discard event "A" and to accept event "B".
+     * @throws GridException If failed.
+     */
+    private void checkIndexUpdater(StreamerIndexUpdater<String, String, Integer> updater) throws GridException {
+        List<StreamerIndexProvider<String, String, Integer>> providers = Arrays.asList(
+            indexProvider(true, "tree", updater, EVENT_TRACKING_ON, false),
+            indexProvider(false, "hash", updater, EVENT_TRACKING_ON, false));
+
+        for (StreamerIndexProvider<String, String, Integer> provider : providers) {
+            StreamerUnboundedWindow<String> window = new StreamerUnboundedWindow<>();
+
+            window.setIndexes(provider);
+
+            window.start();
+
+            for (String evt : Arrays.asList("A", "A", "B"))
+                window.enqueue(evt);
+
+            StreamerIndex<String, Object, Object> idx = window.index(provider.getName());
+
+            assertNotNull(idx);
+
+            // "A" must be missing from the index, while "B" must be present.
+            assertNull(idx.entry("A"));
+
+            assertNotNull(idx.entry("B"));
+        }
+    }
+
+    /**
+     * Creates a tree or a hash index provider and applies the given settings to it.
+     * @param treeIdx {@code True} to create tree index, {@code false} to create hash index.
+     * @param name Index name.
+     * @param updater Index updater.
+     * @param plc Index policy.
+     * @param unique Unique flag set on the provider.
+     * @return Configured index provider.
+     */
+    private <E, K, V> StreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name,
+        StreamerIndexUpdater<E, K, V> updater, StreamerIndexPolicy plc, boolean unique) {
+        if (!treeIdx) {
+            StreamerHashIndexProvider<E, K, V> hashProvider = new StreamerHashIndexProvider<>();
+
+            hashProvider.setName(name);
+            hashProvider.setUpdater(updater);
+            hashProvider.setUnique(unique);
+            hashProvider.setPolicy(plc);
+
+            return hashProvider;
+        }
+
+        StreamerTreeIndexProvider<E, K, V> treeProvider = new StreamerTreeIndexProvider<>();
+
+        treeProvider.setName(name);
+        treeProvider.setUpdater(updater);
+        treeProvider.setUnique(unique);
+        treeProvider.setPolicy(plc);
+
+        return treeProvider;
+    }
+
+    /**
+     * Enqueues one event from many threads at once and checks that a unique hash index admits it only once.
+     * @param threadCnt Thread count.
+     * @param iters Number of rounds; each round uses a new window and a new event.
+     * @throws Exception If failed.
+     */
+    private void checkUniqueHashIndexMultithreaded(int threadCnt, final int iters) throws Exception {
+        StreamerIndexProvider<String, String, Integer> idxProvider =
+            indexProvider(false, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, true);
+
+        for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
+            final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
+
+            win.setMaximumSize(threadCnt * 2);
+            win.setIndexes(idxProvider);
+
+            win.start();
+
+            final String evt = "evt" + i;
+            final AtomicInteger nIdxErrors = new AtomicInteger();
+
+            // Submit the same event in multiple threads; an exception without a message is rethrown as is.
+            runMultiThreaded(new CAX() {
+                @Override public void applyx() throws GridException {
+                    try {
+                        win.enqueue(evt);
+                    }
+                    catch (GridException e) {
+                        if (e.getMessage() != null && e.getMessage().contains("Index unique key violation"))
+                            nIdxErrors.incrementAndGet();
+                        else
+                            throw e;
+                    }
+                }
+            }, threadCnt, "put");
+
+            // Only one thread should succeed, because the index is unique.
+            assertEquals(threadCnt - 1, nIdxErrors.get());
+
+            StreamerIndex<String, String, Integer> idx = win.index("idx");
+
+            // Only one event should be present and have value 1.
+            assertEquals(1, idx.entries(0).size());
+            assertEquals((Integer)1, idx.entry(evt).value());
+        }
+    }
+
+    /**
+     * @param threadCnt Thread count; sizes the window and the worker groups started by this check.
+     * @param iters Number of events enqueued by each enqueueing thread.
+     * @param pollEvicted Poll evicted events concurrently, if true.
+     * @throws Exception If failed.
+     */
+    public void checkSortedIndexMultithreaded(final int threadCnt, final int iters, final boolean pollEvicted)
+        throws Exception {
+        final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(threadCnt * 2);
+        win.setIndexes(indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, false));
+
+        win.start();
+
+        IgniteFuture<Long> pollFut = null;
+
+        if (pollEvicted) {
+            // These threads poll an evicted event only while every entry returned by
+            // entries(-threadCnt) has a value above 2, so that the test invariant holds.
+            pollFut = runMultiThreadedAsync(new CAX() {
+                @Override public void applyx() throws GridException {
+                    try {
+                        while (!Thread.currentThread().isInterrupted()) {
+                            StreamerIndex<String, String, Integer> idx = win.index("idx");
+
+                            boolean canPoll = F.forAll(
+                                idx.entries(-1 * threadCnt),
+                                new P1<StreamerIndexEntry<String, String, Integer>>() {
+                                    @Override public boolean apply(StreamerIndexEntry<String, String, Integer> e) {
+                                        return e.value() > 2;
+                                    }
+                                });
+
+                            if (!canPoll || win.pollEvicted() == null)
+                                U.sleep(50);
+                        }
+                    }
+                    catch (GridInterruptedException ignored) {
+                        // No-op: the interruption simply ends the polling loop.
+                    }
+                }
+            }, threadCnt / 4, "test-poll");
+        }
+
+        try {
+            // Each of these threads generates a single event repeatedly and checks
+            // if it is still present in the window. In the tested index events are
+            // sorted by value and the value is a number of repeated events, so, this
+            // should be invariant.
+            IgniteFuture<Long> fut1 = runMultiThreadedAsync(new CAX() {
+                @Override public void applyx() throws GridException {
+                    final String evt = Thread.currentThread().getName();
+                    int cntr = 1;
+
+                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
+                        win.enqueue(evt);
+
+                        StreamerIndex<String, String, Integer> idx = win.index("idx");
+                        StreamerIndexEntry<String, String, Integer> entry = idx.entry(evt);
+
+                        assertNotNull(entry);
+
+                        // If concurrent eviction is disabled, check if the
+                        // value grows each time we enqueue a new event.
+                        if (!pollEvicted)
+                            assertEquals((Integer)cntr++, entry.value());
+
+                        // If the event was queued more than once, the first threadCnt entries
+                        // in descending order should contain an entry with this thread's event.
+                        if (i > 0)
+                            assert idx.entries(-1 * threadCnt).contains(entry);
+                    }
+                }
+            }, threadCnt / 2, "test-multi");
+
+            // This thread generates non-repeating events "0", "1", ... up to iters (exclusive).
+            IgniteFuture<Long> fut2 = runMultiThreadedAsync(new CAX() {
+                @Override public void applyx() throws GridException {
+                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++)
+                        win.enqueue(String.valueOf(i));
+                }
+            }, 1, "test-single");
+
+            fut2.get(getTestTimeout());
+            fut1.get(getTestTimeout());
+        }
+        finally {
+            if (pollFut != null)
+                pollFut.cancel();
+        }
+    }
+
+    /**
+     * @param idx Provider of the non-unique index to check.
+     * @throws GridException If failed.
+     */
+    private void checkNonUniqueIndex(StreamerIndexProvider<String, String, Integer> idx) throws GridException {
+        assert !idx.isUnique();
+
+        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(5);
+        win.setIndexes(idx);
+
+        win.start();
+        // Enqueues A0, B, C, D, A4, B, C, D, ..., A16, B, C, D: 20 events, "B", "C" and "D" five times each.
+        for (int i = 0; i < 20; ) {
+            win.enqueue("A" + i); i++;
+            win.enqueue("B"); i++;
+            win.enqueue("C"); i++;
+            win.enqueue("D"); i++;
+        }
+
+        StreamerIndex<String, String, Integer> idx0 = win.index("idx");
+
+        String s;
+
+        while ((s = win.pollEvicted()) != null)
+            info("Evicted String: " + s);
+
+        StreamerIndex<String, String, Integer> idx1 = win.index("idx");
+
+        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
+            assert idx0.sorted();
+
+            // Entries with value 1, i.e. the events that were enqueued only once.
+            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) {
+                info("Entry [e=" + e + ", evts=" + e.events() + ']');
+
+                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
+                    assertEquals(1, e.events().size());
+                    assertEquals('A', F.first(e.events()).charAt(0));
+                }
+            }
+
+            assertTrue(idx0.entrySet(2).isEmpty());
+
+            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) {
+                info("Entry [e=" + e + ", evts=" + e.events() + ']');
+
+                if (idx.getPolicy() == EVENT_TRACKING_ON)
+                    assertEquals(5, e.events().size());
+
+                else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
+                    assertEquals(1, e.events().size());
+
+                else
+                    assertNull(e.events());
+            }
+
+            assertEquals(5, idx0.entrySet(1).size());
+
+            List<StreamerIndexEntry<String, String, Integer>> asc =
+                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
+            List<StreamerIndexEntry<String, String, Integer>> desc =
+                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
+
+            assertEquals(8, asc.size());
+            assertEquals(8, desc.size());
+
+            for (int i = 0; i < asc.size(); i++)
+                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
+
+            try {
+                idx0.entrySet(true, 10, true, -10, true);
+
+                assert false;
+            }
+            catch (IllegalArgumentException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            try {
+                idx0.entrySet(false, -10, true, 10, true);
+
+                assert false;
+            }
+            catch (IllegalArgumentException e) {
+                info("Caught expected exception: " + e);
+            }
+        }
+        else
+            assert !idx0.sorted();
+
+        assertEquals(4, idx1.size());
+
+        for (StreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) {
+            Collection<String> evts = e.events();
+
+            info("Entry [e=" + e + ", evts=" + evts + ']');
+
+            if (idx.getPolicy() == EVENT_TRACKING_ON) {
+                assert evts != null;
+
+                switch (evts.size()) {
+                    case 1:
+                        assert F.containsAny(evts, "A16", "B", "C") : "Wrong tracked event: " + F.first(evts);
+
+                        break;
+
+                    case 2:
+                        Collection<String> dedup = F.dedup(evts);
+
+                        assert dedup.size() == 1 && "D".equals(F.first(dedup)) : "Wrong tracked events: " + evts;
+
+                        break;
+
+                    default:
+                        fail("Wrong tracked events: " + evts);
+                }
+            }
+            else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
+                assert evts != null && evts.size() == 1 && F.containsAny(evts, "A16", "B", "C", "D") :
+                    "Wrong tracked events: " + evts;
+            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
+                assert evts == null;
+        }
+
+        // Check that idx0, obtained before the evicted events were polled, is unaffected.
+        assertEquals(8, idx0.size());
+
+        idx.reset();
+
+        assertEquals(0, idx.index().size());
+        assertEquals(8, idx0.size());
+    }
+
+    /**
+     * @param idx Provider of the unique index to check.
+     * @throws GridException If failed.
+     */
+    private void checkUniqueIndex(StreamerIndexProvider<String, String, String> idx) throws GridException {
+        assert idx.isUnique();
+
+        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(5);
+        win.setIndexes(idx);
+
+        win.start();
+
+        for (int i = 0; i < 20; i++)
+            win.enqueue("A" + i);
+        // Every repeated event is expected to be rejected.
+        for (int i = 0; i < 20; i++) {
+            try {
+                win.enqueue("A" + i);
+
+                fail("Exception should have been thrown.");
+            }
+            catch (GridException e) {
+                info("Caught expected exception: " + e);
+            }
+        }
+
+        StreamerIndex<String, String, String> idx0 = win.index("idx");
+
+        String s;
+
+        while ((s = win.pollEvicted()) != null)
+            info("Evicted string: " + s);
+
+        StreamerIndex<String, String, String> idx1 = win.index("idx");
+
+        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
+            assert idx0.sorted();
+
+            // Entries with value "A0".
+            for (StreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) {
+                info("Entry [e=" + e + ", evts=" + e.events() + ']');
+
+                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
+                    assertEquals(1, e.events().size());
+                    assertEquals('A', F.first(e.events()).charAt(0));
+                }
+            }
+
+            assertTrue(idx0.entrySet("B").isEmpty());
+
+            assertEquals(1, idx0.entrySet("A0").size());
+
+            List<StreamerIndexEntry<String, String, String>> asc =
+                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
+            List<StreamerIndexEntry<String, String, String>> desc =
+                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
+
+            assertEquals(20, asc.size());
+            assertEquals(20, desc.size());
+
+            for (int i = 0; i < asc.size(); i++)
+                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
+        }
+        else
+            assert !idx0.sorted();
+
+        assertEquals(5, idx1.size());
+
+        for (StreamerIndexEntry<String, String, String> e : idx1.entries(0)) {
+            Collection<String> evts = e.events();
+
+            info("Entry [e=" + e + ", evts=" + evts + ']');
+
+            if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
+                assert evts != null && evts.size() == 1 : "Wrong tracked events: " + evts;
+
+                int i = Integer.parseInt(F.first(evts).substring(1));
+
+                assert i >= 15 && i < 20 : "Wrong event: " + F.first(evts);
+            }
+            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
+                assert evts == null;
+        }
+
+        // Check that idx0, obtained before the evicted events were polled, is unaffected.
+        assertEquals(20, idx0.size());
+
+        idx.reset();
+
+        assertEquals(0, idx.index().size());
+        assertEquals(20, idx0.size());
+    }
+
+    /**
+     * Updater that uses the event itself as the index key and counts the events of an entry in its value.
+     */
+    private static class IndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
+        /** {@inheritDoc} */
+        @Nullable @Override public String indexKey(String evt) {
+            return evt;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer initialValue(String evt, String key) {
+            return 1;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
+            return 1 + entry.value();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
+            String evt) {
+            int left = entry.value() - 1;
+
+            return left != 0 ? left : null;
+        }
+    }
+
+    /**
+     * Event counting updater; member for member the same as {@code IndexUpdater}, unreferenced in this class.
+     */
+    private static class HashIndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
+        /** {@inheritDoc} */
+        @Nullable @Override public String indexKey(String evt) {
+            return evt;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer initialValue(String evt, String key) {
+            return 1;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
+            return 1 + entry.value();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
+            String evt) {
+            int left = entry.value() - 1;
+
+            return left != 0 ? left : null;
+        }
+    }
+
+    /**
+     * Updater for unique indexes: key and value are the event itself, {@code onAdded} always throws.
+     */
+    private static class UniqueStringIndexUpdater implements StreamerIndexUpdater<String, String, String> {
+        /** {@inheritDoc} */
+        @Nullable @Override public String indexKey(String evt) {
+            return evt;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String initialValue(String evt, String key) {
+            return evt;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String onAdded(StreamerIndexEntry<String, String, String> entry, String evt)
+            throws GridException {
+            throw new GridException("Unique key violation: " + evt);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String onRemoved(StreamerIndexEntry<String, String, String> entry,
+            String evt) {
+            // On remove we return null as index is unique.
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4794dd4d/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
new file mode 100644
index 0000000..ce864f9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
@@ -0,0 +1,905 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Streamer window self test.
+ */
+public class GridStreamerWindowSelfTest extends GridCommonAbstractTest {
+    /** Checks that a bounded size window starts with default settings but not with a maximum size of -1.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeWindowValidation() throws Exception {
+        final StreamerBoundedSizeWindow win = new StreamerBoundedSizeWindow();
+
+        win.start(); // Expected to succeed with nothing configured.
+
+        win.setMaximumSize(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Invalid maximum size.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+    }
+
+    /** Checks that a bounded time window needs a time interval to start and rejects a maximum size of -1.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeWindowValidation() throws Exception {
+        final StreamerBoundedTimeWindow win = new StreamerBoundedTimeWindow();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // No time interval set yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setTimeInterval(1);
+
+        win.start(); // Expected to succeed now that the interval is set.
+
+        win.setMaximumSize(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Invalid maximum size.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+    }
+
+    /** Checks that a bounded size batch window needs a batch size to start and rejects -1 maximum batches.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeBatchWindowValidation() throws Exception {
+        final StreamerBoundedSizeBatchWindow win = new StreamerBoundedSizeBatchWindow();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // No batch size set yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchSize(1);
+
+        win.start(); // Expected to succeed with a batch size of 1.
+
+        win.setMaximumBatches(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Invalid maximum batches.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+    }
+
+    /** Checks which option combinations let a bounded time batch window start and which ones are rejected.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeBatchWindowValidation() throws Exception {
+        final StreamerBoundedTimeBatchWindow win = new StreamerBoundedTimeBatchWindow();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Nothing configured yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchSize(1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Batch time interval is still not set.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchTimeInterval(1);
+        win.setBatchSize(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Interval is set, but batch size is -1.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchSize(1);
+
+        win.start(); // Expected to succeed: batch size and batch time interval are both 1.
+
+        win.setMaximumBatches(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Invalid maximum batches.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+    }
+
+    /** Checks eviction order, iterator behaviour and snapshots of a non-unique bounded size window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedWindow() throws Exception {
+        final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(50);
+
+        win.start();
+
+        for (int i = 0; i < 50; i++)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted()); // Exactly at the maximum size: nothing evicted yet.
+
+        for(int i = 50; i < 60; i++)
+            win.enqueue(i);
+
+        for (int i = 0; i < 10; i++)
+            assert i == win.pollEvicted(); // The ten oldest events are expected in insertion order.
+
+        assertNull(win.pollEvicted());
+
+        checkIterator(win);
+
+        win.setMaximumSize(2);
+
+        win.start();
+
+        win.enqueue(3, 2, 1);
+
+        checkSnapshot(win.snapshot(true), 3, 2, 1); // Includes 3, which snapshot(false) below no longer reports.
+        checkSnapshot(win.snapshot(false), 2, 1);
+    }
+
+    /** Checks duplicate handling, iteration order and snapshots of a unique bounded size window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedWindowUnique() throws Exception {
+        final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(50);
+        win.setUnique(true);
+
+        win.start();
+
+        for (int i = 0; i < 50; i++)
+            win.enqueue(i);
+
+        for (int i = 0; i < 50; i++)
+            win.enqueue(i); // Same 50 values again.
+
+        assertNull(win.pollEvicted()); // The repeated values are expected to cause no eviction.
+
+        int idx = 0;
+
+        for (Object evt : win) {
+            Integer next = (Integer)evt;
+
+            assertEquals((Integer)idx++, next); // Iteration is expected to yield 0, 1, 2, ... in order.
+        }
+
+        checkIterator(win);
+
+        win.setMaximumSize(2);
+
+        win.start();
+
+        win.enqueue(3, 2, 1, 3);
+
+        checkSnapshot(win.snapshot(true), 3, 2, 1);
+        checkSnapshot(win.snapshot(false), 2, 1);
+    }
+
+    /** Checks eviction, iterator behaviour and snapshots of a non-unique bounded size sorted window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSortedWindow() throws Exception {
+        final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
+
+        win.setMaximumSize(60);
+
+        win.start();
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted()); // Exactly at the maximum size: nothing evicted yet.
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i); // Same 60 values again, so every value is now present twice.
+
+        for (int i = 59; i >= 30; i--) { // The 30 largest values are expected to be evicted, two copies each.
+            assert i == win.pollEvicted();
+            assert i == win.pollEvicted();
+        }
+
+        assertNull(win.pollEvicted());
+
+        checkIterator(win);
+
+        win.setMaximumSize(2);
+
+        win.start();
+
+        win.enqueue(3, 2, 1, 4);
+
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4); // Expected in ascending order, not in enqueue order.
+        checkSnapshot(win.snapshot(false), 3, 4);
+    }
+
+    /** Checks validation, duplicate handling, eviction and snapshots of a unique bounded size sorted window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSortedWindowUnique() throws Exception {
+        final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
+
+        win.setMaximumSize(-1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Invalid maximum size.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setMaximumSize(60);
+        win.setUnique(true);
+
+        win.start();
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted());
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i); // Same 60 values again.
+
+        assertNull(win.pollEvicted()); // The repeated values are expected to cause no eviction.
+
+        for (int i = 99; i >= 60; i--)
+            win.enqueue(i); // 40 new values, all larger than the ones already enqueued.
+
+        for (int i = 99; i >= 60; i--)
+            assert i == win.pollEvicted(); // The new values are expected back in the order they were enqueued.
+
+        assertNull(win.pollEvicted());
+
+        checkIterator(win);
+
+        win.setMaximumSize(2);
+
+        win.start();
+
+        win.enqueue(3, 2, 1, 3, 4);
+
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4); // The repeated 3 is expected only once.
+        checkSnapshot(win.snapshot(false), 3, 4);
+    }
+
+    /** Checks validation, batch-wise eviction, iterator behaviour and snapshots of a bounded size batch window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeBatchDequeueWindow() throws Exception {
+        final StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // No batch size set yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchSize(10);
+        win.setMaximumBatches(2);
+
+        win.start();
+
+        for (int i = 0; i < 20; i++)
+            win.enqueue(i); // 20 events = 2 batches of 10, i.e. exactly the configured limits.
+
+        assertNull(win.pollEvicted());
+        assertEquals(0, win.pollEvictedBatch().size());
+
+        win.enqueue(20); // 21st event: the batch holding 0..9 is expected to become evicted.
+
+        Collection<Integer> evicted = win.pollEvictedBatch();
+
+        assertEquals(10, evicted.size());
+
+        Iterator<Integer> it = evicted.iterator();
+
+        for (int i = 0; i < 10; i++)
+            assert i == it.next();
+
+        assertNull(win.pollEvicted());
+        assertEquals(0, win.pollEvictedBatch().size());
+
+        for (int i = 21; i < 30; i++)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted());
+        assertEquals(0, win.pollEvictedBatch().size());
+
+        win.enqueue(30); // Expected to push the batch holding 10..19 into the eviction queue.
+
+        assert 10 == win.pollEvicted(); // A single poll takes only the first event of that batch.
+
+        evicted = win.pollEvictedBatch();
+
+        assertEquals(9, evicted.size()); // The rest of the partially polled batch: 11..19.
+
+        it = evicted.iterator();
+
+        for (int i = 11; i < 20; i++)
+            assert i == it.next();
+
+        assertNull(win.pollEvicted());
+        assertEquals(0, win.pollEvictedBatch().size());
+
+        checkIterator(win);
+
+        win.setMaximumBatches(2);
+        win.setBatchSize(2);
+
+        win.start();
+
+        win.enqueue(1, 2, 3, 4, 5, 6, 7);
+
+        // We expect that the first two batches will be evicted.
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
+        checkSnapshot(win.snapshot(false), 5, 6, 7);
+    }
+
+    /** Checks validation, eviction, iterator behaviour and snapshots of a bounded time window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeDequeueWindow() throws Exception {
+        final StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // No time interval set yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setMaximumSize(60);
+        win.setTimeInterval(40);
+
+        win.start();
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted()); // NOTE(review): looks timing-sensitive given the interval of 40 - confirm.
+
+        for (int i = 59; i >= 0; i--)
+            win.enqueue(i);
+
+        for (int i = 59; i >= 0; i--)
+            assert i == win.pollEvicted(); // The first 60 events are expected back in insertion order.
+
+        assertNull(win.pollEvicted());
+
+        checkIterator(win);
+
+        win.setMaximumSize(2);
+        win.setTimeInterval(200);
+
+        win.start();
+
+        win.enqueue(1, 2, 3);
+
+        checkSnapshot(win.snapshot(true), 1, 2, 3);
+        checkSnapshot(win.snapshot(false), 2, 3); // Maximum size 2: event 1 is no longer reported here.
+
+        U.sleep(400); // Twice the time interval of 200 set above.
+
+        win.enqueue(4);
+
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4);
+        checkSnapshot(win.snapshot(false), 4); // After the sleep only the fresh event is expected here.
+    }
+
+    /** Checks validation, size- and time-driven batch eviction and snapshots of a bounded time batch window.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeBatchDequeueWindow() throws Exception {
+        final StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() { // Nothing configured yet.
+            @Override public Object call() throws Exception {
+                win.start();
+
+                return null;
+            }
+        }, GridException.class, null);
+
+        win.setBatchSize(50);
+        win.setBatchTimeInterval(500);
+        win.setMaximumBatches(2);
+
+        win.start();
+
+        for (int i = 0; i < 25; i++)
+            win.enqueue(i); // Half a batch only.
+
+        U.sleep(1000); // Twice the batch time interval of 500 set above.
+
+        Collection<Integer> evicted = win.pollEvictedBatch();
+
+        assertNotNull(evicted);
+        assertEquals(25, evicted.size()); // The incomplete batch is expected to be evicted after the sleep.
+
+        for (int i = 0; i < 101; i++)
+            win.enqueue(i); // 101 events: two full batches of 50 plus one extra event.
+
+        evicted = win.pollEvictedBatch();
+
+        assertNotNull(evicted);
+        assertEquals(50, evicted.size());
+
+        U.sleep(1000);
+
+        evicted = win.pollEvictedBatch();
+
+        assertNotNull(evicted);
+        assertEquals(50, evicted.size());
+
+        evicted = win.pollEvictedBatch();
+
+        assertNotNull(evicted);
+        assertEquals(1, evicted.size()); // The batch holding only the 101st event.
+
+        checkIterator(win);
+
+        win.setMaximumBatches(2);
+        win.setBatchSize(2);
+        win.setBatchTimeInterval(200);
+
+        win.start();
+
+        win.enqueue(1, 2, 3, 4, 5, 6, 7);
+
+        // We expect that the first two batches will be evicted.
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
+        checkSnapshot(win.snapshot(false), 5, 6, 7);
+
+        U.sleep(400); // Twice the batch time interval of 200 set above.
+
+        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
+        checkSnapshot(win.snapshot(false)); // No expected values: this snapshot must be empty after the sleep.
+    }
+
+    /** Checks that an unbounded window evicts nothing after 50 events, plus its iterator and snapshots.
+     * @throws Exception If failed.
+     */
+    public void testUnboundedDequeueWindow() throws Exception {
+        final StreamerUnboundedWindow<Integer> win = new StreamerUnboundedWindow<>();
+
+        win.start();
+
+        for (int i = 0; i < 50; i++)
+            win.enqueue(i);
+
+        assertNull(win.pollEvicted());
+
+        assert win.size() == 50;
+
+        checkIterator(win);
+
+        win.reset();
+
+        win.enqueue(3, 1, 2);
+
+        checkSnapshot(win.snapshot(true), 3, 1, 2); // Both snapshot flavours are expected in enqueue order.
+        checkSnapshot(win.snapshot(false), 3, 1, 2);
+    }
+
+    /** Runs the concurrent random-operation scenario on a non-unique bounded size window of at most 500 events.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeDequeueWindowMultithreaded() throws Exception {
+        StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(500);
+        win.setUnique(false);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+    }
+
+    /** Runs the concurrent random-operation scenario on a unique bounded size window of at most 500 events.
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeDequeueWindowUniqueMultithreaded() throws Exception {
+        StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
+
+        win.setMaximumSize(500);
+        win.setUnique(true);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+    }
+
+    /** Runs the concurrent random-operation scenario on a bounded size batch window (10 batches of 50 events).
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeBatchDequeueWindowMultithreaded() throws Exception {
+        StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>();
+
+        win.setMaximumBatches(10);
+        win.setBatchSize(50);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // 10 batches * 50 events per batch = 500.
+    }
+
+    /** Runs the concurrent random-operation scenario on a non-unique bounded size sorted window (max 500).
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeSortedDequeueWindowMultithreaded() throws Exception {
+        StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
+
+        win.setMaximumSize(500);
+        win.setUnique(false);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+    }
+
+    /** Runs the concurrent random-operation scenario on a unique bounded size sorted window (max 500).
+     * @throws Exception If failed.
+     */
+    public void testBoundedSizeSortedDequeueWindowUniqueMultithreaded() throws Exception {
+        StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
+
+        win.setMaximumSize(500);
+        win.setUnique(true);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+    }
+
+    /** Runs the concurrent random-operation scenario on a non-unique bounded time window, then lets it expire.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeDequeueWindowMultithreaded() throws Exception {
+        StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
+
+        win.setMaximumSize(500);
+        win.setTimeInterval(40); // 40ms time interval.
+        win.setUnique(false);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+
+        U.sleep(1000); // Far longer than the 40ms time interval.
+
+        finalChecks(win, 0); // After the sleep the window is expected to be empty.
+    }
+
+    /** Runs the concurrent random-operation scenario on a unique bounded time window, then lets it expire.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeDequeueWindowUniqueMultithreaded() throws Exception {
+        StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
+
+        win.setMaximumSize(500);
+        win.setTimeInterval(40); // 40ms time interval.
+        win.setUnique(true);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // Bound equals the maximum size configured above.
+
+        U.sleep(1000); // Far longer than the 40ms time interval.
+
+        finalChecks(win, 0); // After the sleep the window is expected to be empty.
+    }
+
+    /** Runs the concurrent random-operation scenario on a bounded time batch window, then lets it expire.
+     * @throws Exception If failed.
+     */
+    public void testBoundedTimeBatchDequeueWindowMultithreaded() throws Exception {
+        StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>();
+
+        win.setMaximumBatches(10);
+        win.setBatchTimeInterval(100);
+        win.setBatchSize(50);
+
+        win.start();
+
+        checkWindowMultithreaded(win, 100000, 10, 1000); // iterCnt=100000, threadCnt=10, range=1000.
+
+        win.consistencyCheck();
+
+        finalChecks(win, 500); // 10 batches * 50 events per batch = 500.
+
+        U.sleep(1000); // Ten times the batch time interval of 100 set above.
+
+        finalChecks(win, 0); // After the sleep the window is expected to be empty.
+    }
+
+    /**
+     * Resets the window, then checks iterator rules: remove() before next(), next() past the end, size tracking.
+     *
+     * @param win Window.
+     * @throws Exception If failed.
+     */
+    private void checkIterator(StreamerWindow<Integer> win) throws Exception {
+        win.reset();
+
+        assert win.size() == 0;
+
+        win.enqueue(1);
+
+        assert win.size() == 1;
+
+        final Iterator<Integer> iter = win.iterator(); // Created while the window holds only event 1.
+
+        win.enqueue(2);
+
+        assert win.size() == 2;
+
+        assert iter.hasNext();
+
+        GridTestUtils.assertThrows(log(), new Callable<Object>() { // remove() before any next() must fail.
+            @Override public Object call() throws Exception {
+                iter.remove();
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        assert iter.next() == 1;
+
+        iter.remove();
+
+        assert !iter.hasNext(); // Event 2, enqueued after the iterator was created, must not be visible to it.
+
+        GridTestUtils.assertThrows(log(), new Callable<Object>() { // next() on an exhausted iterator must fail.
+            @Override public Object call() throws Exception {
+                iter.next();
+
+                return null;
+            }
+        }, NoSuchElementException.class, null);
+
+        assert win.size() == 1; // Only event 2 is left after event 1 was removed through the iterator.
+    }
+
+    /**
+     * Logs the eviction queue size, drains evicted events and asserts that the window size is within the limit.
+     *
+     * @param win Window to check.
+     * @param maxSize Largest window size that still passes the check.
+     * @throws GridException If failed.
+     */
+    private void finalChecks(StreamerWindow<Integer> win, int maxSize) throws GridException {
+        int evictQueueSize = win.evictionQueueSize();
+
+        info("Eviction queue size for final checks: " + evictQueueSize);
+
+        Collection<Integer> evicted = win.pollEvictedAll();
+
+        info("Evicted entries in final checks: " + evicted.size());
+
+        int winSize = win.size(); // Sampled after the first drain; this is the value that gets asserted.
+
+        win.pollEvictedAll(); // Second drain; its result is not used.
+
+        assertTrue("Unexpected window size [winSize=" + winSize + " maxSize=" + maxSize + ']', winSize <= maxSize);
+    }
+
+    /** Runs randomly chosen enqueue, poll, dequeue and iterator operations on the window and logs the counters.
+     * @param win Window to check.
+     * @param iterCnt Number of loop iterations performed by the callable.
+     * @param threadCnt Thread count passed to {@code multithreadedAsync}.
+     * @param range Exclusive upper bound for the randomly generated event values.
+     * @throws Exception If failed.
+     */
+    private void checkWindowMultithreaded(
+        final StreamerWindow<Integer> win,
+        final int iterCnt,
+        int threadCnt,
+        final int range
+    ) throws Exception {
+        final AtomicInteger polled = new GridAtomicInteger(); // Events taken out by poll, dequeue or iterator.
+
+        final AtomicInteger added = new GridAtomicInteger(); // Events passed to enqueue.
+
+        IgniteFuture<?> fut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Random rnd = new Random();
+
+                for (int i = 0; i < iterCnt; i++) {
+                    if (i > 0 && i % 10000 == 0)
+                        info("Finished " + i + " iterations");
+
+                    int op = rnd.nextInt(8); // One of the eight operations below, picked uniformly.
+
+                    switch (op) {
+                        case 0: {
+                            // Add.
+                            for (int j = 0; j < 30; j++)
+                                win.enqueue(rnd.nextInt(range));
+
+                            added.addAndGet(30);
+
+                            break;
+                        }
+
+                        case 1: {
+                            // Add bunch.
+                            for (int j = 0; j < 10; j++)
+                                win.enqueue(rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range),
+                                    rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range));
+
+                            added.addAndGet(10 * 6);
+
+                            break;
+                        }
+
+                        case 2: { // Poll a single evicted event.
+                            Object o = win.pollEvicted();
+
+                            if (o != null)
+                                polled.incrementAndGet();
+
+                            break;
+                        }
+
+                        case 3: { // Poll evicted events with an argument of 50.
+                            Collection<Integer> p0 = win.pollEvicted(50);
+
+                            polled.addAndGet(p0.size());
+
+                            break;
+                        }
+
+                        case 4: { // Poll an evicted batch.
+                            Collection<Integer> p0 = win.pollEvictedBatch();
+
+                            polled.addAndGet(p0.size());
+
+                            break;
+                        }
+
+                        case 5: { // Dequeue a single event.
+                            Object o = win.dequeue();
+
+                            if (o != null)
+                                polled.incrementAndGet();
+
+                            break;
+                        }
+
+                        case 6: { // Dequeue with an argument of 50.
+                            Collection<Integer> p0 = win.dequeue(50);
+
+                            polled.addAndGet(p0.size());
+
+                            break;
+                        }
+
+                        case 7: { // Walk the window and remove each visited event with probability 1/10.
+                            Iterator<Integer> it = win.iterator();
+
+                            while (it.hasNext()) {
+                                it.next();
+
+                                if (rnd.nextInt(10) == 5) {
+                                    it.remove();
+
+                                    polled.incrementAndGet();
+                                }
+                            }
+
+                            break;
+                        }
+                    }
+                }
+
+                return null;
+            }
+        }, threadCnt);
+
+        fut.get(); // Block until the asynchronous run has completed.
+
+        // Cannot assert on added, polled and window size because iterator does not return status.
+        info("Window size: " + win.size());
+        info("Added=" + added.get() + ", polled=" + polled.get());
+    }
+
+    /**
+     * Checks that the snapshot holds exactly the expected values, compared in iteration order.
+     *
+     * @param snapshot Snapshot.
+     * @param vals Expected values, in the order the snapshot is expected to return them.
+     */
+    private void checkSnapshot(Collection<Integer> snapshot, Object... vals) {
+        assert snapshot.size() == vals.length;
+
+        int i = 0;
+
+        for (Object evt : snapshot)
+            assertTrue(F.eq(evt, vals[i++])); // Position-wise comparison against the expected value.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4794dd4d/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
deleted file mode 100644
index 60aba6a..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
-*  __  ____/___________(_)______  /__  ____/______ ____(_)_______
-*  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-*  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-*  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
-*/
-
-package org.gridgain.grid.streamer.index;
-
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.window.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.index.hash.*;
-import org.gridgain.grid.streamer.index.tree.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
-import static org.gridgain.testframework.GridTestUtils.*;
-
-/**
- * Tests for Streamer window index.
- */
-public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTreeIndex() throws Exception {
-        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
-            checkUniqueIndex(indexProvider(true, "idx", new UniqueStringIndexUpdater(), plc, true));
-
-            checkNonUniqueIndex(indexProvider(true, "idx", new IndexUpdater(), plc, false));
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testHashIndex() throws Exception {
-        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
-            checkUniqueIndex(indexProvider(false, "idx", new UniqueStringIndexUpdater(), plc, true));
-
-            checkNonUniqueIndex(indexProvider(false, "idx", new IndexUpdater(), plc, false));
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultipleIndexUpdate() throws Exception {
-        StreamerIndexProvider<String, String, Integer> idxProvider =
-            indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON, false);
-
-        StreamerIndexProvider<String, String, String> idxProvider1 =
-            indexProvider(true, "idx1", new UniqueStringIndexUpdater(), EVENT_TRACKING_ON, true);
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idxProvider, idxProvider1);
-
-        win.start();
-
-        win.enqueue("A");
-        win.enqueue("B");
-        win.enqueue("C");
-        win.enqueue("D");
-
-        // Snapshot both indexes.
-        StreamerIndex<String, String, Integer> idx = win.index("idx");
-        StreamerIndex<String, String, String> idx1 = win.index("idx1");
-
-        info("Idx: " + idx.entries(0));
-        info("Idx1: " + idx1.entries(0));
-
-        try {
-            win.enqueue("A");
-
-            fail("Exception should have been thrown.");
-        }
-        catch (GridException e) {
-            info("Caught expected exception: " + e);
-        }
-
-        StreamerIndex<String, String, Integer> idxAfter = win.index("idx");
-        StreamerIndex<String, String, String> idx1After = win.index("idx1");
-
-        info("Idx (after): " + idxAfter.entries(0));
-        info("Idx1 (after): " + idx1After.entries(0));
-
-        assertEquals(4, idx.entries(0).size());
-        assertEquals(4, idx1.entries(0).size());
-
-        assertTrue(F.eqOrdered(idx.entries(0), idxAfter.entries(0)));
-        assertTrue(F.eqOrdered(idx1.entries(0), idx1After.entries(0)));
-
-        idxProvider.reset();
-
-        assertEquals(4, idx.entries(0).size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSortedIndexMultithreaded() throws Exception {
-        checkSortedIndexMultithreaded(32, 500, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSortedIndexMultithreadedWithConcurrentPollEvicted() throws Exception {
-        checkSortedIndexMultithreaded(32, 500, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUniqueHashIndexMultithreaded() throws Exception {
-        checkUniqueHashIndexMultithreaded(32, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterIndexKeyNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override public String indexKey(String evt) {
-                return "A".equals(evt) ? null : evt;
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterInitialValueNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override public Integer initialValue(String evt, String key) {
-                return "A".equals(evt) ? null : 1;
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterOnAddedNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override
-            public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-                return "A".equals(evt) ? null : entry.value() + 1;
-            }
-        });
-    }
-
-    /**
-     * Checks the correct behaviour of {@link StreamerIndexUpdater}, given that
-     * it discards event "A" and accepts event "B".
-     *
-     * @param updater Index updater.
-     * @throws GridException If failed.
-     */
-    private void checkIndexUpdater(StreamerIndexUpdater<String, String, Integer> updater) throws GridException {
-        List<StreamerIndexProvider<String, String, Integer>> idxps = Arrays.asList(
-            indexProvider(true, "tree", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false),
-            indexProvider(false, "hash", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false));
-
-        for (StreamerIndexProvider<String, String, Integer> idxp : idxps) {
-            StreamerUnboundedWindow<String> win = new StreamerUnboundedWindow<>();
-
-            win.setIndexes(idxp);
-
-            win.start();
-
-            win.enqueue("A");
-            win.enqueue("A");
-            win.enqueue("B");
-
-            StreamerIndex<String, Object, Object> idx = win.index(idxp.getName());
-
-            assertNotNull(idx);
-
-            assertNull(idx.entry("A"));
-
-            assertNotNull(idx.entry("B"));
-        }
-    }
-
-    /**
-     * @param treeIdx {@code True} to create tree index.
-     * @param name Name.
-     * @param updater Updater.
-     * @param plc Policy.
-     * @param unique Unique.
-     * @return Index provider.
-     */
-    private <E, K, V> StreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name,
-        StreamerIndexUpdater<E, K, V> updater, StreamerIndexPolicy plc, boolean unique) {
-        if (treeIdx) {
-            StreamerTreeIndexProvider<E, K, V> idx = new StreamerTreeIndexProvider<>();
-
-            idx.setName(name);
-            idx.setUpdater(updater);
-            idx.setUnique(unique);
-            idx.setPolicy(plc);
-
-            return idx;
-        }
-        else {
-            StreamerHashIndexProvider<E, K, V> idx = new StreamerHashIndexProvider<>();
-
-            idx.setName(name);
-            idx.setUpdater(updater);
-            idx.setUnique(unique);
-            idx.setPolicy(plc);
-
-            return idx;
-        }
-    }
-
-    /**
-     * @param threadCnt Thread count.
-     * @param iters Number of iterations for each worker thread.
-     * @throws Exception If failed.
-     */
-    private void checkUniqueHashIndexMultithreaded(int threadCnt, final int iters)
-        throws Exception {
-        StreamerIndexProvider<String, String, Integer> idxProvider =
-            indexProvider(false, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, true);
-
-        for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
-            final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-            win.setMaximumSize(threadCnt * 2);
-            win.setIndexes(idxProvider);
-
-            win.start();
-
-            final String evt = "evt" + i;
-            final AtomicInteger nIdxErrors = new AtomicInteger();
-
-            // Submit the same event in multiple threads.
-            runMultiThreaded(new CAX() {
-                @Override public void applyx() throws GridException {
-                    try {
-                        win.enqueue(evt);
-                    }
-                    catch (GridException e) {
-                        if (e.getMessage().contains("Index unique key violation"))
-                            nIdxErrors.incrementAndGet();
-                        else
-                            throw e;
-                    }
-                }
-            }, threadCnt, "put");
-
-            // Only one thread should succeed, because the index is unique.
-            assertEquals(threadCnt - 1, nIdxErrors.get());
-
-            StreamerIndex<String, String, Integer> idx = win.index("idx");
-
-            // Only one event should be present and have value 1.
-            assertEquals(1, idx.entries(0).size());
-            assertEquals((Integer)1, idx.entry(evt).value());
-        }
-    }
-
-    /**
-     * @param threadCnt Thread count.
-     * @param iters Number of iterations for each worker thread.
-     * @param pollEvicted Poll evicted events concurrently, if true.
-     * @throws Exception If failed.
-     */
-    public void checkSortedIndexMultithreaded(final int threadCnt, final int iters, final boolean pollEvicted)
-        throws Exception {
-        final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(threadCnt * 2);
-        win.setIndexes(indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, false));
-
-        win.start();
-
-        IgniteFuture<Long> pollFut = null;
-
-        if (pollEvicted) {
-            // These threads poll evicted events from the window if it doesn't break
-            // the test invariant.
-            pollFut = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() throws GridException {
-                    try {
-                        while (!Thread.currentThread().isInterrupted()) {
-                            StreamerIndex<String, String, Integer> idx = win.index("idx");
-
-                            boolean canPoll = F.forAll(
-                                idx.entries(-1 * threadCnt),
-                                new P1<StreamerIndexEntry<String, String, Integer>>() {
-                                    @Override public boolean apply(StreamerIndexEntry<String, String, Integer> e) {
-                                        return e.value() > 2;
-                                    }
-                                });
-
-                            if (!canPoll || win.pollEvicted() == null)
-                                U.sleep(50);
-                        }
-                    }
-                    catch (GridInterruptedException ignored) {
-                        // No-op.
-                    }
-                }
-            }, threadCnt / 4, "test-poll");
-        }
-
-        try {
-            // Each of these threads generates a single event repeatedly and checks
-            // if it is still present in the window. In the tested index events are
-            // sorted by value and the value is a number of repeated events, so, this
-            // should be invariant.
-            IgniteFuture<Long> fut1 = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() throws GridException {
-                    final String evt = Thread.currentThread().getName();
-                    int cntr = 1;
-
-                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
-                        win.enqueue(evt);
-
-                        StreamerIndex<String, String, Integer> idx = win.index("idx");
-                        StreamerIndexEntry<String, String, Integer> entry = idx.entry(evt);
-
-                        assertNotNull(entry);
-
-                        // If concurrent eviction is disabled, check if the
-                        // value grows each time we enqueue a new event.
-                        if (!pollEvicted)
-                            assertEquals((Integer)cntr++, entry.value());
-
-                        // If queued event more than once, the first threadCnt entries
-                        // in descending order should contain an entry with this thread's event.
-                        if (i > 0)
-                            assert idx.entries(-1 * threadCnt).contains(entry);
-                    }
-                }
-            }, threadCnt / 2, "test-multi");
-
-            // This thread generates a set of single non-repeating events from 0 to iters.
-            IgniteFuture<Long> fut2 = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() throws GridException {
-                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++)
-                        win.enqueue(String.valueOf(i));
-                }
-            }, 1, "test-single");
-
-            fut2.get(getTestTimeout());
-            fut1.get(getTestTimeout());
-        }
-        finally {
-            if (pollFut != null)
-                pollFut.cancel();
-        }
-    }
-
-    /**
-     * @param idx Index.
-     * @throws GridException If failed.
-     */
-    private void checkNonUniqueIndex(StreamerIndexProvider<String, String, Integer> idx) throws GridException {
-        assert !idx.isUnique();
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idx);
-
-        win.start();
-
-        for (int i = 0; i < 20; ) {
-            win.enqueue("A" + i); i++;
-            win.enqueue("B"); i++;
-            win.enqueue("C"); i++;
-            win.enqueue("D"); i++;
-        }
-
-        StreamerIndex<String, String, Integer> idx0 = win.index("idx");
-
-        String s;
-
-        while ((s = win.pollEvicted()) != null)
-            info("Evicted String: " + s);
-
-        StreamerIndex<String, String, Integer> idx1 = win.index("idx");
-
-        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
-            assert idx0.sorted();
-
-            // Users with unique names.
-            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                    assertEquals(1, e.events().size());
-                    assertEquals('A', F.first(e.events()).charAt(0));
-                }
-            }
-
-            assertTrue(idx0.entrySet(2).isEmpty());
-
-            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON)
-                    assertEquals(5, e.events().size());
-
-                else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
-                    assertEquals(1, e.events().size());
-
-                else
-                    assertNull(e.events());
-            }
-
-            assertEquals(5, idx0.entrySet(1).size());
-
-            List<StreamerIndexEntry<String, String, Integer>> asc =
-                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<StreamerIndexEntry<String, String, Integer>> desc =
-                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
-
-            assertEquals(8, asc.size());
-            assertEquals(8, desc.size());
-
-            for (int i = 0; i < asc.size(); i++)
-                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
-
-            try {
-                idx0.entrySet(true, 10, true, -10, true);
-
-                assert false;
-            }
-            catch (IllegalArgumentException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            try {
-                idx0.entrySet(false, -10, true, 10, true);
-
-                assert false;
-            }
-            catch (IllegalArgumentException e) {
-                info("Caught expected exception: " + e);
-            }
-        }
-        else
-            assert !idx0.sorted();
-
-        assertEquals(4, idx1.size());
-
-        for (StreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) {
-            Collection<String> evts = e.events();
-
-            info("Entry [e=" + e + ", evts=" + evts + ']');
-
-            if (idx.getPolicy() == EVENT_TRACKING_ON) {
-                assert evts != null;
-
-                switch (evts.size()) {
-                    case 1:
-                        assert F.containsAny(evts, "A16", "B", "C") : "Wrong tracked event: " + F.first(evts);
-
-                        break;
-
-                    case 2:
-                        Collection<String> dedup = F.dedup(evts);
-
-                        assert dedup.size() == 1 && "D".equals(F.first(dedup)) : "Wrong tracked events: " + evts;
-
-                        break;
-
-                    default:
-                        fail("Wrong tracked events: " + evts);
-                }
-            }
-            else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
-                assert evts != null && evts.size() == 1 && F.containsAny(evts, "A16", "B", "C", "D") :
-                    "Wrong tracked events: " + evts;
-            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
-                assert evts == null;
-        }
-
-        // Check that idx0 is unaffected.
-        assertEquals(8, idx0.size());
-
-        idx.reset();
-
-        assertEquals(0, idx.index().size());
-        assertEquals(8, idx0.size());
-    }
-
-    /**
-     * @param idx Index.
-     * @throws GridException If failed.
-     */
-    private void checkUniqueIndex(StreamerIndexProvider<String, String, String> idx) throws GridException {
-        assert idx.isUnique();
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idx);
-
-        win.start();
-
-        for (int i = 0; i < 20; i++)
-            win.enqueue("A" + i);
-
-        for (int i = 0; i < 20; i++) {
-            try {
-                win.enqueue("A" + i);
-
-                fail("Exception should have been thrown.");
-            }
-            catch (GridException e) {
-                info("Caught expected exception: " + e);
-            }
-        }
-
-        StreamerIndex<String, String, String> idx0 = win.index("idx");
-
-        String s;
-
-        while ((s = win.pollEvicted()) != null)
-            info("Evicted string: " + s);
-
-        StreamerIndex<String, String, String> idx1 = win.index("idx");
-
-        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
-            assert idx0.sorted();
-
-            // Users with unique names.
-            for (StreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                    assertEquals(1, e.events().size());
-                    assertEquals('A', F.first(e.events()).charAt(0));
-                }
-            }
-
-            assertTrue(idx0.entrySet("B").isEmpty());
-
-            assertEquals(1, idx0.entrySet("A0").size());
-
-            List<StreamerIndexEntry<String, String, String>> asc =
-                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<StreamerIndexEntry<String, String, String>> desc =
-                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
-
-            assertEquals(20, asc.size());
-            assertEquals(20, desc.size());
-
-            for (int i = 0; i < asc.size(); i++)
-                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
-        }
-        else
-            assert !idx0.sorted();
-
-        assertEquals(5, idx1.size());
-
-        for (StreamerIndexEntry<String, String, String> e : idx1.entries(0)) {
-            Collection<String> evts = e.events();
-
-            info("Entry [e=" + e + ", evts=" + evts + ']');
-
-            if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                assert evts != null && evts.size() == 1 : "Wrong tracked events: " + evts;
-
-                int i = Integer.parseInt(F.first(evts).substring(1));
-
-                assert i >= 15 && i < 20 : "Wrong event: " + F.first(evts);
-            }
-            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
-                assert evts == null;
-        }
-
-        // Check that idx0 is unaffected.
-        assertEquals(20, idx0.size());
-
-        idx.reset();
-
-        assertEquals(0, idx.index().size());
-        assertEquals(20, idx0.size());
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class IndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer initialValue(String evt, String key) {
-            return 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-            return entry.value() + 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
-            String evt) {
-            int res = entry.value() - 1;
-
-            return res == 0 ? null : res;
-        }
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class HashIndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer initialValue(String evt, String key) {
-            return 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-            return entry.value() + 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
-            String evt) {
-            int res = entry.value() - 1;
-
-            return res == 0 ? null : res;
-        }
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class UniqueStringIndexUpdater implements StreamerIndexUpdater<String, String, String> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String initialValue(String evt, String key) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String onAdded(StreamerIndexEntry<String, String, String> entry, String evt)
-            throws GridException {
-            throw new GridException("Unique key violation: " + evt);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String onRemoved(StreamerIndexEntry<String, String, String> entry,
-            String evt) {
-            // On remove we return null as index is unique.
-            return null;
-        }
-    }
-}


Mime
View raw message