ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [18/32] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 10:03:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
new file mode 100644
index 0000000..696f33f
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
@@ -0,0 +1,99 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.gridgain.grid.*;
+
+/**
+ * Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow}
+ * to perform event indexing.
+ * <p>
+ * To configure index for a streamer window, use
+ * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(StreamerIndexProvider[])}.
+ */
+public interface StreamerIndexProvider<E, K, V> extends StreamerIndexProviderMBean {
+    /**
+     * Gets index name.
+     *
+     * @return Name of the index.
+     */
+    public String getName();
+
+    /**
+     * Gets user view for this index. This view is a snapshot
+     * of the current index state. Once returned, it does not
+     * change over time.
+     *
+     * @return User view for this index.
+     */
+    public StreamerIndex<E, K, V> index();
+
+    /**
+     * Initializes the index.
+     */
+    public void initialize();
+
+    /**
+     * Resets the index to an initial empty state.
+     */
+    public void reset();
+
+    /**
+     * Disposes the index.
+     */
+    public void dispose();
+
+    /**
+     * Adds an event to the index.
+     *
+     * @param sync Index update synchronizer.
+     * @param evt Event to add to the index.
+     * @throws GridException If failed to add event to the index.
+     */
+    public void add(StreamerIndexUpdateSync sync, E evt) throws GridException;
+
+    /**
+     * Removes an event from the index.
+     *
+     * @param sync Index update synchronizer.
+     * @param evt Event to remove from the index.
+     * @throws GridException If failed to remove event from the index.
+     */
+    public void remove(StreamerIndexUpdateSync sync, E evt) throws GridException;
+
+    /**
+     * Gets event indexing policy, which defines how events
+     * are tracked within an index.
+     *
+     * @return Index policy.
+     */
+    public StreamerIndexPolicy getPolicy();
+
+    /**
+     * Checks whether this index is unique or not. If it is, equal events
+     * are not allowed, which means that if a newly-added event is found
+     * to be equal to one of the already present events
+     * ({@link Object#equals(Object)} returns {@code true}), an exception
+     * is thrown.
+     *
+     * @return {@code True} for unique index.
+     */
+    public boolean isUnique();
+
+    /**
+     * Finalizes an update operation started by {@code add} or {@code remove}.
+     *
+     * @param sync Index update synchronizer.
+     * @param evt Updated event.
+     * @param rollback Rollback flag. If {@code true}, a rollback was made.
+     * @param rmv Remove flag. If {@code true}, the event was removed from index.
+     */
+    public void endUpdate(StreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java
new file mode 100644
index 0000000..9f3448c
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderAdapter.java
@@ -0,0 +1,788 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import com.romix.scala.*;
+import com.romix.scala.collection.concurrent.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+import org.pcollections.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
+
+/**
+ * Convenient {@link StreamerIndexProvider} adapter implementing base configuration methods.
+ */
+public abstract class StreamerIndexProviderAdapter<E, K, V> implements StreamerIndexProvider<E, K, V> {
+    /** Closure that maps an index entry to its value. */
+    protected final IgniteClosure<StreamerIndexEntry<E, K, V>, V> entryToVal =
+        new C1<StreamerIndexEntry<E, K, V>, V>() {
+            @Override public V apply(StreamerIndexEntry<E, K, V> e) {
+                return e.value();
+            }
+        };
+
+    /** Closure that maps an index entry to its key. */
+    protected final IgniteClosure<StreamerIndexEntry<E, K, V>, K> entryToKey =
+        new C1<StreamerIndexEntry<E, K, V>, K>() {
+            @Override public K apply(StreamerIndexEntry<E, K, V> e) {
+                return e.key();
+            }
+        };
+
+    /** Keys currently being updated. */
+    private final ConcurrentMap<K, StreamerIndexUpdateSync> locks = new ConcurrentHashMap8<>();
+
+    /** Index name. */
+    private String name;
+
+    /** Index policy. */
+    private StreamerIndexPolicy plc = EVENT_TRACKING_OFF;
+
+    /** Index updater. */
+    private StreamerIndexUpdater<E, K, V> updater;
+
+    /** Events counter, adjusted in {@code endUpdate} for every update that was not rolled back. */
+    private final LongAdder evtsCnt = new LongAdder();
+
+    /** Read write lock. */
+    private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+
+    /** Unique index flag. */
+    private boolean unique;
+
+    /** Key of the update in progress on current thread: set by {@code add}/{@code remove}, cleared by {@code endUpdate}. */
+    private final ThreadLocal<K> threadLocKey = new ThreadLocal<>();
+
+    /** Index keys currently being updated (see {@code lockIndexKey} and {@code unlockIndexKey}). */
+    private final ConcurrentMap<IndexKey<V>, StreamerIndexUpdateSync> idxLocks = new ConcurrentHashMap8<>();
+
+    /** Whether the one-time index key validation is still pending. */
+    private boolean keyCheck = true;
+
+    /**
+     * Sets index name.
+     *
+     * @param name Index name.
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+
+    /**
+     * Sets index policy.
+     *
+     * @param plc Policy.
+     */
+    public void setPolicy(StreamerIndexPolicy plc) {
+        this.plc = plc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public StreamerIndexPolicy getPolicy() {
+        return plc;
+    }
+
+    /**
+     * Sets unique flag.
+     *
+     * @param unique {@code True} for unique index.
+     */
+    public void setUnique(boolean unique) {
+        this.unique = unique;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isUnique() {
+        return unique;
+    }
+
+    /**
+     * Sets index updater.
+     *
+     * @param updater Updater.
+     */
+    public void setUpdater(StreamerIndexUpdater<E, K, V> updater) {
+        this.updater = updater;
+    }
+
+    /**
+     * Gets index updater.
+     *
+     * @return Updater.
+     */
+    public StreamerIndexUpdater<E, K, V> getUpdater() {
+        return updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dispose() {
+        // No-op.
+    }
+
+    /**
+     * Adds event to the index. The read lock and the key lock taken here are released by {@code endUpdate}.
+     *
+     * @param sync Index update synchronizer.
+     * @param evt Event to add; skipped when the updater returns {@code null} index key for it.
+     */
+    @Override public void add(StreamerIndexUpdateSync sync, E evt) throws GridException {
+        assert evt != null;
+
+        if (threadLocKey.get() != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
+
+        K key = updater.indexKey(evt);
+
+        if (key == null)
+            return; // Ignore event.
+
+        validateIndexKey(key);
+
+        readLock(); // Released in endUpdate().
+
+        threadLocKey.set(key); // Marks the update as in progress for this thread.
+
+        lockKey(key, sync); // Released in endUpdate().
+
+        add(evt, key, sync);
+    }
+
+    /**
+     * Removes event from the index. The read lock and the key lock taken here are released by {@code endUpdate}.
+     *
+     * @param sync Index update synchronizer.
+     * @param evt Event to remove; skipped when the updater returns {@code null} index key for it.
+     */
+    @Override public void remove(StreamerIndexUpdateSync sync, E evt) throws GridException {
+        assert evt != null;
+
+        if (threadLocKey.get() != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
+
+        K key = updater.indexKey(evt);
+
+        if (key == null)
+            return; // Such events are skipped by add() as well, so there is nothing to remove.
+
+        validateIndexKey(key);
+
+        readLock();
+        threadLocKey.set(key);
+
+        lockKey(key, sync);
+
+        remove(evt, key, sync);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void endUpdate(StreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv) {
+        K key = threadLocKey.get();
+
+        if (key == null)
+            return;
+
+        if (!rollback)
+            evtsCnt.add(rmv ? -1 : 1);
+
+        threadLocKey.remove();
+
+        try {
+            endUpdate0(sync, evt, key, rollback);
+        }
+        finally {
+            // Release the key lock and the read lock taken in add()/remove() even if
+            // the subclass callback fails, otherwise both of them would stay held.
+            unlockKey(key, sync);
+            readUnlock();
+        }
+    }
+
+    /**
+     * @param sync Sync.
+     * @param evt Event.
+     * @param key Key.
+     * @param rollback Rollback flag.
+     */
+    protected abstract void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback);
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        writeLock();
+
+        try {
+            reset0();
+        }
+        finally {
+            writeUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public StreamerIndex<E, K, V> index() {
+        writeLock();
+
+        try {
+            return index0();
+        }
+        finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * Called on reset.
+     */
+    protected abstract void reset0();
+
+    /**
+     * @return Index
+     */
+    protected abstract StreamerIndex<E, K, V> index0();
+
+    /**
+     * Takes the read lock of the index read write lock.
+     */
+    protected void readLock() {
+        rwLock.readLock();
+    }
+
+    /**
+     * Releases the read lock of the index read write lock.
+     */
+    protected void readUnlock() {
+        rwLock.readUnlock();
+    }
+
+    /**
+     * Takes the write lock of the index read write lock.
+     */
+    protected void writeLock() {
+        rwLock.writeLock();
+    }
+
+    /**
+     * Releases the write lock of the index read write lock.
+     */
+    protected void writeUnlock() {
+        rwLock.writeUnlock();
+    }
+
+    /**
+     * @return Events count (current value of the events counter as {@code int}).
+     */
+    protected int eventsCount() {
+        return evtsCnt.intValue();
+    }
+
+    /**
+     * Add event to the index.
+     *
+     * @param evt Event.
+     * @param key key.
+     * @param sync Sync.
+     * @throws GridException If failed.
+     */
+    protected abstract void add(E evt, K key, StreamerIndexUpdateSync sync) throws GridException;
+
+    /**
+     * Remove event from the index.
+     *
+     * @param evt Event.
+     * @param key Key.
+     * @param sync Sync.
+     * @throws GridException If failed.
+     */
+    protected abstract void remove(E evt, K key, StreamerIndexUpdateSync sync) throws GridException;
+
+    /**
+     * Lock updates on particular key. Waits on the sync of the current key owner, if there is one.
+     *
+     * @param key Key.
+     * @param sync Sync.
+     * @throws GridException If thread was interrupted while waiting (interrupt status is restored).
+     */
+    private void lockKey(K key, StreamerIndexUpdateSync sync) throws GridException {
+        assert key != null;
+        assert sync != null;
+
+        while (true) {
+            StreamerIndexUpdateSync old = locks.putIfAbsent(key, sync);
+
+            if (old == null)
+                break;
+
+            try {
+                // No point to replace or remove sync here.
+                // Owner will first remove it, then will finish the sync.
+                old.await();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // Keep the interruption visible to callers.
+
+                throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
+            }
+        }
+    }
+
+    /**
+     * Unlock updates on particular key.
+     *
+     * @param key Key.
+     * @param sync Sync.
+     */
+    private void unlockKey(K key, StreamerIndexUpdateSync sync) {
+        assert key != null;
+
+        locks.remove(key, sync);
+    }
+
+    /**
+     * Lock updates on particular index key. Waits on the sync of the current key owner, if there is one.
+     *
+     * @param key Key.
+     * @param sync Sync.
+     * @throws GridException If thread was interrupted while waiting (interrupt status is restored).
+     */
+    protected void lockIndexKey(IndexKey<V> key, StreamerIndexUpdateSync sync) throws GridException {
+        assert key != null;
+        assert sync != null;
+        assert isUnique();
+
+        while (true) {
+            StreamerIndexUpdateSync old = idxLocks.putIfAbsent(key, sync);
+
+            if (old == null)
+                break;
+
+            try {
+                // No point to replace or remove sync here.
+                // Owner will first remove it, then will finish the sync.
+                old.await();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // Keep the interruption visible to callers.
+
+                throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
+            }
+        }
+    }
+
+    /**
+     * Unlock updates on particular key.
+     *
+     * @param key Key.
+     * @param sync Sync.
+     */
+    protected void unlockIndexKey(IndexKey<V> key, StreamerIndexUpdateSync sync) {
+        assert key != null;
+        assert isUnique();
+
+        idxLocks.remove(key, sync);
+    }
+
+    /**
+     * @param key Key,
+     * @param val Value.
+     * @param idxKey Index key.
+     * @param evt Event.
+     * @return Entry.
+     */
+    protected Entry<E, K, V> newEntry(K key, V val, @Nullable IndexKey<V> idxKey, E evt) {
+        StreamerIndexPolicy plc = getPolicy();
+
+        switch (plc) {
+            case EVENT_TRACKING_OFF:
+                return new NonTrackingEntry<>(key, val, idxKey);
+
+            case EVENT_TRACKING_ON:
+                return new EventTrackingEntry<>(addToCollection(null, evt), key, val, idxKey);
+
+            default:
+                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
+
+                return new DedupTrackingEntry<>(addToMap(null, evt), key, val, idxKey);
+        }
+    }
+
+    /**
+     * @param oldEntry Old entry.
+     * @param key Key,
+     * @param val Value.
+     * @param idxKey Index key.
+     * @param evt Event.
+     * @return Entry.
+     */
+    protected Entry<E, K, V> addEvent(StreamerIndexEntry<E,K,V> oldEntry, K key, V val,
+        @Nullable IndexKey<V> idxKey, E evt) {
+        StreamerIndexPolicy plc = getPolicy();
+
+        switch (plc) {
+            case EVENT_TRACKING_OFF:
+                return new NonTrackingEntry<>(key, val, idxKey);
+
+            case EVENT_TRACKING_ON:
+                return new EventTrackingEntry<>(addToCollection(oldEntry.events(), evt), key, val, idxKey);
+
+            default:
+                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
+
+                return new DedupTrackingEntry<>(addToMap(((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents(), evt),
+                    key, val, idxKey);
+        }
+    }
+
+    /**
+     * @param oldEntry Old entry.
+     * @param key Key,
+     * @param val Value.
+     * @param idxKey Index key.
+     * @param evt Event.
+     * @return Entry.
+     */
+    protected Entry<E, K, V> removeEvent(StreamerIndexEntry<E, K, V> oldEntry, K key, V val,
+        @Nullable IndexKey<V> idxKey, E evt) {
+        StreamerIndexPolicy plc = getPolicy();
+
+        switch (plc) {
+            case EVENT_TRACKING_OFF:
+                return new NonTrackingEntry<>(key, val, idxKey);
+
+            case EVENT_TRACKING_ON:
+                Collection<E> oldEvts = oldEntry.events();
+
+                assert oldEvts != null; // Event tracking is on.
+
+                Collection<E> newEvts = removeFromCollection(oldEvts, evt);
+
+                return new EventTrackingEntry<>(newEvts != null ? newEvts : oldEvts, key, val, idxKey);
+
+            default:
+                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
+
+                Map<E, Integer> oldMap = ((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents();
+
+                assert oldMap != null; // Event tracking is on.
+
+                Map<E, Integer> newMap = removeFromMap(oldMap, evt);
+
+                return new DedupTrackingEntry<>(newMap != null ? newMap : oldMap, key, val, idxKey);
+        }
+    }
+
+    /**
+     * @param col Collection.
+     * @param evt Event.
+     * @return Cloned collection.
+     */
+    protected static <E> Collection<E> addToCollection(@Nullable Collection<E> col, E evt) {
+        PVector<E> res = col == null ? TreePVector.<E>empty() : (PVector<E>)col;
+
+        return res.plus(evt);
+    }
+
+    /**
+     * @param map Collection.
+     * @param evt Event.
+     * @return Cloned map.
+     */
+    protected static <E> Map<E, Integer> addToMap(@Nullable Map<E, Integer> map, E evt) {
+        HashPMap<E, Integer> res = map == null ? HashTreePMap.<E, Integer>empty() : (HashPMap<E, Integer>)map;
+
+        Integer cnt = res.get(evt);
+
+        return cnt != null ? res.minus(evt).plus(evt, cnt + 1) : res.plus(evt, 1);
+    }
+
+    /**
+     * @param col Collection; it is cast to {@code PVector} unconditionally.
+     * @param evt Event.
+     * @return Collection without the event, {@code null} if {@code col} is {@code null} or nothing is left.
+     */
+    @Nullable protected static <E> Collection<E> removeFromCollection(@Nullable Collection<E> col, E evt) {
+        if (col == null)
+            return null;
+
+        PVector<E> res = (PVector<E>)col;
+
+        res = res.minus(evt);
+
+        return res.isEmpty() ? null : res;
+    }
+
+    /**
+     * @param map Map of events to their counters; it is cast to {@code HashPMap} unconditionally.
+     * @param evt Event.
+     * @return Map with the event counter decreased by one, {@code null} only if {@code map} is {@code null}.
+     */
+    @Nullable protected static <E> Map<E, Integer> removeFromMap(@Nullable Map<E, Integer> map, E evt) {
+        if (map == null)
+            return null;
+
+        HashPMap<E, Integer> res = (HashPMap<E, Integer>)map;
+
+        Integer cnt = res.get(evt);
+
+        // NOTE(review): unlike removeFromCollection(), an emptied result is not turned into null here - confirm.
+        return cnt == null ? res : cnt == 1 ? res.minus(evt) : res.minus(evt).plus(evt, cnt - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String updaterClass() {
+        return updater.getClass().getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean unique() {
+        return unique;
+    }
+
+    /** {@inheritDoc} */
+    @Override public StreamerIndexPolicy policy() {
+        return plc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return index0().size();
+    }
+
+    /**
+     * Validates that given index key has overridden equals and hashCode methods (first non-null key only).
+     *
+     * @param key Index key.
+     * @throws IllegalArgumentException If validation fails.
+     */
+    private void validateIndexKey(@Nullable Object key) {
+        // Null key has no class to inspect: skip it without
+        // spending the one-time check, so that the first real key is still validated.
+        if (key == null || !keyCheck)
+            return;
+
+        keyCheck = false;
+
+        if (!U.overridesEqualsAndHashCode(key))
+            throw new IllegalArgumentException("Index key must override hashCode() and equals() methods: " +
+                key.getClass().getName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerIndexProviderAdapter.class, this);
+    }
+
+    /**
+     * Streamer window index key.
+     */
+    protected interface IndexKey<V> {
+        /**
+         * @return Value associated with this key.
+         */
+        public V value();
+    }
+
+    /**
+     * Utility method to safely get values from TrieMap.
+     * See: https://github.com/romix/java-concurrent-hash-trie-map/issues/4
+     *
+     * @param key Key.
+     * @param map Trie map.
+     * @return Value from map.
+     */
+    @SuppressWarnings({"IfMayBeConditional", "TypeMayBeWeakened"})
+    protected static <K, V> V trieGet(K key, TrieMap<K, V> map) {
+        Object r = map.get(key);
+
+        if(r instanceof Some)
+            return ((Some<V>)r).get ();
+        else if(r instanceof None)
+            return null;
+        else
+            return (V)r;
+    }
+
+    /**
+     * Streamer window index entry.
+     */
+    protected abstract static class Entry<E, K, V> implements StreamerIndexEntry<E, K, V> {
+        /** */
+        private final K key;
+
+        /** */
+        private final V val;
+
+        /** */
+        private final IndexKey<V> idxKey;
+
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @param idxKey Key index.
+         */
+        Entry(K key, V val, @Nullable IndexKey<V> idxKey) {
+            assert key != null;
+            assert val != null;
+            assert idxKey == null || idxKey.value() == val : "Keys are invalid [idxKey=" + idxKey + ", val=" + val +']';
+
+            this.key = key;
+            this.val = val;
+            this.idxKey = idxKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public K key() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V value() {
+            return val;
+        }
+
+        /**
+         * @return Internal key.
+         */
+        @Nullable public IndexKey<V> keyIndex() {
+            return idxKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (!(obj instanceof Entry))
+                return false;
+
+            StreamerIndexEntry<E, K, V> e = (StreamerIndexEntry<E, K, V>)obj;
+
+            return key.equals(e.key());
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Entry.class, this, "identity", System.identityHashCode(this));
+        }
+    }
+
+    /**
+     * Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_OFF}.
+     */
+    protected static class NonTrackingEntry<E, K, V> extends Entry<E, K, V> {
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @param idxKey Key index.
+         */
+        public NonTrackingEntry(K key, V val, @Nullable IndexKey<V> idxKey) {
+            super(key, val, idxKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NonTrackingEntry.class, this, super.toString());
+        }
+    }
+
+    /** Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_ON}. */
+    protected static class EventTrackingEntry<E, K, V> extends Entry<E, K, V> {
+        /** Tracked events; the constructor admits {@code null}. */
+        private final Collection<E> evts;
+
+        /**
+         * @param evts Events, either {@code null} or non-empty.
+         * @param key Key.
+         * @param val Value.
+         * @param idxKey Key index.
+         */
+        public EventTrackingEntry(Collection<E> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
+            super(key, val, idxKey);
+
+            assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
+
+            this.evts = evts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events() {
+            return evts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            int evtCnt = evts == null ? 0 : evts.size(); // Null events must not break diagnostics.
+
+            return S.toString(EventTrackingEntry.class, this, "evtCnt", evtCnt, "parent", super.toString());
+        }
+    }
+
+    /** Entry with index policy {@link StreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}. */
+    protected static class DedupTrackingEntry<E, K, V> extends Entry<E, K, V> {
+        /** Tracked events mapped to their counters; the constructor admits {@code null}. */
+        private final Map<E, Integer> evts;
+
+        /**
+         * @param evts Events, either {@code null} or non-empty.
+         * @param key Key.
+         * @param val Value.
+         * @param idxKey Key index.
+         */
+        public DedupTrackingEntry(Map<E, Integer> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
+            super(key, val, idxKey);
+
+            assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
+
+            this.evts = evts;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Collection<E> events() {
+            return evts == null ? null : Collections.unmodifiableSet(evts.keySet());
+        }
+
+        /**
+         * @return Events mapped to their counters, possibly {@code null}.
+         */
+        @Nullable public Map<E, Integer> rawEvents() {
+            return evts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            int evtCnt = evts == null ? 0 : evts.size(); // Null events must not break diagnostics.
+
+            return S.toString(DedupTrackingEntry.class, this, "evtCnt", evtCnt, "parent", super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java
new file mode 100644
index 0000000..cf89c29
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProviderMBean.java
@@ -0,0 +1,66 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.apache.ignite.mbean.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Streamer window index provider MBean.
+ */
+public interface StreamerIndexProviderMBean {
+    /**
+     * Gets index name.
+     *
+     * @return Index name, possibly {@code null}.
+     */
+    @IgniteMBeanDescription("Index name.")
+    @Nullable public String name();
+
+    /**
+     * Gets index updater class name.
+     *
+     * @return Index updater class name.
+     */
+    @IgniteMBeanDescription("Index updater class name.")
+    public String updaterClass();
+
+    /**
+     * Gets index unique flag.
+     *
+     * @return Index unique flag.
+     */
+    @IgniteMBeanDescription("Index unique flag.")
+    public boolean unique();
+
+    /**
+     * Returns {@code true} if index supports sorting and therefore can perform range operations.
+     *
+     * @return Index sorted flag.
+     */
+    @IgniteMBeanDescription("Index sorted flag.")
+    public boolean sorted();
+
+    /**
+     * Gets index policy.
+     *
+     * @return Index policy.
+     */
+    @IgniteMBeanDescription("Index policy.")
+    public StreamerIndexPolicy policy();
+
+    /**
+     * Gets current index size.
+     *
+     * @return Current index size.
+     */
+    @IgniteMBeanDescription("Current index size.")
+    public int size();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java
new file mode 100644
index 0000000..dfa761c
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdateSync.java
@@ -0,0 +1,69 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Streamer index update synchronizer.
+ * <p>
+ * Used in {@link StreamerIndexProvider} to synchronize
+ * operations on window index.
+ *
+ * @see StreamerIndexProvider
+ *
+ */
+public class StreamerIndexUpdateSync {
+    /** Operation result; {@code 0} means that {@link #finish(int)} has not been called yet. */
+    private volatile int res;
+
+    /**
+     * Waits for a notification from another thread, which
+     * should call {@link #finish(int)} with an operation result.
+     * That result is returned by this method.
+     *
+     * @return Operation results, passed to {@link #finish(int)}.
+     * @throws InterruptedException If wait was interrupted.
+     */
+    public int await() throws InterruptedException {
+        int res0 = res; // Fast path: volatile read without entering the monitor.
+
+        if (res0 == 0) {
+            synchronized (this) {
+                while ((res0 = res) == 0) // Re-read under the monitor on every wakeup.
+                    wait();
+            }
+        }
+
+        assert res0 != 0;
+
+        return res0;
+    }
+
+    /**
+     * Notifies all waiting threads to finish waiting.
+     *
+     * @param res Operation result to return from {@link #await()}, must not be {@code 0}.
+     */
+    public void finish(int res) {
+        assert res != 0;
+
+        synchronized (this) {
+            this.res = res;
+
+            notifyAll();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerIndexUpdateSync.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java
new file mode 100644
index 0000000..77177f9
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexUpdater.java
@@ -0,0 +1,80 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.gridgain.grid.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Index updater. The main responsibility of index updater is to maintain index values
+ * up to date whenever events are added to or removed from the window.
+ * <p>
+ * The updater is usually provided to the index provider in configuration via the
+ * {@link StreamerIndexProviderAdapter#setUpdater(StreamerIndexUpdater)} method.
+ */
+public interface StreamerIndexUpdater<E, K, V> {
+    /**
+     * Given an event, extract index key. For example, if you have a 'Person' object
+     * with field 'age' and need to index based on this field, then this method
+     * should return the value of age field.
+     * <p>
+     * If {@code null} is returned then event will be ignored by the index.
+     *
+     * @param evt Event being added to or removed from the window.
+     * @return Index key for this event.
+     */
+    @Nullable public K indexKey(E evt);
+
+    /**
+     * Gets initial value for the index or {@code null} if event should be ignored.
+     * This method is called every time an entry is added to the window in
+     * order to get initial value for given key.
+     *
+     * @param evt Event being added to or removed from window.
+     * @param key Index key returned by {@link #indexKey(Object)} method.
+     * @return Initial value for given key, if {@code null} then event will be
+     *      ignored and index entry will not be created.
+     */
+    @Nullable public V initialValue(E evt, K key);
+
+    /**
+     * Callback invoked whenever an event is being added to the window. Given a key and
+     * a current index value for this key, the implementation should return the new
+     * value for this key. If returned value is {@code null}, then current entry will
+     * be removed from the index.
+     * <p>
+     * If index is sorted, then sorting happens based on the returned value.
+     *
+     * @param entry Current index entry.
+     * @param evt New event.
+     * @return New index value for given key, if {@code null}, then current
+     *      index entry will be removed from the index.
+     * @throws GridException If entry should not be added to index (e.g. if uniqueness is violated).
+     */
+    @Nullable public V onAdded(StreamerIndexEntry<E, K, V> entry, E evt) throws GridException;
+
+    /**
+     * Callback invoked whenever an event is being removed from the window and has
+     * index entry for given key. If there was no entry for given key, then
+     * {@code onRemoved()} will not be called.
+     * <p>
+     * Given a key and a current index value for this key, the implementation should return the new
+     * value for this key. If returned value is {@code null}, then current entry will
+     * be removed from the index.
+     * <p>
+     * If index is sorted, then sorting happens based on the returned value.
+     *
+     * @param entry Current index entry.
+     * @param evt Event being removed from the window.
+     * @return New index value for given key, if {@code null}, then current
+     *      index entry will be removed from the index.
+     */
+    @Nullable public V onRemoved(StreamerIndexEntry<E, K, V> entry, E evt);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java
deleted file mode 100644
index e6a8618..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/GridStreamerHashIndexProvider.java
+++ /dev/null
@@ -1,493 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index.hash;
-
-import com.romix.scala.collection.concurrent.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.streamer.index.tree.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*;
-
-/**
- * Hash index implementation of a {@link GridStreamerIndexProvider}.
- * <p>
- * This implementation uses a concurrent hash map, which is extremely
- * efficient for CRUD operations. It does not, however, maintain the
- * ordering of entries, so, operations which imply ordering are not
- * supported.
- * <p>
- * If ordering is required, consider using {@link GridStreamerTreeIndexProvider}.
- *
- * @see GridStreamerTreeIndexProvider
- *
- */
-public class GridStreamerHashIndexProvider<E, K, V> extends GridStreamerIndexProviderAdapter<E, K, V> {
-    /** */
-    private TrieMap<K, Entry<E, K, V>> key2Entry;
-
-    /** */
-    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerIndex<E, K, V> index0() {
-        return new Index<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize() {
-        key2Entry = new TrieMap<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset0() {
-        // This will recreate maps.
-        initialize();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void add(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        GridStreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        if (oldEntry == null) {
-            V val = updater.initialValue(evt, key);
-
-            if (val == null)
-                return; // Ignore event.
-
-            state0 = new State<>(null, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = newEntry(key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Put new entry.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv == null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-        else {
-            if (isUnique())
-                throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key + ']');
-
-            V val = updater.onAdded(oldEntry, evt);
-
-            if (val == null) {
-                remove(evt, key, sync);
-
-                return;
-            }
-
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Replace former entry with the new one.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv != null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void remove(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        if (oldEntry == null)
-            return;
-
-        GridStreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        V val = updater.onRemoved(oldEntry, evt);
-
-        if (val == null) {
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            boolean b = key2Entry.remove(key, oldEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-        else {
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Replace former entry with the new one.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv != null;
-
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void endUpdate0(GridStreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 == null)
-            return;
-
-        state.remove();
-
-        if (rollback && state0.finished()) {
-            Entry<E, K, V> oldEntry = state0.oldEntry();
-            Entry<E, K, V> newEntry = state0.newEntry();
-
-            // Rollback after index was updated.
-            if (oldEntry != null && newEntry != null) {
-                boolean b = key2Entry.replace(key, newEntry, oldEntry);
-
-                assert b;
-            }
-            else if (newEntry == null) {
-                // Old was removed. Need to put it back.
-                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
-
-                assert old == null;
-            }
-            else {
-                assert oldEntry == null;
-
-                // New entry was added. Remove it.
-                boolean b = key2Entry.remove(key, newEntry);
-
-                assert b;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sorted() {
-        return false;
-    }
-
-    /**
-     *
-     */
-    private class Index<I extends IndexKey<V>> implements GridStreamerIndex<E, K, V> {
-        /** */
-        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
-
-        /** */
-        private final int evtsCnt = eventsCount();
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String name() {
-            return getName();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean unique() {
-            return isUnique();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean sorted() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridStreamerIndexPolicy policy() {
-            return getPolicy();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return key2Entry0.size();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> entry(K key) {
-            A.notNull(key, "key");
-
-            return trieGet(key, key2Entry0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<GridStreamerIndexEntry<E, K, V>> entries(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            Collection vals = Collections.unmodifiableCollection(key2Entry0.values());
-
-            return (Collection<GridStreamerIndexEntry<E, K, V>>)(cnt == 0 ? vals : F.limit(vals, cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) :
-                F.limit(key2Entry0.keySet(), cnt);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(int cnt) {
-            Collection<GridStreamerIndexEntry<E, K, V>> col = entries(cnt);
-
-            return F.viewReadOnly(col, entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            if (getPolicy() == EVENT_TRACKING_OFF)
-                throw new IllegalStateException("Event tracking is off: " + this);
-
-            Collection<E> evts = new AbstractCollection<E>() {
-                @NotNull @Override public Iterator<E> iterator() {
-                    return new Iterator<E>() {
-                        private final Iterator<Entry<E, K, V>> entryIter = key2Entry0.values().iterator();
-
-                        private Iterator<E> evtIter;
-
-                        private boolean moved = true;
-
-                        private boolean more;
-
-                        @Override public boolean hasNext() {
-                            if (!moved)
-                                return more;
-
-                            moved = false;
-
-                            if (evtIter != null && evtIter.hasNext())
-                                return more = true;
-
-                            while (entryIter.hasNext()) {
-                                evtIter = eventsIterator(entryIter.next());
-
-                                if (evtIter.hasNext())
-                                    return more = true;
-                            }
-
-                            return more = false;
-                        }
-
-                        @Override public E next() {
-                            if (hasNext()) {
-                                moved = true;
-
-                                return evtIter.next();
-                            }
-
-                            throw new NoSuchElementException();
-                        }
-
-                        @Override public void remove() {
-                            assert false;
-                        }
-                    };
-                }
-
-                @Override public int size() {
-                    return evtsCnt;
-                }
-
-                /**
-                 * @param entry Entry.
-                 * @return Events iterator.
-                 */
-                @SuppressWarnings("fallthrough")
-                Iterator<E> eventsIterator(GridStreamerIndexEntry<E,K,V> entry) {
-                    switch (getPolicy()) {
-                        case EVENT_TRACKING_ON:
-                        case EVENT_TRACKING_ON_DEDUP:
-                            Collection<E> evts = entry.events();
-
-                            assert evts != null;
-
-                            return evts.iterator();
-
-                        default:
-                            assert false;
-
-                            throw new IllegalStateException("Event tracking is off: " + Index.this);
-                    }
-                }
-            };
-
-
-            return cnt == 0 ? evts : F.limit(evts, cnt);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<GridStreamerIndexEntry<E, K, V>> entrySet(V val) {
-            return entrySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<GridStreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
-            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(V val) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
-            boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(V val) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> firstEntry() {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public GridStreamerIndexEntry<E, K, V> lastEntry() {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<GridStreamerIndexEntry<E, K, V>> iterator() {
-            return entries(0).iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Index.class, this, "provider", GridStreamerHashIndexProvider.this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class State<E, K, V> {
-        /** */
-        private Entry<E, K, V> oldEntry;
-
-        /** */
-        private Entry<E, K, V> newEntry;
-
-        /** */
-        private boolean finished;
-
-        /**
-         * @param oldEntry Old.
-         * @param newEntry New.
-         * @param finished Finished.
-         */
-        private State(@Nullable Entry<E, K, V> oldEntry, @Nullable Entry<E, K, V> newEntry, boolean finished) {
-            this.oldEntry = oldEntry;
-            this.newEntry = newEntry;
-            this.finished = finished;
-        }
-
-        /**
-         * @return Old.
-         */
-        Entry<E, K, V> oldEntry() {
-            return oldEntry;
-        }
-
-        /**
-         * @param oldEntry Old.
-         */
-        void oldEntry(Entry<E, K, V> oldEntry) {
-            this.oldEntry = oldEntry;
-        }
-
-        /**
-         * @return New.
-         */
-        Entry<E, K, V> newEntry() {
-            return newEntry;
-        }
-
-        /**
-         * @param newEntry New.
-         */
-        void newEntry(Entry<E, K, V> newEntry) {
-            this.newEntry = newEntry;
-        }
-
-        /**
-         * @return Finished.
-         */
-        boolean finished() {
-            return finished;
-        }
-
-        /**
-         * @param finished Finished.
-         */
-        void finished(boolean finished) {
-            this.finished = finished;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(State.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java
new file mode 100644
index 0000000..8b7abc9
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/hash/StreamerHashIndexProvider.java
@@ -0,0 +1,492 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index.hash;
+
+import com.romix.scala.collection.concurrent.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.index.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.gridgain.grid.streamer.index.StreamerIndexPolicy.*;
+
+/**
+ * Hash index implementation of a {@link org.gridgain.grid.streamer.index.StreamerIndexProvider}.
+ * <p>
+ * This implementation uses a concurrent hash map, which is extremely
+ * efficient for CRUD operations. It does not, however, maintain the
+ * ordering of entries, so, operations which imply ordering are not
+ * supported.
+ * <p>
+ * If ordering is required, consider using {@link org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider}.
+ *
+ * @see org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider
+ *
+ */
+public class StreamerHashIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> {
+    /** */
+    private TrieMap<K, Entry<E, K, V>> key2Entry;
+
+    /** */
+    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override protected StreamerIndex<E, K, V> index0() {
+        return new Index<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize() {
+        key2Entry = new TrieMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset0() {
+        // This will recreate maps.
+        initialize();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) throws GridException {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + state0);
+
+        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
+
+        StreamerIndexUpdater<E, K, V> updater = getUpdater();
+
+        if (oldEntry == null) {
+            V val = updater.initialValue(evt, key);
+
+            if (val == null)
+                return; // Updater returned no initial value: ignore event.
+
+            state0 = new State<>(null, null, false);
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = newEntry(key, val, null, evt);
+
+            // Save new entry to thread-local state so endUpdate0() can find it on rollback.
+            state0.newEntry(newEntry);
+
+            // Put new entry; no previous mapping is expected for this key.
+            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
+
+            assert rmv == null;
+
+            // Index update is done; endUpdate0() only rolls back finished updates.
+            state0.finished(true);
+        }
+        else {
+            if (isUnique())
+                throw new GridException("Index unique key violation [evt=" + evt + ", key=" + key + ']');
+
+            V val = updater.onAdded(oldEntry, evt);
+
+            if (val == null) {
+                remove(evt, key, sync);
+
+                return;
+            }
+
+            state0 = new State<>(oldEntry, null, false);
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, null, evt);
+
+            // Save new entry to thread-local state so endUpdate0() can find it on rollback.
+            state0.newEntry(newEntry);
+
+            // Replace former entry with the new one; a previous mapping is expected.
+            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
+
+            assert rmv != null;
+
+            // Index update is done; endUpdate0() only rolls back finished updates.
+            state0.finished(true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) throws GridException {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 != null)
+            throw new IllegalStateException("Previous operation has not been finished: " + state0);
+
+        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
+
+        if (oldEntry == null)
+            return;
+
+        StreamerIndexUpdater<E, K, V> updater = getUpdater();
+
+        V val = updater.onRemoved(oldEntry, evt);
+
+        if (val == null) {
+            state0 = new State<>(oldEntry, null, false);
+
+            state.set(state0);
+
+            boolean b = key2Entry.remove(key, oldEntry);
+
+            assert b;
+
+            state0.finished(true);
+        }
+        else {
+            state0 = new State<>(oldEntry, null, false);
+
+            state.set(state0);
+
+            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, null, evt);
+
+            // Save new entry to thread-local state so endUpdate0() can find it on rollback.
+            state0.newEntry(newEntry);
+
+            // Replace former entry with the new one; a previous mapping is expected.
+            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
+
+            assert rmv != null;
+
+            state0.finished(true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
+        State<E, K, V> state0 = state.get();
+
+        if (state0 == null)
+            return;
+
+        state.remove();
+
+        if (rollback && state0.finished()) {
+            Entry<E, K, V> oldEntry = state0.oldEntry();
+            Entry<E, K, V> newEntry = state0.newEntry();
+
+            // Entry was replaced: restore the old entry in place of the new one.
+            if (oldEntry != null && newEntry != null) {
+                boolean b = key2Entry.replace(key, newEntry, oldEntry);
+
+                assert b;
+            }
+            else if (newEntry == null) {
+                // Old entry was removed and nothing replaced it: put it back.
+                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
+
+                assert old == null;
+            }
+            else {
+                assert oldEntry == null;
+
+                // New entry was added for a previously absent key: remove it.
+                boolean b = key2Entry.remove(key, newEntry);
+
+                assert b;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean sorted() {
+        return false;
+    }
+
+    /**
+     *
+     */
+    private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> {
+        /** */
+        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
+
+        /** */
+        private final int evtsCnt = eventsCount();
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String name() {
+            return getName();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean unique() {
+            return isUnique();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean sorted() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public StreamerIndexPolicy policy() {
+            return getPolicy();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return key2Entry0.size();
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) {
+            A.notNull(key, "key");
+
+            return trieGet(key, key2Entry0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) {
+            A.ensure(cnt >= 0, "cnt >= 0");
+
+            Collection vals = Collections.unmodifiableCollection(key2Entry0.values());
+
+            return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? vals : F.limit(vals, cnt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<K> keySet(int cnt) {
+            A.ensure(cnt >= 0, "cnt >= 0");
+
+            return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) :
+                F.limit(key2Entry0.keySet(), cnt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<V> values(int cnt) {
+            Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt);
+
+            return F.viewReadOnly(col, entryToVal);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events(int cnt) {
+            A.ensure(cnt >= 0, "cnt >= 0");
+
+            if (getPolicy() == EVENT_TRACKING_OFF)
+                throw new IllegalStateException("Event tracking is off: " + this);
+
+            Collection<E> evts = new AbstractCollection<E>() {
+                @NotNull @Override public Iterator<E> iterator() {
+                    return new Iterator<E>() {
+                        private final Iterator<Entry<E, K, V>> entryIter = key2Entry0.values().iterator();
+
+                        private Iterator<E> evtIter;
+
+                        private boolean moved = true;
+
+                        private boolean more;
+
+                        @Override public boolean hasNext() {
+                            if (!moved)
+                                return more;
+
+                            moved = false;
+
+                            if (evtIter != null && evtIter.hasNext())
+                                return more = true;
+
+                            while (entryIter.hasNext()) {
+                                evtIter = eventsIterator(entryIter.next());
+
+                                if (evtIter.hasNext())
+                                    return more = true;
+                            }
+
+                            return more = false;
+                        }
+
+                        @Override public E next() {
+                            if (hasNext()) {
+                                moved = true;
+
+                                return evtIter.next();
+                            }
+
+                            throw new NoSuchElementException();
+                        }
+
+                        @Override public void remove() {
+                            assert false;
+                        }
+                    };
+                }
+
+                @Override public int size() {
+                    return evtsCnt;
+                }
+
+                /**
+                 * @param entry Entry.
+                 * @return Events iterator.
+                 */
+                @SuppressWarnings("fallthrough")
+                Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) {
+                    switch (getPolicy()) {
+                        case EVENT_TRACKING_ON:
+                        case EVENT_TRACKING_ON_DEDUP:
+                            Collection<E> evts = entry.events();
+
+                            assert evts != null;
+
+                            return evts.iterator();
+
+                        default:
+                            assert false;
+
+                            throw new IllegalStateException("Event tracking is off: " + Index.this);
+                    }
+                }
+            };
+
+
+            return cnt == 0 ? evts : F.limit(evts, cnt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) {
+            return entrySet(true, val, true, val, true);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation rejects every call; none of the arguments is inspected.
+         *
+         * @throws UnsupportedOperationException Always.
+         */
+        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
+            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
+            String msg = "Operation is not supported on hash index.";
+
+            throw new UnsupportedOperationException(msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Set<K> keySet(V val) {
+            throw new UnsupportedOperationException("Operation is not supported on hash index.");
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation rejects every call; none of the arguments is inspected.
+         *
+         * @throws UnsupportedOperationException Always.
+         */
+        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
+            @Nullable final V toVal, final boolean toIncl) {
+            String msg = "Operation is not supported on hash index.";
+
+            throw new UnsupportedOperationException(msg);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation rejects every call; none of the arguments is inspected.
+         *
+         * @throws UnsupportedOperationException Always.
+         */
+        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
+            boolean toIncl) {
+            String msg = "Operation is not supported on hash index.";
+
+            throw new UnsupportedOperationException(msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<E> events(V val) {
+            throw new UnsupportedOperationException("Operation is not supported on hash index.");
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation rejects every call; none of the arguments is inspected.
+         *
+         * @throws UnsupportedOperationException Always.
+         */
+        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
+            @Nullable final V toVal, final boolean toIncl) {
+            String msg = "Operation is not supported on hash index.";
+
+            throw new UnsupportedOperationException(msg);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() {
+            throw new UnsupportedOperationException("Operation is not supported on hash index.");
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() {
+            throw new UnsupportedOperationException("Operation is not supported on hash index.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
+            return entries(0).iterator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Index.class, this, "provider", StreamerHashIndexProvider.this);
+        }
+    }
+
+    /**
+     * Mutable holder that groups an old entry, a new entry and a finished flag.
+     * Each of the three values can be read and overwritten through the accessors below.
+     */
+    private static class State<E, K, V> {
+        /** Old entry, may be {@code null}. */
+        private Entry<E, K, V> oldEntry;
+
+        /** New entry, may be {@code null}. */
+        private Entry<E, K, V> newEntry;
+
+        /** Finished flag. */
+        private boolean finished;
+
+        /**
+         * Creates a state with all three values given explicitly.
+         *
+         * @param oldEntry Initial old entry.
+         * @param newEntry Initial new entry.
+         * @param finished Initial value of the finished flag.
+         */
+        private State(@Nullable Entry<E, K, V> oldEntry, @Nullable Entry<E, K, V> newEntry, boolean finished) {
+            this.finished = finished;
+            this.newEntry = newEntry;
+            this.oldEntry = oldEntry;
+        }
+
+        /**
+         * @return Currently stored old entry.
+         */
+        Entry<E, K, V> oldEntry() {
+            return oldEntry;
+        }
+
+        /**
+         * @return Currently stored new entry.
+         */
+        Entry<E, K, V> newEntry() {
+            return newEntry;
+        }
+
+        /**
+         * @return Current value of the finished flag.
+         */
+        boolean finished() {
+            return finished;
+        }
+
+        /**
+         * @param entry Old entry to store.
+         */
+        void oldEntry(Entry<E, K, V> entry) {
+            oldEntry = entry;
+        }
+
+        /**
+         * @param entry New entry to store.
+         */
+        void newEntry(Entry<E, K, V> entry) {
+            newEntry = entry;
+        }
+
+        /**
+         * @param flag New value of the finished flag.
+         */
+        void finished(boolean flag) {
+            finished = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(State.class, this);
+        }
+    }
+}


Mime
View raw message