ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [07/50] [abbrv] incubator-ignite git commit: sp-2 streaming cleanup
Date Sun, 22 Mar 2015 19:46:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java
deleted file mode 100644
index 838f9af..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdateSync.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.index;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * Streamer index update synchronizer.
- * <p>
- * Used in {@link StreamerIndexProvider} to synchronize
- * operations on window index.
- *
- * @see StreamerIndexProvider
- *
- */
-public class StreamerIndexUpdateSync {
-    /** */
-    private volatile int res;
-
-    /**
-     * Waits for a notification from another thread, which
-     * should call {@link #finish(int)} with an operation result.
-     * That result is returned by this method.
-     *
-     * @return Operation results, passed to {@link #finish(int)}.
-     * @throws InterruptedException If wait was interrupted.
-     */
-    public int await() throws InterruptedException {
-        int res0 = res;
-
-        if (res0 == 0) {
-            synchronized (this) {
-                while ((res0 = res) == 0)
-                    wait();
-            }
-        }
-
-        assert res0 != 0;
-
-        return res0;
-    }
-
-    /**
-     * Notifies all waiting threads to finish waiting.
-     *
-     * @param res Operation result to return from {@link #await()}.
-     */
-    public void finish(int res) {
-        assert res != 0;
-
-        synchronized (this) {
-            this.res = res;
-
-            notifyAll();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(StreamerIndexUpdateSync.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java
deleted file mode 100644
index ebb3a97..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/StreamerIndexUpdater.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.index;
-
-import org.apache.ignite.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Index updater. The main responsibility of index updater is to maintain index values
- * up to date whenever events are added or removed from window.
- * <p>
- * Updater is provided to index provider in configuration usually via
- * {@link StreamerIndexProviderAdapter#setUpdater(StreamerIndexUpdater)} method.
- */
-public interface StreamerIndexUpdater<E, K, V> {
-    /**
-     * Given an event, extract index key. For example, if you have a 'Person' object
-     * with field 'age' and need to index based on this field, then this method
-     * should return the value of age field.
-     * <p>
-     * If {@code null} is returned then event will be ignored by the index.
-     *
-     * @param evt Event being added or removed from the window.
-     * @return Index key for this event.
-     */
-    @Nullable public K indexKey(E evt);
-
-    /**
-     * Gets initial value for the index or {@code null} if event should be ignored.
-     * This method is called every time when an entry is added to the window in
-     * order to get initial value for given key.
-     *
-     * @param evt Event being added to or removed from window.
-     * @param key Index key return by {@link #indexKey(Object)} method.
-     * @return Initial value for given key, if {@code null} then event will be
-     *      ignored and index entry will not be created.
-     */
-    @Nullable public V initialValue(E evt, K key);
-
-    /**
-     * Callback invoked whenever an event is being added to the window. Given a key and
-     * a current index value for this key, the implementation should return the new
-     * value for this key. If returned value is {@code null}, then current entry will
-     * be removed from the index.
-     * <p>
-     * If index is sorted, then sorting happens based on the returned value.
-     *
-     * @param entry Current index entry.
-     * @param evt New event.
-     * @return New index value for given key, if {@code null}, then current
-     *      index entry will be removed the index.
-     * @throws IgniteException If entry should not be added to index (e.g. if uniqueness is violated).
-     */
-    @Nullable public V onAdded(StreamerIndexEntry<E, K, V> entry, E evt) throws IgniteException;
-
-    /**
-     * Callback invoked whenever an event is being removed from the window and has
-     * index entry for given key. If there was no entry for given key, then
-     * {@code onRemoved()} will not be called.
-     * <p>
-     * Given a key and a current index value for this key, the implementation should return the new
-     * value for this key. If returned value is {@code null}, then current entry will
-     * be removed from the index.
-     * <p>
-     * If index is sorted, then sorting happens based on the returned value.
-     *
-     * @param entry Current index entry.
-     * @param evt Event being removed from the window.
-     * @return New index value for given key, if {@code null}, then current
-     *      index entry will be removed the index.
-     */
-    @Nullable public V onRemoved(StreamerIndexEntry<E, K, V> entry, E evt);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java
deleted file mode 100644
index 74427c4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/StreamerHashIndexProvider.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.index.hash;
-
-import com.romix.scala.collection.concurrent.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.index.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*;
-
-/**
- * Hash index implementation of a {@link org.apache.ignite.streamer.index.StreamerIndexProvider}.
- * <p>
- * This implementation uses a concurrent hash map, which is extremely
- * efficient for CRUD operations. It does not, however, maintain the
- * ordering of entries, so, operations which imply ordering are not
- * supported.
- * <p>
- * If ordering is required, consider using {@link org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider}.
- *
- * @see org.apache.ignite.streamer.index.tree.StreamerTreeIndexProvider
- *
- */
-public class StreamerHashIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> {
-    /** */
-    private TrieMap<K, Entry<E, K, V>> key2Entry;
-
-    /** */
-    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
-
-    /** {@inheritDoc} */
-    @Override protected StreamerIndex<E, K, V> index0() {
-        return new Index<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize() {
-        key2Entry = new TrieMap<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset0() {
-        // This will recreate maps.
-        initialize();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        StreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        if (oldEntry == null) {
-            V val = updater.initialValue(evt, key);
-
-            if (val == null)
-                return; // Ignore event.
-
-            state0 = new State<>(null, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = newEntry(key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Put new entry.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv == null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-        else {
-            if (isUnique())
-                throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key + ']');
-
-            V val = updater.onAdded(oldEntry, evt);
-
-            if (val == null) {
-                remove(evt, key, sync);
-
-                return;
-            }
-
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Replace former entry with the new one.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv != null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        if (oldEntry == null)
-            return;
-
-        StreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        V val = updater.onRemoved(oldEntry, evt);
-
-        if (val == null) {
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            boolean b = key2Entry.remove(key, oldEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-        else {
-            state0 = new State<>(oldEntry, null, false);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, null, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Replace former entry with the new one.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv != null;
-
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 == null)
-            return;
-
-        state.remove();
-
-        if (rollback && state0.finished()) {
-            Entry<E, K, V> oldEntry = state0.oldEntry();
-            Entry<E, K, V> newEntry = state0.newEntry();
-
-            // Rollback after index was updated.
-            if (oldEntry != null && newEntry != null) {
-                boolean b = key2Entry.replace(key, newEntry, oldEntry);
-
-                assert b;
-            }
-            else if (newEntry == null) {
-                // Old was removed. Need to put it back.
-                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
-
-                assert old == null;
-            }
-            else {
-                assert oldEntry == null;
-
-                // New entry was added. Remove it.
-                boolean b = key2Entry.remove(key, newEntry);
-
-                assert b;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sorted() {
-        return false;
-    }
-
-    /**
-     *
-     */
-    private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> {
-        /** */
-        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
-
-        /** */
-        private final int evtsCnt = eventsCount();
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String name() {
-            return getName();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean unique() {
-            return isUnique();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean sorted() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public StreamerIndexPolicy policy() {
-            return getPolicy();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return key2Entry0.size();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) {
-            A.notNull(key, "key");
-
-            return trieGet(key, key2Entry0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            Collection vals = Collections.unmodifiableCollection(key2Entry0.values());
-
-            return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? vals : F.limit(vals, cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            return cnt == 0 ? Collections.unmodifiableSet(key2Entry0.keySet()) :
-                F.limit(key2Entry0.keySet(), cnt);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(int cnt) {
-            Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt);
-
-            return F.viewReadOnly(col, entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(int cnt) {
-            A.ensure(cnt >= 0, "cnt >= 0");
-
-            if (getPolicy() == EVENT_TRACKING_OFF)
-                throw new IllegalStateException("Event tracking is off: " + this);
-
-            Collection<E> evts = new AbstractCollection<E>() {
-                @NotNull @Override public Iterator<E> iterator() {
-                    return new Iterator<E>() {
-                        private final Iterator<Entry<E, K, V>> entryIter = key2Entry0.values().iterator();
-
-                        private Iterator<E> evtIter;
-
-                        private boolean moved = true;
-
-                        private boolean more;
-
-                        @Override public boolean hasNext() {
-                            if (!moved)
-                                return more;
-
-                            moved = false;
-
-                            if (evtIter != null && evtIter.hasNext())
-                                return more = true;
-
-                            while (entryIter.hasNext()) {
-                                evtIter = eventsIterator(entryIter.next());
-
-                                if (evtIter.hasNext())
-                                    return more = true;
-                            }
-
-                            return more = false;
-                        }
-
-                        @Override public E next() {
-                            if (hasNext()) {
-                                moved = true;
-
-                                return evtIter.next();
-                            }
-
-                            throw new NoSuchElementException();
-                        }
-
-                        @Override public void remove() {
-                            assert false;
-                        }
-                    };
-                }
-
-                @Override public int size() {
-                    return evtsCnt;
-                }
-
-                /**
-                 * @param entry Entry.
-                 * @return Events iterator.
-                 */
-                @SuppressWarnings("fallthrough")
-                Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) {
-                    switch (getPolicy()) {
-                        case EVENT_TRACKING_ON:
-                        case EVENT_TRACKING_ON_DEDUP:
-                            Collection<E> evts = entry.events();
-
-                            assert evts != null;
-
-                            return evts.iterator();
-
-                        default:
-                            assert false;
-
-                            throw new IllegalStateException("Event tracking is off: " + Index.this);
-                    }
-                }
-            };
-
-
-            return cnt == 0 ? evts : F.limit(evts, cnt);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) {
-            return entrySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
-            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(V val) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
-            boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(V val) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() {
-            throw new UnsupportedOperationException("Operation is not supported on hash index.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
-            return entries(0).iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Index.class, this, "provider", StreamerHashIndexProvider.this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class State<E, K, V> {
-        /** */
-        private Entry<E, K, V> oldEntry;
-
-        /** */
-        private Entry<E, K, V> newEntry;
-
-        /** */
-        private boolean finished;
-
-        /**
-         * @param oldEntry Old.
-         * @param newEntry New.
-         * @param finished Finished.
-         */
-        private State(@Nullable Entry<E, K, V> oldEntry, @Nullable Entry<E, K, V> newEntry, boolean finished) {
-            this.oldEntry = oldEntry;
-            this.newEntry = newEntry;
-            this.finished = finished;
-        }
-
-        /**
-         * @return Old.
-         */
-        Entry<E, K, V> oldEntry() {
-            return oldEntry;
-        }
-
-        /**
-         * @param oldEntry Old.
-         */
-        void oldEntry(Entry<E, K, V> oldEntry) {
-            this.oldEntry = oldEntry;
-        }
-
-        /**
-         * @return New.
-         */
-        Entry<E, K, V> newEntry() {
-            return newEntry;
-        }
-
-        /**
-         * @param newEntry New.
-         */
-        void newEntry(Entry<E, K, V> newEntry) {
-            this.newEntry = newEntry;
-        }
-
-        /**
-         * @return Finished.
-         */
-        boolean finished() {
-            return finished;
-        }
-
-        /**
-         * @param finished Finished.
-         */
-        void finished(boolean finished) {
-            this.finished = finished;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(State.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java
deleted file mode 100644
index cdb726b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/hash/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains hash-based streamer index implementation.
- */
-package org.apache.ignite.streamer.index.hash;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java
deleted file mode 100644
index b20b74b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains APIs for indexing of streamer windows.
- */
-package org.apache.ignite.streamer.index;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java
deleted file mode 100644
index 9c17791..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/StreamerTreeIndexProvider.java
+++ /dev/null
@@ -1,953 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.index.tree;
-
-import com.romix.scala.collection.concurrent.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.snaptree.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.index.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*;
-
-/**
- * Tree index implementation of a {@link org.apache.ignite.streamer.index.StreamerIndexProvider}.
- * <p>
- * The advantage of a tree index is that it maintains entries in a
- * sorted order, which is invaluable for many kinds of tasks, where
- * event ordering makes sense (like {@code GridStreamingPopularNumbersExample}).
- * The drawback is that the index entry values should be comparable to each other,
- * and you'll are likely to need to implement a custom comparator for values in
- * place of a default one.
- * <p>
- * If ordering is not required, consider using {@link org.apache.ignite.streamer.index.hash.StreamerHashIndexProvider}
- * instead, which is more efficient (O(1) vs. O(log(n))) and does not require
- * comparability.
- *
- * @see org.apache.ignite.streamer.index.hash.StreamerHashIndexProvider
- *
- */
-public class StreamerTreeIndexProvider<E, K, V> extends StreamerIndexProviderAdapter<E, K, V> {
-    /** */
-    private SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx;
-
-    /** */
-    private TrieMap<K, Entry<E, K, V>> key2Entry;
-
-    /** */
-    private final AtomicLong idxGen = new AtomicLong();
-
-    /** */
-    private Comparator<V> cmp;
-
-    /** */
-    private final ThreadLocal<State<E, K, V>> state = new ThreadLocal<>();
-
-    /**
-     * Sets comparator.
-     *
-     * @param cmp Comparator.
-     */
-    public void setComparator(Comparator<V> cmp) {
-        this.cmp = cmp;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected StreamerIndex<E, K, V> index0() {
-        return new Index<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize() {
-        idx = cmp == null ? new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>() :
-            new SnapTreeMap<IndexKey<V>, Entry<E, K, V>>(new Comparator<IndexKey<V>>() {
-                @Override public int compare(IndexKey<V> o1, IndexKey<V> o2) {
-                    int res = cmp.compare(o1.value(), o2.value());
-
-                    return res != 0 || isUnique() ? res :
-                        ((Key<V>)o1).seed > ((Key<V>)o2).seed ? 1 :
-                            ((Key<V>)o1).seed == ((Key<V>)o2).seed ? 0 : -1;
-                }
-            });
-
-        key2Entry = new TrieMap<>();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset0() {
-        // This will recreate maps.
-        initialize();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void add(E evt, K key, StreamerIndexUpdateSync sync) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        StreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        if (oldEntry == null) {
-            V val = updater.initialValue(evt, key);
-
-            if (val == null)
-                return; // Ignore event.
-
-            IndexKey<V> idxKey = nextKey(val);
-
-            state0 = new State<>(null, null, idxKey, null, false, false);
-
-            if (isUnique())
-                // Lock new key.
-                lockIndexKey(idxKey, sync);
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = newEntry(key, val, idxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            // Put new value to index.
-            Entry<E, K, V> old = idx.putIfAbsent(idxKey, newEntry);
-
-            if (isUnique()) {
-                if (old != null)
-                    throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key +
-                        ", idxKey=" + idxKey + ']');
-            }
-            else
-                assert old == null;
-
-            // Put new entry.
-            Entry<E, K, V> rmv = key2Entry.put(key, newEntry);
-
-            assert rmv == null;
-
-            // Update passed.
-            state0.finished(true);
-        }
-        else {
-            V val = updater.onAdded(oldEntry, evt);
-
-            if (val == null) {
-                remove(evt, key, sync);
-
-                return;
-            }
-
-            IndexKey<V> newIdxKey = nextKey(val);
-
-            IndexKey<V> oldIdxKey = oldEntry.keyIndex();
-
-            assert oldIdxKey != null; // Shouldn't be null for tree index.
-
-            int order = compareKeys(oldIdxKey, newIdxKey);
-
-            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
-
-            if (isUnique()) {
-                if (order == 0)
-                    // Keys are equal.
-                    lockIndexKey(newIdxKey, sync);
-                else
-                    lockKeys(oldIdxKey, newIdxKey, order, sync);
-            }
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = addEvent(oldEntry, key, val, newIdxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            if (state0.keysEqual()) {
-                assert isUnique();
-
-                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
-
-                assert b;
-            }
-            else {
-                // Put new value to index with new key.
-                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
-
-                if (isUnique()) {
-                    if (old != null)
-                        throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key +
-                            ", idxKey=" + newIdxKey + ']');
-                }
-                else
-                    assert old == null;
-
-                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
-
-                assert rmv0;
-            }
-
-            // Replace former entry with the new one.
-            boolean b = key2Entry.replace(key, oldEntry, newEntry);
-
-            assert b;
-
-            // Update passed.
-            state0.finished(true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void remove(E evt, K key, StreamerIndexUpdateSync sync) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + state0);
-
-        Entry<E, K, V> oldEntry = trieGet(key, key2Entry);
-
-        if (oldEntry == null)
-            return;
-
-        StreamerIndexUpdater<E, K, V> updater = getUpdater();
-
-        V val = updater.onRemoved(oldEntry, evt);
-
-        IndexKey<V> oldIdxKey = oldEntry.keyIndex();
-
-        assert oldIdxKey != null; // Shouldn't be null for tree index.
-
-        if (val == null) {
-            state0 = new State<>(oldIdxKey, oldEntry, null, null, false, false);
-
-            if (isUnique())
-                // Lock old key.
-                lockIndexKey(oldIdxKey, sync);
-
-            state.set(state0);
-
-            boolean b = idx.remove(oldIdxKey, oldEntry);
-
-            assert b;
-
-            b = key2Entry.remove(key, oldEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-        else {
-            IndexKey<V> newIdxKey = nextKey(val);
-
-            int order = compareKeys(oldIdxKey, newIdxKey);
-
-            state0 = new State<>(oldIdxKey, oldEntry, newIdxKey, null, false, order == 0);
-
-            if (isUnique()) {
-                if (order == 0)
-                    // Keys are equal.
-                    lockIndexKey(newIdxKey, sync);
-                else
-                    lockKeys(oldIdxKey, newIdxKey, order, sync);
-            }
-
-            state.set(state0);
-
-            Entry<E, K, V> newEntry = removeEvent(oldEntry, key, val, newIdxKey, evt);
-
-            // Save new entry to state.
-            state0.newEntry(newEntry);
-
-            if (state0.keysEqual()) {
-                assert isUnique();
-
-                boolean b = idx.replace(newIdxKey, oldEntry, newEntry);
-
-                assert b;
-            }
-            else {
-                // Put new value to index with new key.
-                Entry<E, K, V> old = idx.putIfAbsent(newIdxKey, newEntry);
-
-                if (isUnique()) {
-                    if (old != null)
-                        throw new IgniteException("Index unique key violation [evt=" + evt + ", key=" + key +
-                            ", idxKey=" + newIdxKey + ']');
-                }
-                else
-                    assert old == null;
-
-                boolean rmv0 = idx.remove(oldIdxKey, oldEntry);
-
-                assert rmv0;
-            }
-
-            // Replace former entry with the new one.
-            boolean b = key2Entry.replace(key, oldEntry, newEntry);
-
-            assert b;
-
-            state0.finished(true);
-        }
-    }
-
-    /**
-     * @param key1 Key.
-     * @param key2 Key.
-     * @param order Keys comparison result.
-     * @param sync Sync.
-     * @throws IgniteException If interrupted.
-     */
-    private void lockKeys(IndexKey<V> key1, IndexKey<V> key2, int order, StreamerIndexUpdateSync sync)
-        throws IgniteException {
-        assert isUnique();
-        assert key1 != null;
-        assert key2 != null;
-        assert order != 0;
-
-        boolean success = false;
-
-        try {
-            if (order > 0) {
-                lockIndexKey(key1, sync);
-                lockIndexKey(key2, sync);
-            }
-            else {
-                // Reverse order.
-                lockIndexKey(key2, sync);
-                lockIndexKey(key1, sync);
-            }
-
-            success = true;
-        }
-        finally {
-            if (!success) {
-                unlockIndexKey(key1, sync);
-                unlockIndexKey(key2, sync);
-            }
-        }
-    }
-
-    /**
-     * @param key1 Key.
-     * @param key2 Key.
-     * @return Comparison result.
-     */
-    private int compareKeys(IndexKey<V> key1, IndexKey<V> key2) {
-        assert key1 != null;
-        assert key2 != null;
-
-        return cmp != null ? cmp.compare(key1.value(), key2.value()) :
-            ((Comparable<V>)key1.value()).compareTo(key2.value());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void endUpdate0(StreamerIndexUpdateSync sync, E evt, K key, boolean rollback) {
-        State<E, K, V> state0 = state.get();
-
-        if (state0 == null)
-            return;
-
-        state.remove();
-
-        IndexKey<V> oldIdxKey = state0.oldIndexKey();
-        Entry<E, K, V> oldEntry = state0.oldEntry();
-        IndexKey<V> newIdxKey = state0.newIndexKey();
-        Entry<E, K, V> newEntry = state0.newEntry();
-
-        if (rollback && state0.finished()) {
-            // Rollback after index was updated.
-            if (oldEntry != null && newEntry != null) {
-                if (state0.keysEqual()) {
-                    assert isUnique();
-
-                    boolean b = idx.replace(oldIdxKey, newEntry, oldEntry);
-
-                    assert b;
-                }
-                else {
-                    boolean b = idx.remove(newIdxKey, newEntry);
-
-                    assert b;
-
-                    Entry<E, K, V> old = idx.put(oldIdxKey, oldEntry);
-
-                    assert old == null;
-
-                    b = key2Entry.replace(key, newEntry, oldEntry);
-
-                    assert b;
-                }
-            }
-            else if (newEntry == null) {
-                // Old was removed. Need to put it back.
-                Entry<E, K, V> old = key2Entry.put(key, oldEntry);
-
-                assert old == null;
-
-                old = idx.put(oldIdxKey, oldEntry);
-
-                assert old == null;
-            }
-            else {
-                assert oldEntry == null;
-
-                // New entry was added. Remove it.
-                boolean b = idx.remove(newIdxKey, newEntry);
-
-                assert b;
-
-                b = key2Entry.remove(key, newEntry);
-
-                assert b;
-            }
-        }
-
-        // Unlock only if unique.
-        if (isUnique()) {
-            if (oldIdxKey != null)
-                unlockIndexKey(oldIdxKey, sync);
-
-            if (state0.keysEqual())
-                // No need to unlock second key.
-                return;
-
-            if (newIdxKey != null)
-                unlockIndexKey(newIdxKey, sync);
-        }
-    }
-
-    /**
-     * @param val Value.
-     * @return Index key.
-     */
-    protected IndexKey<V> nextKey(V val) {
-        return new Key<>(val, isUnique() ? 0 : idxGen.incrementAndGet(), cmp);
-    }
-
-    /**
-     * @param val Value.
-     * @param asc {@code True} if ascending.
-     * @param incl {@code True} if inclusive.
-     * @return Key for search.
-     */
-    private IndexKey<V> searchKeyFrom(V val, boolean asc, boolean incl) {
-        // (asc && incl) || (!asc && !incl) -> asc == incl
-        return new Key<>(val, asc == incl ? Long.MIN_VALUE : Long.MAX_VALUE, cmp);
-    }
-
-    /**
-     * @param val Value.
-     * @param asc {@code True} if ascending.
-     * @param incl {@code True} if inclusive.
-     * @return Key for search.
-     */
-    private IndexKey<V> searchKeyTo(V val, boolean asc, boolean incl) {
-        // (asc && incl) || (!asc && !incl) -> asc == incl
-        return new Key<>(val, asc == incl ? Long.MAX_VALUE : Long.MIN_VALUE, cmp);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sorted() {
-        return true;
-    }
-
-    /**
-     *
-     */
-    private static class Key<V> implements Comparable<Key<V>>, IndexKey<V> {
-        /** */
-        private final V val;
-
-        /** */
-        private final long seed;
-
-        /** */
-        private final Comparator<V> cmp;
-
-        /**
-         * @param val Value.
-         * @param seed Seed.
-         * @param cmp Comparator.
-         */
-        private Key(V val, long seed, @Nullable Comparator<V> cmp) {
-            assert val != null;
-
-            this.val = val;
-            this.seed = seed;
-            this.cmp = cmp;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V value() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareTo(Key<V> o) {
-            int res = cmp != null ? cmp.compare(val, o.val) : ((Comparable<V>)val).compareTo(o.val);
-
-            return res == 0 ? (seed < o.seed ? -1 : seed > o.seed ? 1 : 0) : res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return 31 * val.hashCode() + (int)(seed ^ (seed >>> 32));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (obj == null || obj.getClass() != Key.class)
-                return false;
-
-            Key key = (Key)obj;
-
-            return seed == key.seed && val.equals(key.val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Key.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class State<E, K, V> {
-        /** */
-        private IndexKey<V> oldIdxKey;
-
-        /** */
-        private Entry<E, K, V> oldEntry;
-
-        /** */
-        private IndexKey<V> newIdxKey;
-
-        /** */
-        private Entry<E, K, V> newEntry;
-
-        /** */
-        private boolean finished;
-
-        /** */
-        private final boolean keysEqual;
-
-        /**
-         * @param oldIdxKey Old index key.
-         * @param oldEntry Old entry.
-         * @param newIdxKey New Index key.
-         * @param newEntry New entry.
-         * @param finished Finished.
-         * @param keysEqual {@code True} if keys are equal.
-         */
-        private State(@Nullable IndexKey<V> oldIdxKey, @Nullable Entry<E, K, V> oldEntry, @Nullable IndexKey<V> newIdxKey,
-            @Nullable Entry<E, K, V> newEntry, boolean finished, boolean keysEqual) {
-            this.oldIdxKey = oldIdxKey;
-            this.oldEntry = oldEntry;
-            this.newIdxKey = newIdxKey;
-            this.newEntry = newEntry;
-            this.finished = finished;
-            this.keysEqual = keysEqual;
-        }
-
-        /**
-         * @return Old index entry.
-         */
-        IndexKey<V> oldIndexKey() {
-            return oldIdxKey;
-        }
-
-        /**
-         * @param oldIdxKey Old index key.
-         */
-        void oldIndexKey(IndexKey<V> oldIdxKey) {
-            this.oldIdxKey = oldIdxKey;
-        }
-
-        /**
-         * @return Old.
-         */
-        Entry<E, K, V> oldEntry() {
-            return oldEntry;
-        }
-
-        /**
-         * @param oldEntry Old.
-         */
-        void oldEntry(Entry<E, K, V> oldEntry) {
-            this.oldEntry = oldEntry;
-        }
-
-        /**
-         * @return New index key.
-         */
-        IndexKey<V> newIndexKey() {
-            return newIdxKey;
-        }
-
-        /**
-         * @param newIdxKey New index key.
-         */
-        void newIndexKey(IndexKey<V> newIdxKey) {
-            this.newIdxKey = newIdxKey;
-        }
-
-        /**
-         * @return New.
-         */
-        Entry<E, K, V> newEntry() {
-            return newEntry;
-        }
-
-        /**
-         * @param newEntry New.
-         */
-        void newEntry(Entry<E, K, V> newEntry) {
-            this.newEntry = newEntry;
-        }
-
-        /**
-         * @return Finished.
-         */
-        boolean finished() {
-            return finished;
-        }
-
-        /**
-         * @param finished Finished.
-         */
-        void finished(boolean finished) {
-            this.finished = finished;
-        }
-
-        /**
-         * @return {@code True} if both keys are not null and are equal (as comparables).
-         */
-        boolean keysEqual() {
-            return keysEqual;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(State.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private class Index<I extends IndexKey<V>> implements StreamerIndex<E, K, V> {
-        /** */
-        private final TrieMap<K, Entry<E, K, V>> key2Entry0 = key2Entry.readOnlySnapshot();
-
-        /** */
-        private final SnapTreeMap<IndexKey<V>, Entry<E, K, V>> idx0 = idx.clone();
-
-        /** */
-        private final int evtsCnt = eventsCount();
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String name() {
-            return getName();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean unique() {
-            return isUnique();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean sorted() {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public StreamerIndexPolicy policy() {
-            return getPolicy();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return key2Entry0.size();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> entry(K key) {
-            A.notNull(key, "key");
-
-            return trieGet(key, key2Entry0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt) {
-            Collection col = cnt >= 0 ? idx0.values() : idx0.descendingMap().values();
-
-            return (Collection<StreamerIndexEntry<E, K, V>>)(cnt == 0 ? Collections.unmodifiableCollection(col) :
-                F.limit(col, U.safeAbs(cnt)));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final int cnt) {
-            Set<K> col = new AbstractSet<K>() {
-                private Collection<K> entries = F.viewReadOnly(
-                    cnt >= 0 ? idx0.values() : idx0.descendingMap().values(),
-                    entryToKey);
-
-                @NotNull @Override public Iterator<K> iterator() {
-                    return entries.iterator();
-                }
-
-                @Override public int size() {
-                    return entries.size();
-                }
-            };
-
-            return cnt == 0 ? col : F.limit(col, U.safeAbs(cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(int cnt) {
-            Collection<StreamerIndexEntry<E, K, V>> col = entries(cnt);
-
-            return F.viewReadOnly(col, entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(int cnt) {
-            Collection<E> evts = events(cnt >= 0, null, false, null, false);
-
-            return cnt == 0 ? evts : F.limit(evts, U.safeAbs(cnt));
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(V val) {
-            return entrySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<StreamerIndexEntry<E, K, V>> entrySet(final boolean asc, @Nullable final V fromVal,
-            final boolean fromIncl, @Nullable final V toVal, final boolean toIncl) {
-            Set<StreamerIndexEntry<E, K, V>> set = new AbstractSet<StreamerIndexEntry<E, K, V>>() {
-                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                @NotNull @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
-                    Collection vals = map.values();
-
-                    return (Iterator<StreamerIndexEntry<E, K, V>>)vals.iterator();
-                }
-
-                @Override public int size() {
-                    return map.size();
-                }
-            };
-
-            return Collections.unmodifiableSet(set);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(V val) {
-            return keySet(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Set<K> keySet(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            Set<K> set = new AbstractSet<K>() {
-                private Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                @NotNull @Override public Iterator<K> iterator() {
-                    return F.iterator(map.values(), entryToKey, true);
-                }
-
-                @Override public int size() {
-                    return map.size();
-                }
-            };
-
-            return Collections.unmodifiableSet(set);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal,
-            boolean toIncl) {
-            Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-            return F.viewReadOnly(map.values(), entryToVal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(V val) {
-            A.notNull(val, "val");
-
-            return events(true, val, true, val, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events(final boolean asc, @Nullable final V fromVal, final boolean fromIncl,
-            @Nullable final V toVal, final boolean toIncl) {
-            if (getPolicy() == EVENT_TRACKING_OFF)
-                throw new IllegalStateException("Event tracking is off: " + this);
-
-            Collection<E> evts = new AbstractCollection<E>() {
-                private final Map<IndexKey<V>, Entry<E, K, V>> map = subMap(asc, fromVal, fromIncl, toVal, toIncl);
-
-                private int size = -1;
-
-                @NotNull @Override public Iterator<E> iterator() {
-                    return new Iterator<E>() {
-                        private final Iterator<Entry<E, K, V>> entryIter = map.values().iterator();
-
-                        private Iterator<E> evtIter;
-
-                        private boolean moved = true;
-
-                        private boolean more;
-
-                        @Override public boolean hasNext() {
-                            if (!moved)
-                                return more;
-
-                            moved = false;
-
-                            if (evtIter != null && evtIter.hasNext())
-                                return more = true;
-
-                            while (entryIter.hasNext()) {
-                                evtIter = eventsIterator(entryIter.next());
-
-                                if (evtIter.hasNext())
-                                    return more = true;
-                            }
-
-                            return more = false;
-                        }
-
-                        @Override public E next() {
-                            if (hasNext()) {
-                                moved = true;
-
-                                return evtIter.next();
-                            }
-
-                            throw new NoSuchElementException();
-                        }
-
-                        @Override public void remove() {
-                            assert false;
-                        }
-                    };
-                }
-
-                @Override public int size() {
-                    return size != -1 ? size :
-                        fromVal == null && toVal == null ? (size = evtsCnt) : (size = F.size(iterator()));
-                }
-
-                /**
-                 * @param entry Entry.
-                 * @return Events iterator.
-                 */
-                @SuppressWarnings("fallthrough")
-                Iterator<E> eventsIterator(StreamerIndexEntry<E,K,V> entry) {
-                    switch (getPolicy()) {
-                        case EVENT_TRACKING_ON:
-                        case EVENT_TRACKING_ON_DEDUP:
-                            Collection<E> evts = entry.events();
-
-                            assert evts != null;
-
-                            return evts.iterator();
-
-                        default:
-                            assert false;
-
-                            throw new IllegalStateException("Event tracking is off: " + Index.this);
-                    }
-                }
-            };
-
-            return Collections.unmodifiableCollection(evts);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> firstEntry() {
-            return idx0.firstEntry().getValue();
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndexEntry<E, K, V> lastEntry() {
-            return idx0.lastEntry().getValue();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<StreamerIndexEntry<E, K, V>> iterator() {
-            return entries(0).iterator();
-        }
-
-        /**
-         * @param asc Ascending.
-         * @param fromVal From.
-         * @param fromIncl Include from.
-         * @param toVal To.
-         * @param toIncl Include to.
-         * @return Map.
-         */
-        private Map<IndexKey<V>, Entry<E, K, V>> subMap(boolean asc, @Nullable V fromVal, boolean fromIncl,
-            @Nullable V toVal, boolean toIncl) {
-            if (fromVal != null && toVal != null) {
-                int cmpRes = cmp != null ? cmp.compare(toVal, fromVal) : ((Comparable<V>)toVal).compareTo(fromVal);
-
-                if ((asc && cmpRes < 0) || (!asc && cmpRes > 0))
-                    throw new IllegalArgumentException("Boundaries are invalid [asc=" + asc + ", fromVal=" + fromVal +
-                        ", toVal=" + toVal + ']');
-            }
-
-            if (idx0.isEmpty())
-                return Collections.emptyMap();
-
-            ConcurrentNavigableMap<IndexKey<V>,Entry<E,K,V>> map = asc ? idx0 : idx0.descendingMap();
-
-            if (fromVal == null) {
-                fromVal = map.firstKey().value();
-
-                fromIncl = true;
-            }
-
-            if (toVal == null) {
-                toVal = map.lastKey().value();
-
-                toIncl = true;
-            }
-
-            return map.subMap(searchKeyFrom(fromVal, asc, fromIncl), fromIncl, searchKeyTo(toVal, asc, toIncl), toIncl);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Index.class, this, "provider", StreamerTreeIndexProvider.this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java
deleted file mode 100644
index 65f533c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/index/tree/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains tree-based streamer index implementation.
- */
-package org.apache.ignite.streamer.index.tree;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java
deleted file mode 100644
index b4aeb02..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains main <b>Streaming APIs.</b>
- */
-package org.apache.ignite.streamer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
deleted file mode 100644
index dee5a19..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-
-/**
- * Router used to colocate identical streamer events or events with identical affinity
- * key on the same node. Such collocation is often required to perform computations on
- * multiple events together, for example, find number of occurrences of a word in some
- * text. In this case you would collocate identical words together to make sure that
- * you can update their counts.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link AffinityEvent#affinityKey()} method. If event does not implement
- * {@link AffinityEvent} interface, then event itself will be used to determine affinity.
- */
-public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter {
-    /** */
-    public static final int REPLICA_CNT = 128;
-
-    /**
-     * All events that implement this interface will be routed based on key affinity.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public interface AffinityEvent {
-        /**
-         * @return Affinity route key for the event.
-         */
-        public Object affinityKey();
-    }
-
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** */
-    private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>();
-
-    /** */
-    private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
-            evt, ctx);
-    }
-
-    /**
-     * @param obj Object.
-     * @param ctx Context.
-     * @return Rich node.
-     */
-    private ClusterNode node(Object obj, StreamerContext ctx) {
-        while (true) {
-            Collection<ClusterNode> nodes = ctx.projection().nodes();
-
-            assert nodes != null;
-            assert !nodes.isEmpty();
-
-            int nodesSize = nodes.size();
-
-            if (nodesSize == 1) { // Minor optimization.
-                ClusterNode ret = F.first(nodes);
-
-                assert ret != null;
-
-                return ret;
-            }
-
-            final Collection<UUID> lookup = U.newHashSet(nodesSize);
-
-            // Store nodes in map for fast lookup.
-            for (ClusterNode n : nodes)
-                // Add nodes into hash circle, if absent.
-                lookup.add(resolveNode(n));
-
-            // Cleanup circle.
-            if (lookup.size() != addedNodes.size()) {
-                Collection<UUID> rmv = null;
-
-                for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) {
-                    UUID id = iter.next();
-
-                    if (!lookup.contains(id)) {
-                        iter.remove();
-
-                        if (rmv == null)
-                            rmv = new ArrayList<>();
-
-                        rmv.add(id);
-                    }
-                }
-
-                if (!F.isEmpty(rmv))
-                    nodeHash.removeNodes(rmv);
-            }
-
-            UUID nodeId = nodeHash.node(obj, lookup);
-
-            assert nodeId != null;
-
-            ClusterNode node = ctx.projection().node(nodeId);
-
-            if (node != null)
-                return node;
-        }
-    }
-
-    /**
-     * Add node to hash circle if this is the first node invocation.
-     *
-     * @param n Node to get info for.
-     * @return Node ID.
-     */
-    private UUID resolveNode(ClusterNode n) {
-        UUID nodeId = n.id();
-
-        if (!addedNodes.contains(nodeId)) {
-            addedNodes.add(nodeId);
-
-            nodeHash.addNode(nodeId, REPLICA_CNT);
-        }
-
-        return nodeId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
deleted file mode 100644
index ddc8059..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Router used to colocate streamer events with data stored in a partitioned cache.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
- * {@link CacheAffinityEvent} interface, then event will be routed always to local node.
- */
-public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
-    /**
-     * All events that implement this interface will be routed based on key affinity.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public interface CacheAffinityEvent {
-        /**
-         * @return Affinity route key for the event.
-         */
-        public Object affinityKey();
-
-        /**
-         * @return Cache name, if {@code null}, the default cache is used.
-         */
-        @Nullable public String cacheName();
-    }
-
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        if (evt instanceof CacheAffinityEvent) {
-            CacheAffinityEvent e = (CacheAffinityEvent)evt;
-
-            GridCache<Object, Object> c = ((IgniteEx) ignite).cachex(e.cacheName());
-
-            assert c != null;
-
-            return c.affinity().mapKeyToNode(e.affinityKey());
-        }
-
-        return ignite.cluster().localNode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
deleted file mode 100644
index e9a2f8d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-
-/**
- * Local router. Always routes event to local node.
- */
-public class StreamerLocalEventRouter implements StreamerEventRouter {
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        return ignite.cluster().localNode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
-        Collection<T> evts) {
-        return F.asMap(ignite.cluster().localNode(), evts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
deleted file mode 100644
index ce41944..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Random router. Routes event to random node.
- */
-public class StreamerRandomEventRouter extends StreamerEventRouterAdapter {
-    /** Optional predicates to exclude nodes from routing. */
-    private IgnitePredicate<ClusterNode>[] predicates;
-
-    /**
-     * Empty constructor for spring.
-     */
-    public StreamerRandomEventRouter() {
-        this((IgnitePredicate<ClusterNode>[])null);
-    }
-
-    /**
-     * Constructs random event router with optional set of filters to apply to streamer projection.
-     *
-     * @param predicates Node predicates.
-     */
-    public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) {
-        this.predicates = predicates;
-    }
-
-    /**
-     * Constructs random event router with optional set of filters to apply to streamer projection.
-     *
-     * @param predicates Node predicates.
-     */
-    @SuppressWarnings("unchecked")
-    public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) {
-        if (!F.isEmpty(predicates)) {
-            this.predicates = new IgnitePredicate[predicates.size()];
-
-            predicates.toArray(this.predicates);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
-        Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
-
-        if (F.isEmpty(nodes))
-            return null;
-
-        int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
-
-        int i = 0;
-
-        Iterator<ClusterNode> iter = nodes.iterator();
-
-        while (true) {
-            if (!iter.hasNext())
-                iter = nodes.iterator();
-
-            ClusterNode node = iter.next();
-
-            if (idx == i++)
-                return node;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
deleted file mode 100644
index 2471846..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Round robin router.
- */
-public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
-    /** */
-    private final AtomicLong lastOrder = new AtomicLong();
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
-        Collection<ClusterNode> nodes = ctx.projection().nodes();
-
-        int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
-
-        int i = 0;
-
-        Iterator<ClusterNode> iter = nodes.iterator();
-
-        while (true) {
-            if (!iter.hasNext())
-                iter = nodes.iterator();
-
-            ClusterNode node = iter.next();
-
-            if (idx == i++)
-                return node;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java
deleted file mode 100644
index ad139e6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/router/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains streamer event router implementations.
- */
-package org.apache.ignite.streamer.router;
\ No newline at end of file


Mime
View raw message