ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/32] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 10:03:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/GridStreamerTreeIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/GridStreamerTreeIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/GridStreamerTreeIndexProvider.java
deleted file mode 100644
index e1caf14..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/GridStreamerTreeIndexProvider.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index.tree;
-
-import com.romix.scala.collection.concurrent.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.streamer.index.hash.*;
-import org.gridgain.grid.util.snaptree.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*;
-
-/**
- * Tree index implementation of a {@link GridStreamerIndexProvider}.
- * <p>
- * The advantage of a tree index is that it maintains entries in a
- * sorted order, which is invaluable for many kinds of tasks, where
- * event ordering makes sense (like {@code GridStreamingPopularNumbersExample}).
- * The drawback is that the index entry values should be comparable to each other,
- * and you are likely to need to implement a custom comparator for values in
- * place of a default one.
- * <p>
- * If ordering is not required, consider using {@link GridStreamerHashIndexProvider}
- * instead, which is more efficient (O(1) vs. O(log(n))) and does not require
- * comparability.
- *
- * @see GridStreamerHashIndexProvider
- *
- */
-public class GridStreamerTreeIndexProvider<E, K, V> extends GridStreamerIndexProviderAdapter<E, K, V> {
-    /** */
-    private SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx;
-
-    /** */
-    private TrieMap<K, Entry<E, K, V>> key2Entry;
-
-    /** */
-    private final AtomicLong idxGen = new AtomicLong();
-
-    /** */
-    private Comparator<V> cmp;
-
-    /** */
-    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
-
-    /**
-     * Sets comparator.
-     *
-     * @param cmp Comparator.
-     */
-    public void setComparator(Comparator<V> cmp) {
-        this.cmp = cmp;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerIndex<E, K, V> index0() {
-        return new Index<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize() {
-        idx = cmp == null ? new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>() :
-            new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>(new Comparator<IndexKey<V>>() {
-                @Override public int compare(IndexKey<V> o1, IndexKey<V> o2) {
-                    int res = cmp.compare(o1.value(), o2.value());
-
-                    return res != 0 || isUnique() ? res :
-                        ((Key<V>)o1).seed > ((Key<V>)o2).seed ? 1 :
-                            ((Key<V>)o1).seed == ((Key<V>)o2).seed ? 0 : -1;
-                }
-            });
-
-        key2Entry = new TrieMap<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset0() {
-        // This will recreate maps.
-        initialize();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void add(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        GridStreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        if (oldEntry == null) {
-            V val = updater.initialValue(evt, key);
-
-            if (val == null)
-                return; // Ignore event.
-
-            IndexKey<V> idxKey = nextKey(val);
-
-            state0 = new State<>(null, null, idxKey, null, false, false);
-
-            if (isUnique())
-                // Lock new key.
-                lockIndexKey(idxKey, sync);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = newEntry(key, val, idxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Put new value to index.
-            Entry<E, K, V> old = idx.putIfAbsent(idxKey, newEntry);
-
-            if (isUnique()) {
-                if (old != null)
-                    throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
-                        ", idxKey=" + idxKey + ']');
-            }
-            else
-                assert old == null;
-
-            // Put new entry.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv == null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-        else {
-            V val = updater.onAdded(oldEntry, evt);
-
-            if (val == null) {
-                remove(evt, key, sync);
-
-                return;
-            }
-
-            IndexKey<V> newIdxKey = nextKey(val);
-
-            IndexKey<V> oldIdxKey = oldEntry.keyIndex();
-
-            assert oldIdxKey != null; // Shouldn't be null for tree index.
-
-            int order = compareKeys(oldIdxKey, newIdxKey);
-
-            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
-
-            if (isUnique()) {
-                if (order == 0)
-                    // Keys are equal.
-                    lockIndexKey(newIdxKey, sync);
-                else
-                    lockKeys(oldIdxKey, newIdxKey, order, sync);
-            }
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, newIdxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            if (state0.keysEqual()) {
-                assert isUnique();
-
-                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
-
-                assert b;
-            }
-            else {
-                // Put new value to index with new key.
-                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
-
-                if (isUnique()) {
-                    if (old != null)
-                        throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
-                            ", idxKey=" + newIdxKey + ']');
-                }
-                else
-                    assert old == null;
-
-                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
-
-                assert rmv0;
-            }
-
-            // Replace former entry with the new one.
-            boolean b = key2Entry.replace(key, oldEntry, newEntry);
-
-            assert b;
-
-            // Update passed.
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void remove(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        if (oldEntry == null)
-            return;
-
-        GridStreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        V val = updater.onRemoved(oldEntry, evt);
-
-        IndexKey<V> oldIdxKey = oldEntry.keyIndex();
-
-        assert oldIdxKey != null; // Shouldn't be null for tree index.
-
-        if (val == null) {
-            state0 = new State<>(oldIdxKey, oldEntry, null, null, false, false);
-
-            if (isUnique())
-                // Lock old key.
-                lockIndexKey(oldIdxKey, sync);
-
-            state.set(state0);
-
-            boolean b = idx.remove(oldIdxKey, oldEntry);
-
-            assert b;
-
-            b = key2Entry.remove(key, oldEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-        else {
-            IndexKey<V> newIdxKey = nextKey(val);
-
-            int order = compareKeys(oldIdxKey, newIdxKey);
-
-            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
-
-            if (isUnique()) {
-                if (order == 0)
-                    // Keys are equal.
-                    lockIndexKey(newIdxKey, sync);
-                else
-                    lockKeys(oldIdxKey, newIdxKey, order, sync);
-            }
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, newIdxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            if (state0.keysEqual()) {
-                assert isUnique();
-
-                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
-
-                assert b;
-            }
-            else {
-                // Put new value to index with new key.
-                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
-
-                if (isUnique()) {
-                    if (old != null)
-                        throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
-                            ", idxKey=" + newIdxKey + ']');
-                }
-                else
-                    assert old == null;
-
-                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
-
-                assert rmv0;
-            }
-
-            // Replace former entry with the new one.
-            boolean b = key2Entry.replace(key, oldEntry, newEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-    }
-
-    /**
-     * @param key1 Key.
-     * @param key2 Key.
-     * @param order Keys comparison result.
-     * @param sync Sync.
-     * @throws GridException If interrupted.
-     */
-    private void lockKeys(IndexKey<V> key1, IndexKey<V> key2, int order, GridStreamerIndexUpdateSync sync)
-        throws GridException {
-        assert isUnique();
-        assert key1 != null;
-        assert key2 != null;
-        assert order != 0;
-
-        boolean success = false;
-
-        try {
-            if (order > 0) {
-                lockIndexKey(key1, sync);
-                lockIndexKey(key2, sync);
-            }
-            else {
-                // Reverse order.
-                lockIndexKey(key2, sync);
-                lockIndexKey(key1, sync);
-            }
-
-            success = true;
-        }
-        finally {
-            if (!success) {
-                unlockIndexKey(key1, sync);
-                unlockIndexKey(key2, sync);
-            }
-        }
-    }
-
-    /**
-     * @param key1 Key.
-     * @param key2 Key.
-     * @return Comparison result.
-     */
-    private int compareKeys(IndexKey<V> key1, IndexKey<V> key2) {
-        assert key1 != null;
-        assert key2 != null;
-
-        return cmp != null ? cmp.compare(key1.value(), key2.value()) :
-            ((Comparable<V>)key1.value()).compareTo(key2.value());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void endUpdate0(GridStreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 == null)
-            return;
-
-        state.remove();
-
-        IndexKey<V> oldIdxKey = state0.oldIndexKey();
-        Entry<E, K, V> oldEntry = state0.oldEntry();
-        IndexKey<V> newIdxKey = state0.newIndexKey();
-        Entry<E, K, V> newEntry = state0.newEntry();
-
-        if (rollback && state0.finished()) {
-            // Rollback after index was updated.
-            if (oldEntry != null && newEntry != null) {
-                if (state0.keysEqual()) {
-                    assert isUnique();
-
-                    boolean b = idx.replace(oldIdxKey, newEntry, oldEntry);
-
-                    assert b;
-                }
-                else {
-                    boolean b = idx.remove(newIdxKey, newEntry);
-
-                    assert b;
-
-                    Entry<E, K, V> old = idx.put(oldIdxKey, oldEntry);
-
-                    assert old == null;
-
-                    b = key2Entry.replace(key, newEntry, oldEntry);
-
-                    assert b;
-                }
-            }
-            else if (newEntry == null) {
-                // Old was removed. Need to put it back.
-                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
-
-                assert old == null;
-
-                old = idx.put(oldIdxKey, oldEntry);
-
-                assert old == null;
-            }
-            else {
-                assert oldEntry == null;
-
-                // New entry was added. Remove it.
-                boolean b = idx.remove(newIdxKey, newEntry);
-
-                assert b;
-
-                b = key2Entry.remove(key, newEntry);
-
-                assert b;
-            }
-        }
-
-        // Unlock only if unique.
-        if (isUnique()) {
-            if (oldIdxKey != null)
-                unlockIndexKey(oldIdxKey, sync);
-
-            if (state0.keysEqual())
-                // No need to unlock second key.
-                return;
-
-            if (newIdxKey != null)
-                unlockIndexKey(newIdxKey, sync);
-        }
-    }
-
-    /**
-     * @param val Value.
-     * @return Index key.
-     */
-    protected IndexKey<V> nextKey(V val) {
-        return new Key<>(val, isUnique() ? 0 : idxGen.incrementAndGet(), cmp);
-    }
-
-    /**
-     * @param val Value.
-     * @param asc {@code True} if ascending.
-     * @param incl {@code True} if inclusive.
-     * @return Key for search.
-     */
-    private IndexKey<V> searchKeyFrom(V val, boolean asc, boolean incl) {
-        // (asc && incl) || (!asc && !incl) -> asc == incl
-        return new Key<>(val, asc == incl ? Long.MIN_VALUE : Long.MAX_VALUE, cmp);
-    }
-
-    /**
-     * @param val Value.
-     * @param asc {@code True} if ascending.
-     * @param incl {@code True} if inclusive.
-     * @return Key for search.
-     */
-    private IndexKey<V> searchKeyTo(V val, boolean asc, boolean incl) {
-        // (asc && incl) || (!asc && !incl) -> asc == incl
-        return new Key<>(val, asc == incl ? Long.MAX_VALUE : Long.MIN_VALUE, cmp);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sorted() {
-        return true;
-    }
-
-    /**
-     *
-     */
-    private static class Key<V> implements Comparable<Key<V>>, IndexKey<V> {
-        /** */
-        private final V val;
-
-        /** */
-        private final long seed;
-
-        /** */
-        private final Comparator<V> cmp;
-
-        /**
-         * @param val Value.
-         * @param seed Seed.
-         * @param cmp Comparator.
-         */
-        private Key(V val, long seed, @Nullable Comparator<V> cmp) {
-            assert val != null;
-
-            this.val = val;
-            this.seed = seed;
-            this.cmp = cmp;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V value() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareTo(Key<V> o) {
-            int res = cmp != null ? cmp.compare(val, o.val) : ((Comparable<V>)val).compareTo(o.val);
-
-            return res == 0 ? (seed < o.seed ? -1 : seed > o.seed ? 1 : 0) : res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return 31 * val.hashCode() + (int)(seed ^ (seed >>> 32));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (obj == null || obj.getClass() != Key.class)
-                return false;
-
-            Key key = (Key)obj;
-
-            return seed == key.seed && val.equals(key.val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Key.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class State<E, K, V> {
-        /** */
-        private IndexKey<V> oldIdxKey;
-
-        /** */
-        private Entry<E, K, V> oldEntry;
-
-        /** */
-        private IndexKey<V> newIdxKey;
-
-        /** */
-        private Entry<E, K, V> newEntry;
-
-        /** */
-        private boolean finished;
-
-        /** */
-        private final boolean keysEqual;
-
-        /**
-         * @param oldIdxKey Old index key.
-         * @param oldEntry Old entry.
-         * @param newIdxKey New Index key.
-         * @param newEntry New entry.
-         * @param finished Finished.
-         * @param keysEqual {@code True} if keys are equal.
-         */
-        private State(@Nullable IndexKey<V> oldIdxKey, @Nullable Entry<E, K, V> oldEntry, @Nullable IndexKey<V> newIdxKey,
-            @Nullable Entry<E, K, V> newEntry, boolean finished, boolean keysEqual) {
-            this.oldIdxKey = oldIdxKey;
-            this.oldEntry = oldEntry;
-            this.newIdxKey = newIdxKey;
-            this.newEntry = newEntry;
-            this.finished = finished;
-            this.keysEqual = keysEqual;
-        }
-
-        /**
-         * @return Old index key.
-         */
-        IndexKey<V> oldIndexKey() {
-            return oldIdxKey;
-        }
-
-        /**
-         * @param oldIdxKey Old index key.
-         */
-        void oldIndexKey(IndexKey<V> oldIdxKey) {
-            this.oldIdxKey = oldIdxKey;
-        }
-
-        /**
-         * @return Old.
-         */
-        Entry<E, K, V> oldEntry() {
-            return oldEntry;
-        }
-
-        /**
-         * @param oldEntry Old.
-         */
-        void oldEntry(Entry<E, K, V> oldEntry) {
-            this.oldEntry = oldEntry;
-        }
-
-        /**
-         * @return New index key.
-         */
-        IndexKey<V> newIndexKey() {
-            return newIdxKey;
-        }
-
-        /**
-         * @param newIdxKey New index key.
-         */
-        void newIndexKey(IndexKey<V> newIdxKey) {
-            this.newIdxKey = newIdxKey;
-        }
-
-        /**
-         * @return New.
-         */
-        Entry<E, K, V> newEntry() {
-            return newEntry;
-        }
-
-        /**
-         * @param newEntry New.
-         */
-        void newEntry(Entry<E, K, V> newEntry) {
-            this.newEntry = newEntry;
-        }
-
-        /**
-         * @return Finished.
-         */
-        boolean finished() {
-            return finished;
-        }
-
-        /**
-         * @param finished Finished.
-         */
-        void finished(boolean finished) {
-            this.finished = finished;
-        }
-
-        /**
-         * @return {@code True} if both keys are not null and are equal (as comparables).
-         */
-        boolean keysEqual() {
-            return keysEqual;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(State.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private class Index<I extends IndexKey<V>> implements GridStreamerIndex<E, K, V> {
-        /** */
-        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
-
-        /** */
-        private final SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx0 = idx.clone();
-
-        /** */
-        private final int evtsCnt = eventsCount();
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String name() {
-            return getName();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean unique() {
-            return isUnique();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean sorted() {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridStreamerIndexPolicy policy() {
-            return getPolicy();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return key2Entry0.size();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> entry(K key) {
-            A.notNull(key, "key");
-
-            return trieGet(key, key2Entry0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<GridStreamerIndexEntry<E, K, V>> entries(int cnt) {
-            Collection col = cnt >= 0 ? idx0.values() : idx0.descendingMap().values();
-
-            return (Collection<GridStreamerIndexEntry<E, K, V>>)(cnt == 0 ? Collections.unmodifiableCollection(col) :
-                F.limit(col, U.safeAbs(cnt)));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final int cnt) {
-            Set<K> col = new AbstractSet<K>() {
-                private Collection<K> entries = F.viewReadOnly(
-                    cnt >= 0 ? idx0.values() : idx0.descendingMap().values(),
-                    entryToKey);
-
-                @NotNull @Override public Iterator<K> iterator() {
-                    return entries.iterator();
-                }
-
-                @Override public int size() {
-                    return entries.size();
-                }
-            };
-
-            return cnt == 0 ? col : F.limit(col, U.safeAbs(cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(int cnt) {
-            Collection<GridStreamerIndexEntry<E, K, V>> col = entries(cnt);
-
-            return F.viewReadOnly(col, entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(int cnt) {
-            Collection<E> evts = events(cnt >= 0, null, false, null, false);
-
-            return cnt == 0 ? evts : F.limit(evts, U.safeAbs(cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<GridStreamerIndexEntry<E, K, V>> entrySet(V val) {
-            return entrySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<GridStreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
-            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
-            Set<GridStreamerIndexEntry<E, K, V>> set = new AbstractSet<GridStreamerIndexEntry<E, K, V>>() {
-                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                @NotNull @Override public Iterator<GridStreamerIndexEntry<E, K, V>> iterator() {
-                    Collection vals = map.values();
-
-                    return (Iterator<GridStreamerIndexEntry<E, K, V>>)vals.iterator();
-                }
-
-                @Override public int size() {
-                    return map.size();
-                }
-            };
-
-            return Collections.unmodifiableSet(set);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(V val) {
-            return keySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            Set<K> set = new AbstractSet<K>() {
-                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                @NotNull @Override public Iterator<K> iterator() {
-                    return F.iterator(map.values(), entryToKey, true);
-                }
-
-                @Override public int size() {
-                    return map.size();
-                }
-            };
-
-            return Collections.unmodifiableSet(set);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
-            boolean toIncl) {
-            Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-            return F.viewReadOnly(map.values(), entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(V val) {
-            A.notNull(val, "val");
-
-            return events(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            if (getPolicy() == EVENT_TRACKING_OFF)
-                throw new IllegalStateException("Event tracking is off: " + this);
-
-            Collection<E> evts = new AbstractCollection<E>() {
-                private final Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                private int size = -1;
-
-                @NotNull @Override public Iterator<E> iterator() {
-                    return new Iterator<E>() {
-                        private final Iterator<Entry<E, K, V>> entryIter = map.values().iterator();
-
-                        private Iterator<E> evtIter;
-
-                        private boolean moved = true;
-
-                        private boolean more;
-
-                        @Override public boolean hasNext() {
-                            if (!moved)
-                                return more;
-
-                            moved = false;
-
-                            if (evtIter != null && evtIter.hasNext())
-                                return more = true;
-
-                            while (entryIter.hasNext()) {
-                                evtIter = eventsIterator(entryIter.next());
-
-                                if (evtIter.hasNext())
-                                    return more = true;
-                            }
-
-                            return more = false;
-                        }
-
-                        @Override public E next() {
-                            if (hasNext()) {
-                                moved = true;
-
-                                return evtIter.next();
-                            }
-
-                            throw new NoSuchElementException();
-                        }
-
-                        @Override public void remove() {
-                            assert false;
-                        }
-                    };
-                }
-
-                @Override public int size() {
-                    return size != -1 ? size :
-                        fromVal == null && toVal == null ? (size = evtsCnt) : (size = F.size(iterator()));
-                }
-
-                /**
-                 * @param entry Entry.
-                 * @return Events iterator.
-                 */
-                @SuppressWarnings("fallthrough")
-                Iterator<E> eventsIterator(GridStreamerIndexEntry<E,K,V> entry) {
-                    switch (getPolicy()) {
-                        case EVENT_TRACKING_ON:
-                        case EVENT_TRACKING_ON_DEDUP:
-                            Collection<E> evts = entry.events();
-
-                            assert evts != null;
-
-                            return evts.iterator();
-
-                        default:
-                            assert false;
-
-                            throw new IllegalStateException("Event tracking is off: " + Index.this);
-                    }
-                }
-            };
-
-            return Collections.unmodifiableCollection(evts);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> firstEntry() {
-            return idx0.firstEntry().getValue();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> lastEntry() {
-            return idx0.lastEntry().getValue();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<GridStreamerIndexEntry<E, K, V>> iterator() {
-            return entries(0).iterator();
-        }
-
-        /**
-         * @param asc Ascending.
-         * @param fromVal From.
-         * @param fromIncl Include from.
-         * @param toVal To.
-         * @param toIncl Include to.
-         * @return Map.
-         */
-        private Map<IndexKey<V>, Entry<E, K, V>> subMap(boolean asc, @Nullable V fromVal, boolean fromIncl,
-            @Nullable V toVal, boolean toIncl) {
-            if (fromVal != null && toVal != null) {
-                int cmpRes = cmp != null ? cmp.compare(toVal, fromVal) : ((Comparable<V>)toVal).compareTo(fromVal);
-
-                if ((asc && cmpRes < 0) || (!asc && cmpRes > 0))
-                    throw new IllegalArgumentException("Boundaries are invalid [asc=" + asc + ", fromVal=" + fromVal +
-                        ", toVal=" + toVal + ']');
-            }
-
-            if (idx0.isEmpty())
-                return Collections.emptyMap();
-
-            ConcurrentNavigableMap<IndexKey<V>,Entry<E,K,V>> map = asc ? idx0 : idx0.descendingMap();
-
-            if (fromVal == null) {
-                fromVal = map.firstKey().value();
-
-                fromIncl = true;
-            }
-
-            if (toVal == null) {
-                toVal = map.lastKey().value();
-
-                toIncl = true;
-            }
-
-            return map.subMap(searchKeyFrom(fromVal, asc, fromIncl), fromIncl, searchKeyTo(toVal, asc, toIncl), toIncl);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Index.class, this, "provider", GridStreamerTreeIndexProvider.this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/StreamerTreeIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/StreamerTreeIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/StreamerTreeIndexProvider.java
new file mode 100644
index 0000000..09f22d4
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/tree/StreamerTreeIndexProvider.java
@@ -0,0 +1,945 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index.tree;
+
+import com.romix.scala.collection.concurrent.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.index.*;
+import org.gridgain.grid.util.snaptree.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
+
+/**
+ * Tree index implementation of a {@link org.gridgain.grid.streamer.index.StreamerIndexProvider}.
+ * <p>
+ * The advantage of a tree index is that it maintains entries in a
+ * sorted order, which is invaluable for many kinds of tasks, where
+ * event ordering makes sense (like {@code GridStreamingPopularNumbersExample}).
+ * The drawback is that the index entry values should be comparable to each other,
+ * and you are likely to need to implement a custom comparator for values in
+ * place of a default one.
+ * <p>
+ * If ordering is not required, consider using {@link org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider}
+ * instead, which is more efficient (O(1) vs. O(log(n))) and does not require
+ * comparability.
+ *
+ * @see org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider
+ *
+ */
+public class StreamerTreeIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> {
+    /** */
+    private SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx;
+
+    /** */
+    private TrieMap<K, Entry<E, K, V>> key2Entry;
+
+    /** */
+    private final AtomicLong idxGen = new AtomicLong();
+
+    /** */
+    private Comparator<V> cmp;
+
+    /** */
+    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
+
+    /**
+     * Sets comparator.
+     *
+     * @param cmp Comparator.
+     */
+    public void setComparator(Comparator<V> cmp) {
+        this.cmp = cmp;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected StreamerIndex<E, K, V> index0() {
+        return new Index<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize() {
+        idx = cmp == null ? new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>() :
+            new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>(new Comparator<IndexKey<V>>() {
+                @Override public int compare(IndexKey<V> o1, IndexKey<V> o2) {
+                    int res = cmp.compare(o1.value(), o2.value());
+
+                    return res != 0 || isUnique() ? res :
+                        ((Key<V>)o1).seed > ((Key<V>)o2).seed ? 1 :
+                            ((Key<V>)o1).seed == ((Key<V>)o2).seed ? 0 : -1;
+                }
+            });
+
+        key2Entry = new TrieMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset0() {
+        // This will recreate maps.
+        initialize();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) throws GridException {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + state0);
+
+        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
+
+        StreamerIndexUpdater<E, K, V> updater = getUpdater();
+
+        if (oldEntry == null) {
+            V val = updater.initialValue(evt, key);
+
+            if (val == null)
+                return; // Ignore event.
+
+            IndexKey<V> idxKey = nextKey(val);
+
+            state0 = new State<>(null, null, idxKey, null, false, false);
+
+            if (isUnique())
+                // Lock new key.
+                lockIndexKey(idxKey, sync);
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = newEntry(key, val, idxKey, evt);
+
+            // Save new entry to state.
+            state0.newEntry(newEntry);
+
+            // Put new value to index.
+            Entry<E, K, V> old = idx.putIfAbsent(idxKey, newEntry);
+
+            if (isUnique()) {
+                if (old != null)
+                    throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
+                        ", idxKey=" + idxKey + ']');
+            }
+            else
+                assert old == null;
+
+            // Put new entry.
+            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
+
+            assert rmv == null;
+
+            // Update passed.
+            state0.finished(true);
+        }
+        else {
+            V val = updater.onAdded(oldEntry, evt);
+
+            if (val == null) {
+                remove(evt, key, sync);
+
+                return;
+            }
+
+            IndexKey<V> newIdxKey = nextKey(val);
+
+            IndexKey<V> oldIdxKey = oldEntry.keyIndex();
+
+            assert oldIdxKey != null; // Shouldn't be null for tree index.
+
+            int order = compareKeys(oldIdxKey, newIdxKey);
+
+            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
+
+            if (isUnique()) {
+                if (order == 0)
+                    // Keys are equal.
+                    lockIndexKey(newIdxKey, sync);
+                else
+                    lockKeys(oldIdxKey, newIdxKey, order, sync);
+            }
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, newIdxKey, evt);
+
+            // Save new entry to state.
+            state0.newEntry(newEntry);
+
+            if (state0.keysEqual()) {
+                assert isUnique();
+
+                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
+
+                assert b;
+            }
+            else {
+                // Put new value to index with new key.
+                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
+
+                if (isUnique()) {
+                    if (old != null)
+                        throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
+                            ", idxKey=" + newIdxKey + ']');
+                }
+                else
+                    assert old == null;
+
+                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
+
+                assert rmv0;
+            }
+
+            // Replace former entry with the new one.
+            boolean b = key2Entry.replace(key, oldEntry, newEntry);
+
+            assert b;
+
+            // Update passed.
+            state0.finished(true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) throws GridException {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + state0);
+
+        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
+
+        if (oldEntry == null)
+            return;
+
+        StreamerIndexUpdater<E, K, V> updater = getUpdater();
+
+        V val = updater.onRemoved(oldEntry, evt);
+
+        IndexKey<V> oldIdxKey = oldEntry.keyIndex();
+
+        assert oldIdxKey != null; // Shouldn't be null for tree index.
+
+        if (val == null) {
+            state0 = new State<>(oldIdxKey, oldEntry, null, null, false, false);
+
+            if (isUnique())
+                // Lock old key.
+                lockIndexKey(oldIdxKey, sync);
+
+            state.set(state0);
+
+            boolean b = idx.remove(oldIdxKey, oldEntry);
+
+            assert b;
+
+            b = key2Entry.remove(key, oldEntry);
+
+            assert b;
+
+            state0.finished(true);
+        }
+        else {
+            IndexKey<V> newIdxKey = nextKey(val);
+
+            int order = compareKeys(oldIdxKey, newIdxKey);
+
+            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
+
+            if (isUnique()) {
+                if (order == 0)
+                    // Keys are equal.
+                    lockIndexKey(newIdxKey, sync);
+                else
+                    lockKeys(oldIdxKey, newIdxKey, order, sync);
+            }
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, newIdxKey, evt);
+
+            // Save new entry to state.
+            state0.newEntry(newEntry);
+
+            if (state0.keysEqual()) {
+                assert isUnique();
+
+                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
+
+                assert b;
+            }
+            else {
+                // Put new value to index with new key.
+                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
+
+                if (isUnique()) {
+                    if (old != null)
+                        throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key +
+                            ", idxKey=" + newIdxKey + ']');
+                }
+                else
+                    assert old == null;
+
+                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
+
+                assert rmv0;
+            }
+
+            // Replace former entry with the new one.
+            boolean b = key2Entry.replace(key, oldEntry, newEntry);
+
+            assert b;
+
+            state0.finished(true);
+        }
+    }
+
+    /**
+     * @param key1 Key.
+     * @param key2 Key.
+     * @param order Keys comparison result.
+     * @param sync Sync.
+     * @throws GridException If interrupted.
+     */
+    private void lockKeys(IndexKey<V> key1, IndexKey<V> key2, int order, StreamerIndexUpdateSync sync)
+        throws GridException {
+        assert isUnique();
+        assert key1 != null;
+        assert key2 != null;
+        assert order != 0;
+
+        boolean success = false;
+
+        try {
+            if (order > 0) {
+                lockIndexKey(key1, sync);
+                lockIndexKey(key2, sync);
+            }
+            else {
+                // Reverse order.
+                lockIndexKey(key2, sync);
+                lockIndexKey(key1, sync);
+            }
+
+            success = true;
+        }
+        finally {
+            if (!success) {
+                unlockIndexKey(key1, sync);
+                unlockIndexKey(key2, sync);
+            }
+        }
+    }
+
+    /**
+     * @param key1 Key.
+     * @param key2 Key.
+     * @return Comparison result.
+     */
+    private int compareKeys(IndexKey<V> key1, IndexKey<V> key2) {
+        assert key1 != null;
+        assert key2 != null;
+
+        return cmp != null ? cmp.compare(key1.value(), key2.value()) :
+            ((Comparable<V>)key1.value()).compareTo(key2.value());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 == null)
+            return;
+
+        state.remove();
+
+        IndexKey<V> oldIdxKey = state0.oldIndexKey();
+        Entry<E, K, V> oldEntry = state0.oldEntry();
+        IndexKey<V> newIdxKey = state0.newIndexKey();
+        Entry<E, K, V> newEntry = state0.newEntry();
+
+        if (rollback && state0.finished()) {
+            // Rollback after index was updated.
+            if (oldEntry != null && newEntry != null) {
+                if (state0.keysEqual()) {
+                    assert isUnique();
+
+                    boolean b = idx.replace(oldIdxKey, newEntry, oldEntry);
+
+                    assert b;
+                }
+                else {
+                    boolean b = idx.remove(newIdxKey, newEntry);
+
+                    assert b;
+
+                    Entry<E, K, V> old = idx.put(oldIdxKey, oldEntry);
+
+                    assert old == null;
+
+                    b = key2Entry.replace(key, newEntry, oldEntry);
+
+                    assert b;
+                }
+            }
+            else if (newEntry == null) {
+                // Old was removed. Need to put it back.
+                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
+
+                assert old == null;
+
+                old = idx.put(oldIdxKey, oldEntry);
+
+                assert old == null;
+            }
+            else {
+                assert oldEntry == null;
+
+                // New entry was added. Remove it.
+                boolean b = idx.remove(newIdxKey, newEntry);
+
+                assert b;
+
+                b = key2Entry.remove(key, newEntry);
+
+                assert b;
+            }
+        }
+
+        // Unlock only if unique.
+        if (isUnique()) {
+            if (oldIdxKey != null)
+                unlockIndexKey(oldIdxKey, sync);
+
+            if (state0.keysEqual())
+                // No need to unlock second key.
+                return;
+
+            if (newIdxKey != null)
+                unlockIndexKey(newIdxKey, sync);
+        }
+    }
+
+    /**
+     * @param val Value.
+     * @return Index key.
+     */
+    protected IndexKey<V> nextKey(V val) {
+        return new Key<>(val, isUnique() ? 0 : idxGen.incrementAndGet(), cmp);
+    }
+
+    /**
+     * @param val Value.
+     * @param asc {@code True} if ascending.
+     * @param incl {@code True} if inclusive.
+     * @return Key for search.
+     */
+    private IndexKey<V> searchKeyFrom(V val, boolean asc, boolean incl) {
+        // (asc && incl) || (!asc && !incl) -> asc == incl
+        return new Key<>(val, asc == incl ? Long.MIN_VALUE : Long.MAX_VALUE, cmp);
+    }
+
+    /**
+     * @param val Value.
+     * @param asc {@code True} if ascending.
+     * @param incl {@code True} if inclusive.
+     * @return Key for search.
+     */
+    private IndexKey<V> searchKeyTo(V val, boolean asc, boolean incl) {
+        // (asc && incl) || (!asc && !incl) -> asc == incl
+        return new Key<>(val, asc == incl ? Long.MAX_VALUE : Long.MIN_VALUE, cmp);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean sorted() {
+        return true;
+    }
+
+    /**
+     * Index key that is ordered by value first and by seed second.
+     */
+    private static class Key<V> implements Comparable<Key<V>>, IndexKey<V> {
+        /** */
+        private final V val;
+
+        /** */
+        private final long seed;
+
+        /** */
+        private final Comparator<V> cmp;
+
+        /**
+         * @param val Value.
+         * @param seed Seed.
+         * @param cmp Comparator.
+         */
+        private Key(V val, long seed, @Nullable Comparator<V> cmp) {
+            assert val != null;
+
+            this.val = val;
+            this.seed = seed;
+            this.cmp = cmp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Key<V> o) {
+            int res = cmp != null ? cmp.compare(val, o.val) : ((Comparable<V>)val).compareTo(o.val);
+
+            return res == 0 ? (seed < o.seed ? -1 : seed > o.seed ? 1 : 0) : res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val.hashCode() + (int)(seed ^ (seed >>> 32));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == null || obj.getClass() != Key.class)
+                return false;
+
+            Key key = (Key)obj;
+
+            return seed == key.seed && val.equals(key.val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Key.class, this);
+        }
+    }
+
+    /**
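+     * Thread-local record of the index update in progress; read by {@code endUpdate0} to roll back and unlock.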
+     */
+    private static class State<E, K, V> {
+        /** */
+        private IndexKey<V> oldIdxKey;
+
+        /** */
+        private Entry<E, K, V> oldEntry;
+
+        /** */
+        private IndexKey<V> newIdxKey;
+
+        /** */
+        private Entry<E, K, V> newEntry;
+
+        /** */
+        private boolean finished;
+
+        /** */
+        private final boolean keysEqual;
+
+        /**
+         * @param oldIdxKey Old index key.
+         * @param oldEntry Old entry.
+         * @param newIdxKey New index key.
+         * @param newEntry New entry.
+         * @param finished Finished.
+         * @param keysEqual {@code True} if keys are equal.
+         */
+        private State(@Nullable IndexKey<V> oldIdxKey, @Nullable Entry<E, K, V> oldEntry, @Nullable IndexKey<V> newIdxKey,
+            @Nullable Entry<E, K, V> newEntry, boolean finished, boolean keysEqual) {
+            this.oldIdxKey = oldIdxKey;
+            this.oldEntry = oldEntry;
+            this.newIdxKey = newIdxKey;
+            this.newEntry = newEntry;
+            this.finished = finished;
+            this.keysEqual = keysEqual;
+        }
+
+        /**
+         * @return Old index key.
+         */
+        IndexKey<V> oldIndexKey() {
+            return oldIdxKey;
+        }
+
+        /**
+         * @param oldIdxKey Old index key.
+         */
+        void oldIndexKey(IndexKey<V> oldIdxKey) {
+            this.oldIdxKey = oldIdxKey;
+        }
+
+        /**
+         * @return Old entry.
+         */
+        Entry<E, K, V> oldEntry() {
+            return oldEntry;
+        }
+
+        /**
+         * @param oldEntry Old entry.
+         */
+        void oldEntry(Entry<E, K, V> oldEntry) {
+            this.oldEntry = oldEntry;
+        }
+
+        /**
+         * @return New index key.
+         */
+        IndexKey<V> newIndexKey() {
+            return newIdxKey;
+        }
+
+        /**
+         * @param newIdxKey New index key.
+         */
+        void newIndexKey(IndexKey<V> newIdxKey) {
+            this.newIdxKey = newIdxKey;
+        }
+
+        /**
+         * @return New entry.
+         */
+        Entry<E, K, V> newEntry() {
+            return newEntry;
+        }
+
+        /**
+         * @param newEntry New entry.
+         */
+        void newEntry(Entry<E, K, V> newEntry) {
+            this.newEntry = newEntry;
+        }
+
+        /**
+         * @return Finished.
+         */
+        boolean finished() {
+            return finished;
+        }
+
+        /**
+         * @param finished Finished.
+         */
+        void finished(boolean finished) {
+            this.finished = finished;
+        }
+
+        /**
+         * @return {@code True} if both keys are not null and are equal (as comparables).
+         */
+        boolean keysEqual() {
+            return keysEqual;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(State.class, this);
+        }
+    }
+
+    /**
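+     * Index view over a read-only snapshot of the key map and a clone of the tree, both taken at construction.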
+     */
+    private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> {
+        /** */
+        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
+
+        /** */
+        private final SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx0 = idx.clone();
+
+        /** */
+        private final int evtsCnt = eventsCount();
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String name() {
+            return getName();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean unique() {
+            return isUnique();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean sorted() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public StreamerIndexPolicy policy() {
+            return getPolicy();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return key2Entry0.size();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) {
+            A.notNull(key, "key");
+
+            return trieGet(key, key2Entry0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) {
+            Collection col = cnt >= 0 ? idx0.values() : idx0.descendingMap().values();
+
+            return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? Collections.unmodifiableCollection(col) :
+                F.limit(col, U.safeAbs(cnt)));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<K> keySet(final int cnt) {
+            Set<K> col = new AbstractSet<K>() {
+                private Collection<K> entries = F.viewReadOnly(
+                    cnt >= 0 ? idx0.values() : idx0.descendingMap().values(),
+                    entryToKey);
+
+                @NotNull @Override public Iterator<K> iterator() {
+                    return entries.iterator();
+                }
+
+                @Override public int size() {
+                    return entries.size();
+                }
+            };
+
+            return cnt == 0 ? col : F.limit(col, U.safeAbs(cnt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<V> values(int cnt) {
+            Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt);
+
+            return F.viewReadOnly(col, entryToVal);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events(int cnt) {
+            Collection<E> evts = events(cnt >= 0, null, false, null, false);
+
+            return cnt == 0 ? evts : F.limit(evts, U.safeAbs(cnt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) {
+            return entrySet(true, val, true, val, true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
+            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
+            Set<StreamerIndexEntry<E, K, V>> set = new AbstractSet<StreamerIndexEntry<E, K, V>>() {
+                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
+
+                @NotNull @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
+                    Collection vals = map.values();
+
+                    return (Iterator<StreamerIndexEntry<E, K, V>>)vals.iterator();
+                }
+
+                @Override public int size() {
+                    return map.size();
+                }
+            };
+
+            return Collections.unmodifiableSet(set);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<K> keySet(V val) {
+            return keySet(true, val, true, val, true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
+            @Nullable final V toVal, final boolean toIncl) {
+            Set<K> set = new AbstractSet<K>() {
+                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
+
+                @NotNull @Override public Iterator<K> iterator() {
+                    return F.iterator(map.values(), entryToKey, true);
+                }
+
+                @Override public int size() {
+                    return map.size();
+                }
+            };
+
+            return Collections.unmodifiableSet(set);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
+            boolean toIncl) {
+            Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
+
+            return F.viewReadOnly(map.values(), entryToVal);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events(V val) {
+            A.notNull(val, "val");
+
+            return events(true, val, true, val, true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
+            @Nullable final V toVal, final boolean toIncl) {
+            if (getPolicy() == EVENT_TRACKING_OFF)
+                throw new IllegalStateException("Event tracking is off: " + this);
+
+            Collection<E> evts = new AbstractCollection<E>() {
+                private final Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
+
+                private int size = -1;
+
+                @NotNull @Override public Iterator<E> iterator() {
+                    return new Iterator<E>() {
+                        private final Iterator<Entry<E, K, V>> entryIter = map.values().iterator();
+
+                        private Iterator<E> evtIter;
+
+                        private boolean moved = true;
+
+                        private boolean more;
+
+                        @Override public boolean hasNext() {
+                            if (!moved)
+                                return more;
+
+                            moved = false;
+
+                            if (evtIter != null && evtIter.hasNext())
+                                return more = true;
+
+                            while (entryIter.hasNext()) {
+                                evtIter = eventsIterator(entryIter.next());
+
+                                if (evtIter.hasNext())
+                                    return more = true;
+                            }
+
+                            return more = false;
+                        }
+
+                        @Override public E next() {
+                            if (hasNext()) {
+                                moved = true;
+
+                                return evtIter.next();
+                            }
+
+                            throw new NoSuchElementException();
+                        }
+
+                        @Override public void remove() {
+                            assert false;
+                        }
+                    };
+                }
+
+                @Override public int size() {
+                    return size != -1 ? size :
+                        fromVal == null && toVal == null ? (size = evtsCnt) : (size = F.size(iterator()));
+                }
+
+                /**
+                 * @param entry Entry.
+                 * @return Events iterator.
+                 */
+                @SuppressWarnings("fallthrough")
+                Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) {
+                    switch (getPolicy()) {
+                        case EVENT_TRACKING_ON:
+                        case EVENT_TRACKING_ON_DEDUP:
+                            Collection<E> evts = entry.events();
+
+                            assert evts != null;
+
+                            return evts.iterator();
+
+                        default:
+                            assert false;
+
+                            throw new IllegalStateException("Event tracking is off: " + Index.this);
+                    }
+                }
+            };
+
+            return Collections.unmodifiableCollection(evts);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() {
+            return idx0.firstEntry().getValue();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() {
+            return idx0.lastEntry().getValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
+            return entries(0).iterator();
+        }
+
+        /**
+         * @param asc Ascending.
+         * @param fromVal From.
+         * @param fromIncl Include from.
+         * @param toVal To.
+         * @param toIncl Include to.
+         * @return Map.
+         */
+        private Map<IndexKey<V>, Entry<E, K, V>> subMap(boolean asc, @Nullable V fromVal, boolean fromIncl,
+            @Nullable V toVal, boolean toIncl) {
+            if (fromVal != null && toVal != null) {
+                int cmpRes = cmp != null ? cmp.compare(toVal, fromVal) : ((Comparable<V>)toVal).compareTo(fromVal);
+
+                if ((asc && cmpRes < 0) || (!asc && cmpRes > 0))
+                    throw new IllegalArgumentException("Boundaries are invalid [asc=" + asc + ", fromVal=" + fromVal +
+                        ", toVal=" + toVal + ']');
+            }
+
+            if (idx0.isEmpty())
+                return Collections.emptyMap();
+
+            ConcurrentNavigableMap<IndexKey<V>,Entry<E,K,V>> map = asc ? idx0 : idx0.descendingMap();
+
+            if (fromVal == null) {
+                fromVal = map.firstKey().value();
+
+                fromIncl = true;
+            }
+
+            if (toVal == null) {
+                toVal = map.lastKey().value();
+
+                toIncl = true;
+            }
+
+            return map.subMap(searchKeyFrom(fromVal, asc, fromIncl), fromIncl, searchKeyTo(toVal, asc, toIncl), toIncl);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Index.class, this, "provider", StreamerTreeIndexProvider.this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
index b6a8a01..83d9127 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
@@ -34,10 +34,10 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
     private IgnitePredicate<Object> filter;
 
     /** Indexes. */
-    private Map<String, GridStreamerIndexProvider<E, ?, ?>> idxsAsMap;
+    private Map<String, StreamerIndexProvider<E, ?, ?>> idxsAsMap;
 
     /** */
-    private GridStreamerIndexProvider<E, ?, ?>[] idxs;
+    private StreamerIndexProvider<E, ?, ?>[] idxs;
 
     /** Lock for updates and snapshot. */
     private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
@@ -251,7 +251,7 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
         checkConfiguration();
 
         if (idxs != null) {
-            for (GridStreamerIndexProvider<E, ?, ?> idx : idxs)
+            for (StreamerIndexProvider<E, ?, ?> idx : idxs)
                 idx.initialize();
         }
 
@@ -264,7 +264,7 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
 
         try {
             if (idxs != null) {
-                for (GridStreamerIndexProvider<E, ?, ?> idx : idxs)
+                for (StreamerIndexProvider<E, ?, ?> idx : idxs)
                     idx.reset();
             }
 
@@ -359,14 +359,14 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> GridStreamerIndex<E, K, V> index() {
+    @Override public <K, V> StreamerIndex<E, K, V> index() {
         return index(null);
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> GridStreamerIndex<E, K, V> index(@Nullable String name) {
+    @Override public <K, V> StreamerIndex<E, K, V> index(@Nullable String name) {
         if (idxsAsMap != null) {
-            GridStreamerIndexProvider<E, K, V> idx = (GridStreamerIndexProvider<E, K, V>)idxsAsMap.get(name);
+            StreamerIndexProvider<E, K, V> idx = (StreamerIndexProvider<E, K, V>)idxsAsMap.get(name);
 
             if (idx == null)
                 throw new IllegalArgumentException("Streamer index is not configured: " + name);
@@ -378,11 +378,11 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridStreamerIndex<E, ?, ?>> indexes() {
+    @Override public Collection<StreamerIndex<E, ?, ?>> indexes() {
         if (idxs != null) {
-            Collection<GridStreamerIndex<E, ?, ?>> res = new ArrayList<>(idxs.length);
+            Collection<StreamerIndex<E, ?, ?>> res = new ArrayList<>(idxs.length);
 
-            for (GridStreamerIndexProvider<E, ?, ?> idx : idxs)
+            for (StreamerIndexProvider<E, ?, ?> idx : idxs)
                 res.add(idx.index());
 
             return res;
@@ -396,7 +396,7 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
      *
      * @return Index providers.
      */
-    public GridStreamerIndexProvider<E, ?, ?>[] indexProviders() {
+    public StreamerIndexProvider<E, ?, ?>[] indexProviders() {
         return idxs;
     }
 
@@ -407,16 +407,16 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
      * @throws IllegalArgumentException If some index names are not unique.
      */
     @SuppressWarnings("unchecked")
-    public void setIndexes(GridStreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException {
+    public void setIndexes(StreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException {
         A.ensure(!F.isEmpty(idxs), "!F.isEmpty(idxs)");
 
         idxsAsMap = new HashMap<>(idxs.length, 1.0f);
-        this.idxs = new GridStreamerIndexProvider[idxs.length];
+        this.idxs = new StreamerIndexProvider[idxs.length];
 
         int i = 0;
 
-        for (GridStreamerIndexProvider<E, ?, ?> idx : idxs) {
-            GridStreamerIndexProvider<E, ?, ?> old = idxsAsMap.put(idx.getName(), idx);
+        for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
+            StreamerIndexProvider<E, ?, ?> old = idxsAsMap.put(idx.getName(), idx);
 
             if (old != null)
                 throw new IllegalArgumentException("Index name is not unique [idx1=" + old + ", idx2=" + idx + ']');
@@ -439,12 +439,12 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
      */
     protected void updateIndexes(E evt, boolean rmv) throws GridException {
         if (idxs != null) {
-            GridStreamerIndexUpdateSync sync = new GridStreamerIndexUpdateSync();
+            StreamerIndexUpdateSync sync = new StreamerIndexUpdateSync();
 
             boolean rollback = true;
 
             try {
-                for (GridStreamerIndexProvider<E, ?, ?> idx : idxs) {
+                for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
                     if (rmv)
                         idx.remove(sync, evt);
                     else
@@ -454,7 +454,7 @@ public abstract class StreamerWindowAdapter<E> implements LifecycleAware, Stream
                 rollback = false;
             }
             finally {
-                for (GridStreamerIndexProvider<E, ?, ?> idx : idxs)
+                for (StreamerIndexProvider<E, ?, ?> idx : idxs)
                     idx.endUpdate(sync, evt, rollback, rmv);
 
                 sync.finish(1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
index d11f805..b87dc2f 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
@@ -39,7 +39,7 @@
                             <property name="maximumSize" value="500"/>
                             <property name="indexes">
                                 <list>
-                                    <bean class="org.gridgain.grid.streamer.index.tree.GridStreamerTreeIndexProvider">
+                                    <bean class="org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider">
                                         <property name="updater">
                                             <bean class="org.gridgain.loadtests.streamer.IndexUpdater"/>
                                         </property>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
index a5ffff6..f8b7de3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
@@ -79,17 +79,17 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndex index() {
+        @Nullable @Override public StreamerIndex index() {
             return null;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndex index(@Nullable String name) {
+        @Nullable @Override public StreamerIndex index(@Nullable String name) {
             return null;
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<GridStreamerIndex> indexes() {
+        @Override public Collection<StreamerIndex> indexes() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
index 99d0694..495a576 100644
--- a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
@@ -22,7 +22,7 @@ import org.jetbrains.annotations.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
-import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*;
+import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
 import static org.gridgain.testframework.GridTestUtils.*;
 
 /**
@@ -33,7 +33,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTreeIndex() throws Exception {
-        for (GridStreamerIndexPolicy plc : GridStreamerIndexPolicy.values()) {
+        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
             checkUniqueIndex(indexProvider(true, "idx", new UniqueStringIndexUpdater(), plc, true));
 
             checkNonUniqueIndex(indexProvider(true, "idx", new IndexUpdater(), plc, false));
@@ -44,7 +44,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testHashIndex() throws Exception {
-        for (GridStreamerIndexPolicy plc : GridStreamerIndexPolicy.values()) {
+        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
             checkUniqueIndex(indexProvider(false, "idx", new UniqueStringIndexUpdater(), plc, true));
 
             checkNonUniqueIndex(indexProvider(false, "idx", new IndexUpdater(), plc, false));
@@ -55,10 +55,10 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testMultipleIndexUpdate() throws Exception {
-        GridStreamerIndexProvider<String, String, Integer> idxProvider =
+        StreamerIndexProvider<String, String, Integer> idxProvider =
             indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON, false);
 
-        GridStreamerIndexProvider<String, String, String> idxProvider1 =
+        StreamerIndexProvider<String, String, String> idxProvider1 =
             indexProvider(true, "idx1", new UniqueStringIndexUpdater(), EVENT_TRACKING_ON, true);
 
         StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
@@ -74,8 +74,8 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
         win.enqueue("D");
 
         // Snapshot both indexes.
-        GridStreamerIndex<String, String, Integer> idx = win.index("idx");
-        GridStreamerIndex<String, String, String> idx1 = win.index("idx1");
+        StreamerIndex<String, String, Integer> idx = win.index("idx");
+        StreamerIndex<String, String, String> idx1 = win.index("idx1");
 
         info("Idx: " + idx.entries(0));
         info("Idx1: " + idx1.entries(0));
@@ -89,8 +89,8 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             info("Caught expected exception: " + e);
         }
 
-        GridStreamerIndex<String, String, Integer> idxAfter = win.index("idx");
-        GridStreamerIndex<String, String, String> idx1After = win.index("idx1");
+        StreamerIndex<String, String, Integer> idxAfter = win.index("idx");
+        StreamerIndex<String, String, String> idx1After = win.index("idx1");
 
         info("Idx (after): " + idxAfter.entries(0));
         info("Idx1 (after): " + idx1After.entries(0));
@@ -155,25 +155,25 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
     public void testUpdaterOnAddedNull() throws Exception {
         checkIndexUpdater(new IndexUpdater() {
             @Nullable @Override
-            public Integer onAdded(GridStreamerIndexEntry<String, String, Integer> entry, String evt) {
+            public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
                 return "A".equals(evt) ? null : entry.value() + 1;
             }
         });
     }
 
     /**
-     * Checks the correct behaviour of {@link GridStreamerIndexUpdater}, given that
+     * Checks the correct behaviour of {@link StreamerIndexUpdater}, given that
      * it discards event "A" and accepts event "B".
      *
      * @param updater Index updater.
      * @throws GridException If failed.
      */
-    private void checkIndexUpdater(GridStreamerIndexUpdater<String, String, Integer> updater) throws GridException {
-        List<GridStreamerIndexProvider<String, String, Integer>> idxps = Arrays.asList(
-            indexProvider(true, "tree", updater, GridStreamerIndexPolicy.EVENT_TRACKING_ON, false),
-            indexProvider(false, "hash", updater, GridStreamerIndexPolicy.EVENT_TRACKING_ON, false));
+    private void checkIndexUpdater(StreamerIndexUpdater<String, String, Integer> updater) throws GridException {
+        List<StreamerIndexProvider<String, String, Integer>> idxps = Arrays.asList(
+            indexProvider(true, "tree", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false),
+            indexProvider(false, "hash", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false));
 
-        for (GridStreamerIndexProvider<String, String, Integer> idxp : idxps) {
+        for (StreamerIndexProvider<String, String, Integer> idxp : idxps) {
             StreamerUnboundedWindow<String> win = new StreamerUnboundedWindow<>();
 
             win.setIndexes(idxp);
@@ -184,7 +184,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             win.enqueue("A");
             win.enqueue("B");
 
-            GridStreamerIndex<String, Object, Object> idx = win.index(idxp.getName());
+            StreamerIndex<String, Object, Object> idx = win.index(idxp.getName());
 
             assertNotNull(idx);
 
@@ -202,10 +202,10 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @param unique Unique.
      * @return Index provider.
      */
-    private <E, K, V> GridStreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name,
-        GridStreamerIndexUpdater<E, K, V> updater, GridStreamerIndexPolicy plc, boolean unique) {
+    private <E, K, V> StreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name,
+        StreamerIndexUpdater<E, K, V> updater, StreamerIndexPolicy plc, boolean unique) {
         if (treeIdx) {
-            GridStreamerTreeIndexProvider<E, K, V> idx = new GridStreamerTreeIndexProvider<>();
+            StreamerTreeIndexProvider<E, K, V> idx = new StreamerTreeIndexProvider<>();
 
             idx.setName(name);
             idx.setUpdater(updater);
@@ -215,7 +215,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             return idx;
         }
         else {
-            GridStreamerHashIndexProvider<E, K, V> idx = new GridStreamerHashIndexProvider<>();
+            StreamerHashIndexProvider<E, K, V> idx = new StreamerHashIndexProvider<>();
 
             idx.setName(name);
             idx.setUpdater(updater);
@@ -233,7 +233,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      */
     private void checkUniqueHashIndexMultithreaded(int threadCnt, final int iters)
         throws Exception {
-        GridStreamerIndexProvider<String, String, Integer> idxProvider =
+        StreamerIndexProvider<String, String, Integer> idxProvider =
             indexProvider(false, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, true);
 
         for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
@@ -265,7 +265,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             // Only one thread should succeed, because the index is unique.
             assertEquals(threadCnt - 1, nIdxErrors.get());
 
-            GridStreamerIndex<String, String, Integer> idx = win.index("idx");
+            StreamerIndex<String, String, Integer> idx = win.index("idx");
 
             // Only one event should be present and have value 1.
             assertEquals(1, idx.entries(0).size());
@@ -297,12 +297,12 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
                 @Override public void applyx() throws GridException {
                     try {
                         while (!Thread.currentThread().isInterrupted()) {
-                            GridStreamerIndex<String, String, Integer> idx = win.index("idx");
+                            StreamerIndex<String, String, Integer> idx = win.index("idx");
 
                             boolean canPoll = F.forAll(
                                 idx.entries(-1 * threadCnt),
-                                new P1<GridStreamerIndexEntry<String, String, Integer>>() {
-                                    @Override public boolean apply(GridStreamerIndexEntry<String, String, Integer> e) {
+                                new P1<StreamerIndexEntry<String, String, Integer>>() {
+                                    @Override public boolean apply(StreamerIndexEntry<String, String, Integer> e) {
                                         return e.value() > 2;
                                     }
                                 });
@@ -331,8 +331,8 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
                     for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
                         win.enqueue(evt);
 
-                        GridStreamerIndex<String, String, Integer> idx = win.index("idx");
-                        GridStreamerIndexEntry<String, String, Integer> entry = idx.entry(evt);
+                        StreamerIndex<String, String, Integer> idx = win.index("idx");
+                        StreamerIndexEntry<String, String, Integer> entry = idx.entry(evt);
 
                         assertNotNull(entry);
 
@@ -370,7 +370,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @param idx Index.
      * @throws GridException If failed.
      */
-    private void checkNonUniqueIndex(GridStreamerIndexProvider<String, String, Integer> idx) throws GridException {
+    private void checkNonUniqueIndex(StreamerIndexProvider<String, String, Integer> idx) throws GridException {
         assert !idx.isUnique();
 
         StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
@@ -387,20 +387,20 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             win.enqueue("D"); i++;
         }
 
-        GridStreamerIndex<String, String, Integer> idx0 = win.index("idx");
+        StreamerIndex<String, String, Integer> idx0 = win.index("idx");
 
         String s;
 
         while ((s = win.pollEvicted()) != null)
             info("Evicted String: " + s);
 
-        GridStreamerIndex<String, String, Integer> idx1 = win.index("idx");
+        StreamerIndex<String, String, Integer> idx1 = win.index("idx");
 
-        if (idx instanceof GridStreamerTreeIndexProvider) { // Tree index.
+        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
             assert idx0.sorted();
 
             // Users with unique names.
-            for (GridStreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) {
+            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) {
                 info("Entry [e=" + e + ", evts=" + e.events() + ']');
 
                 if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
@@ -411,7 +411,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
 
             assertTrue(idx0.entrySet(2).isEmpty());
 
-            for (GridStreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) {
+            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) {
                 info("Entry [e=" + e + ", evts=" + e.events() + ']');
 
                 if (idx.getPolicy() == EVENT_TRACKING_ON)
@@ -426,9 +426,9 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
 
             assertEquals(5, idx0.entrySet(1).size());
 
-            List<GridStreamerIndexEntry<String, String, Integer>> asc =
+            List<StreamerIndexEntry<String, String, Integer>> asc =
                 new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<GridStreamerIndexEntry<String, String, Integer>> desc =
+            List<StreamerIndexEntry<String, String, Integer>> desc =
                 new ArrayList<>(idx0.entrySet(false, null, true, null, true));
 
             assertEquals(8, asc.size());
@@ -460,7 +460,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
 
         assertEquals(4, idx1.size());
 
-        for (GridStreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) {
+        for (StreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) {
             Collection<String> evts = e.events();
 
             info("Entry [e=" + e + ", evts=" + evts + ']');
@@ -505,7 +505,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
      * @param idx Index.
      * @throws GridException If failed.
      */
-    private void checkUniqueIndex(GridStreamerIndexProvider<String, String, String> idx) throws GridException {
+    private void checkUniqueIndex(StreamerIndexProvider<String, String, String> idx) throws GridException {
         assert idx.isUnique();
 
         StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
@@ -529,20 +529,20 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
             }
         }
 
-        GridStreamerIndex<String, String, String> idx0 = win.index("idx");
+        StreamerIndex<String, String, String> idx0 = win.index("idx");
 
         String s;
 
         while ((s = win.pollEvicted()) != null)
             info("Evicted string: " + s);
 
-        GridStreamerIndex<String, String, String> idx1 = win.index("idx");
+        StreamerIndex<String, String, String> idx1 = win.index("idx");
 
-        if (idx instanceof GridStreamerTreeIndexProvider) { // Tree index.
+        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
             assert idx0.sorted();
 
             // Users with unique names.
-            for (GridStreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) {
+            for (StreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) {
                 info("Entry [e=" + e + ", evts=" + e.events() + ']');
 
                 if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
@@ -555,9 +555,9 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
 
             assertEquals(1, idx0.entrySet("A0").size());
 
-            List<GridStreamerIndexEntry<String, String, String>> asc =
+            List<StreamerIndexEntry<String, String, String>> asc =
                 new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<GridStreamerIndexEntry<String, String, String>> desc =
+            List<StreamerIndexEntry<String, String, String>> desc =
                 new ArrayList<>(idx0.entrySet(false, null, true, null, true));
 
             assertEquals(20, asc.size());
@@ -571,7 +571,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
 
         assertEquals(5, idx1.size());
 
-        for (GridStreamerIndexEntry<String, String, String> e : idx1.entries(0)) {
+        for (StreamerIndexEntry<String, String, String> e : idx1.entries(0)) {
             Collection<String> evts = e.events();
 
             info("Entry [e=" + e + ", evts=" + evts + ']');
@@ -599,7 +599,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
     /**
      * Name index updater.
      */
-    private static class IndexUpdater implements GridStreamerIndexUpdater<String, String, Integer> {
+    private static class IndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
         /** {@inheritDoc} */
         @Nullable @Override public String indexKey(String evt) {
             return evt;
@@ -611,12 +611,12 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(GridStreamerIndexEntry<String, String, Integer> entry, String evt) {
+        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
             return entry.value() + 1;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(GridStreamerIndexEntry<String, String, Integer> entry,
+        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
             String evt) {
             int res = entry.value() - 1;
 
@@ -627,7 +627,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
     /**
      * Name index updater.
      */
-    private static class HashIndexUpdater implements GridStreamerIndexUpdater<String, String, Integer> {
+    private static class HashIndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
         /** {@inheritDoc} */
         @Nullable @Override public String indexKey(String evt) {
             return evt;
@@ -639,12 +639,12 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(GridStreamerIndexEntry<String, String, Integer> entry, String evt) {
+        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
             return entry.value() + 1;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(GridStreamerIndexEntry<String, String, Integer> entry,
+        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
             String evt) {
             int res = entry.value() - 1;
 
@@ -655,7 +655,7 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
     /**
      * Name index updater.
      */
-    private static class UniqueStringIndexUpdater implements GridStreamerIndexUpdater<String, String, String> {
+    private static class UniqueStringIndexUpdater implements StreamerIndexUpdater<String, String, String> {
         /** {@inheritDoc} */
         @Nullable @Override public String indexKey(String evt) {
             return evt;
@@ -667,13 +667,13 @@ public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public String onAdded(GridStreamerIndexEntry<String, String, String> entry, String evt)
+        @Nullable @Override public String onAdded(StreamerIndexEntry<String, String, String> entry, String evt)
             throws GridException {
             throw new GridException("Unique key violation: " + evt);
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public String onRemoved(GridStreamerIndexEntry<String, String, String> entry,
+        @Nullable @Override public String onRemoved(StreamerIndexEntry<String, String, String> entry,
             String evt) {
             // On remove we return null as index is unique.
             return null;


Mime
View raw message