ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/22] incubator-ignite git commit: sp-2 streaming cleanup
Date Fri, 20 Mar 2015 12:07:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
deleted file mode 100644
index ef7900d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.internal.processors.streamer.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-
-/**
- * Unbounded window which holds all events. Events can be evicted manually from window
- * via any of the {@code dequeue(...)} methods.
- */
-public class StreamerUnboundedWindow<E> extends StreamerWindowAdapter<E> {
-    /** Events held by the window; filled through add() and drained through pollLast(). */
-    private final ConcurrentLinkedDeque8<E> evts = new ConcurrentLinkedDeque8<>();
-
-    /** {@inheritDoc} */
-    @Override protected void stop0() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkConfiguration() {
-        // No-op: this window declares no configuration parameters of its own.
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void reset0() {
-        evts.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return evts.sizex();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        final ConcurrentLinkedDeque8.IteratorEx<E> it = (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
-        return new GridStreamerWindowIterator<E>() {
-            private E lastRet; // Element handed out by the latest next() call.
-
-            @Override public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override public E next() {
-                lastRet = it.next();
-
-                return lastRet;
-            }
-
-            @Override public E removex() {
-                return (it.removex()) ? lastRet : null; // null when the deque iterator reports no removal.
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        return 0; // Nothing is ever queued for eviction, see pollEvicted0().
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean enqueue0(E evt) {
-        return evts.add(evt);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        Collection<E> res = new ArrayList<>(cnt);
-        // Poll from the tail until cnt events are collected or the deque runs empty.
-        for (int i = 0; i < cnt; i++) {
-            E evicted = evts.pollLast();
-
-            if (evicted != null)
-                res.add(evicted);
-            else
-                break;
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        return Collections.emptyList();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        return Collections.emptyList();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
deleted file mode 100644
index 963671a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.index.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer window adapter.
- */
-public abstract class StreamerWindowAdapter<E> implements LifecycleAware, StreamerWindow<E>,
-    StreamerWindowMBean {
-    /** Window name; defaults to the simple class name. */
-    private String name = getClass().getSimpleName();
-
-    /** Optional filter applied to events on enqueue; {@code null} accepts every event. */
-    private IgnitePredicate<Object> filter;
-
-    /** Index providers keyed by index name. */
-    private Map<String, StreamerIndexProvider<E, ?, ?>> idxsAsMap;
-
-    /** Index providers in the order they were passed to setIndexes(). */
-    private StreamerIndexProvider<E, ?, ?>[] idxs;
-
-    /** Lock: enqueue/dequeue/poll paths take the read side; reset(), stop() and snapshot() take the write side. */
-    private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
-
-    /** {@inheritDoc} */
-    @Override public String getClassName() {
-        return U.compact(getClass().getName());
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getSize() {
-        return size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getEvictionQueueSize() {
-        return evictionQueueSize();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<E> iterator() {
-        return new BoundedIterator(iterator0());
-    }
-
-    /**
-     * Returns an iterator over the window elements of type E without enforcing an iteration limit. That is,
-     * if a concurrent thread keeps adding new elements to the window, iteration may never finish.
-     *
-     * @return Iterator.
-     */
-    protected abstract GridStreamerWindowIterator<E> iterator0();
-
-    /** {@inheritDoc} */
-    @Override public boolean enqueue(E evt) {
-        lock.readLock();
-
-        try {
-            boolean res = (filter == null || filter.apply(evt));
-
-            if (res) {
-                updateIndexes(evt, false);
-                // Undo the index update if the window itself refused the event.
-                if (!enqueue0(evt))
-                    updateIndexes(evt, true);
-            }
-            // NOTE(review): returns true even when enqueue0() refused the event, unlike enqueueAll() - confirm.
-            return res;
-        }
-        finally {
-            lock.readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean enqueue(E... evts) {
-        return enqueueAll(Arrays.asList(evts));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean enqueueAll(Collection<E> evts) {
-        lock.readLock();
-
-        try {
-            boolean ignoreFilter = filter == null || F.isAlwaysTrue(filter);
-
-            boolean res = true;
-
-            for (E evt : evts) {
-                if (ignoreFilter || filter.apply(evt)) {
-                    updateIndexes(evt, false);
-
-                    boolean added = enqueue0(evt);
-                    // Roll the index update back when the event was not added.
-                    if (!added)
-                        updateIndexes(evt, true);
-
-                    res &= added;
-                }
-            }
-
-            return res;
-        }
-        finally {
-            lock.readUnlock();
-        }
-    }
-
-    /**
-     * Adds event to window.
-     *
-     * @param evt Event.
-     * @return {@code True} if event added.
-     */
-    protected abstract boolean enqueue0(E evt);
-
-    /** {@inheritDoc} */
-    @Override public E dequeue() {
-        return F.first(dequeue(1));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> dequeueAll() {
-        return dequeue(size());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> dequeue(int cnt) {
-        lock.readLock();
-
-        try {
-            Collection<E> evts = dequeue0(cnt);
-
-            if (!evts.isEmpty() && idxs != null) {
-                for (E evt : evts)
-                    updateIndexes(evt, true);
-            }
-
-            return evts;
-        }
-        finally {
-            lock.readUnlock();
-        }
-    }
-
-    /**
-     * Dequeues up to cnt elements from window. If current window size is less than cnt, will dequeue all elements
-     * from window.
-     *
-     * @param cnt Count.
-     * @return Dequeued elements.
-     */
-    protected abstract Collection<E> dequeue0(int cnt);
-
-    /** {@inheritDoc} */
-    @Override public E pollEvicted() {
-        return F.first(pollEvicted(1));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> pollEvictedAll() {
-        return pollEvicted(evictionQueueSize());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> pollEvicted(int cnt) {
-        lock.readLock();
-
-        try {
-            Collection<E> evts = pollEvicted0(cnt);
-
-            if (!evts.isEmpty() && idxs != null) {
-                for (E evt : evts)
-                    updateIndexes(evt, true);
-            }
-
-            return evts;
-        }
-        finally {
-            lock.readUnlock();
-        }
-    }
-
-    /**
-     * If window supports eviction, this method will return up to cnt evicted elements.
-     *
-     * @param cnt Count.
-     * @return Evicted elements.
-     */
-    protected abstract Collection<E> pollEvicted0(int cnt);
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> pollEvictedBatch() {
-        lock.readLock();
-
-        try {
-            Collection<E> evts = pollEvictedBatch0();
-
-            if (!evts.isEmpty() && idxs != null) {
-                for (E evt : evts)
-                    updateIndexes(evt, true);
-            }
-
-            return evts;
-        }
-        finally {
-            lock.readUnlock();
-        }
-    }
-
-    /**
-     * If window supports batch eviction, this method will poll next evicted batch from window. If window does not
-     * support batch eviction but supports eviction, will return collection of single last evicted element. If window
-     * does not support eviction, will return empty collection.
-     *
-     * @return Elements from evicted batch.
-     */
-    protected abstract Collection<E> pollEvictedBatch0();
-
-    /** {@inheritDoc} */
-    @Override public final void start() {
-        checkConfiguration();
-
-        if (idxs != null) {
-            for (StreamerIndexProvider<E, ?, ?> idx : idxs)
-                idx.initialize();
-        }
-
-        reset();
-    }
-
-    /** {@inheritDoc} */
-    @Override public final void reset(){
-        lock.writeLock();
-
-        try {
-            if (idxs != null) {
-                for (StreamerIndexProvider<E, ?, ?> idx : idxs)
-                    idx.reset();
-            }
-
-            reset0();
-        }
-        finally {
-            lock.writeUnlock();
-        }
-    }
-
-    /**
-     * Check window configuration.
-     *
-     * @throws IgniteException If failed.
-     */
-    protected abstract void checkConfiguration() throws IgniteException;
-
-    /**
-     * Reset routine.
-     */
-    protected abstract void reset0();
-
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        lock.writeLock();
-
-        try {
-            stop0();
-        }
-        finally {
-            lock.writeUnlock();
-        }
-    }
-
-    /**
-     * Dispose window.
-     */
-    protected abstract void stop0();
-
-    /** {@inheritDoc} */
-    @Override public Collection<E> snapshot(boolean includeEvicted) {
-        lock.writeLock();
-
-        try {
-            int skip = includeEvicted ? 0 : evictionQueueSize();
-
-            List<E> res = new ArrayList<>(size() - skip);
-
-            Iterator<E> iter = iterator();
-
-            int i = 0;
-
-            while (iter.hasNext()) {
-                E next = iter.next();
-                // The first 'skip' iterated elements are treated as the eviction queue and left out.
-                if (i++ >= skip)
-                    res.add(next);
-            }
-
-            return Collections.unmodifiableList(res);
-        }
-        finally {
-            lock.writeUnlock();
-        }
-    }
-
-    /**
-     * Sets window name.
-     *
-     * @param name Window name.
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Gets optional event filter.
-     *
-     * @return Optional event filter.
-     */
-    @Nullable public IgnitePredicate<Object> getFilter() {
-        return filter;
-    }
-
-    /**
-     * Sets event filter.
-     *
-     * @param filter Event filter.
-     */
-    public void setFilter(@Nullable IgnitePredicate<Object> filter) {
-        this.filter = filter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> StreamerIndex<E, K, V> index() {
-        return index(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> StreamerIndex<E, K, V> index(@Nullable String name) {
-        if (idxsAsMap != null) {
-            StreamerIndexProvider<E, K, V> idx = (StreamerIndexProvider<E, K, V>)idxsAsMap.get(name);
-
-            if (idx == null)
-                throw new IllegalArgumentException("Streamer index is not configured: " + name);
-
-            return idx.index();
-        }
-
-        throw new IllegalArgumentException("Streamer index is not configured: " + name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<StreamerIndex<E, ?, ?>> indexes() {
-        if (idxs != null) {
-            Collection<StreamerIndex<E, ?, ?>> res = new ArrayList<>(idxs.length);
-
-            for (StreamerIndexProvider<E, ?, ?> idx : idxs)
-                res.add(idx.index());
-
-            return res;
-        }
-        else
-            return Collections.emptyList();
-    }
-
-    /**
-     * Get array of index providers.
-     *
-     * @return Index providers.
-     */
-    public StreamerIndexProvider<E, ?, ?>[] indexProviders() {
-        return idxs;
-    }
-
-    /**
-     * Set indexes. Fields are assigned only after all names are validated, so a failed call changes nothing.
-     *
-     * @param idxs Indexes.
-     * @throws IllegalArgumentException If some index names are not unique.
-     */
-    @SuppressWarnings("unchecked")
-    public void setIndexes(StreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException {
-        A.ensure(!F.isEmpty(idxs), "!F.isEmpty(idxs)");
-
-        // Validate into a local map first: a duplicate name must not leave half-built state behind.
-        Map<String, StreamerIndexProvider<E, ?, ?>> byName = new HashMap<>(idxs.length, 1.0f);
-
-        for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
-            StreamerIndexProvider<E, ?, ?> old = byName.put(idx.getName(), idx);
-
-            if (old != null)
-                throw new IllegalArgumentException("Index name is not unique [idx1=" + old + ", idx2=" + idx + ']');
-        }
-
-        // All names are unique: publish the map and a private copy of the caller's array.
-        idxsAsMap = byName;
-        this.idxs = idxs.clone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clearEvicted() {
-        pollEvictedAll();
-    }
-
-    /**
-     * Update indexes.
-     *
-     * @param evt Event.
-     * @param rmv Remove flag.
-     * @throws IgniteException If index update failed.
-     */
-    protected void updateIndexes(E evt, boolean rmv) throws IgniteException {
-        if (idxs != null) {
-            StreamerIndexUpdateSync sync = new StreamerIndexUpdateSync();
-
-            boolean rollback = true;
-
-            try {
-                for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
-                    if (rmv)
-                        idx.remove(sync, evt);
-                    else
-                        idx.add(sync, evt);
-                }
-                // Every provider accepted the change, so endUpdate() below is told not to roll back.
-                rollback = false;
-            }
-            finally {
-                for (StreamerIndexProvider<E, ?, ?> idx : idxs)
-                    idx.endUpdate(sync, evt, rollback, rmv);
-
-                sync.finish(1);
-            }
-        }
-    }
-
-    /**
-     * Window iterator wrapper which prevents returning more elements than existed in the underlying collection at
-     * the time of iterator creation.
-     */
-    private class BoundedIterator implements Iterator<E> {
-        /** Iterator. */
-        private final GridStreamerWindowIterator<E> iter;
-
-        /** How many elements to return left (at most). */
-        private int left;
-
-        /**
-         * Constructor.
-         *
-         * @param iter Iterator.
-         */
-        private BoundedIterator(GridStreamerWindowIterator<E> iter) {
-            assert iter != null;
-            assert lock != null;
-
-            this.iter = iter;
-            // Cap the iteration at the window size observed right now.
-            left = size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return left > 0 && iter.hasNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public E next() {
-            left--;
-
-            if (left < 0)
-                throw new NoSuchElementException();
-
-            return iter.next();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            if (left < 0)
-                throw new IllegalStateException();
-
-            lock.readLock();
-
-            try {
-                E evt = iter.removex();
-
-                if (evt != null) {
-                    try {
-                        updateIndexes(evt, true);
-                    }
-                    catch (IgniteException e) {
-                        throw new IgniteException("Failed to remove event: " + evt, e);
-                    }
-                }
-            }
-            finally {
-                lock.readUnlock();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java
deleted file mode 100644
index 7d50c46..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/window/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains streamer window implementations.
- */
-package org.apache.ignite.streamer.window;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java
deleted file mode 100644
index 8985a91..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerEvictionSelfTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.window.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Tests for streamer eviction logic.
- */
-public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of events used in test. */
-    private static final int EVENTS_COUNT = 10;
-
-    /** Test stages. */
-    private Collection<StreamerStage> stages;
-
-    /** Event router. */
-    private StreamerEventRouter router;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setStreamerConfiguration(streamerConfiguration());
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-
-        return cfg;
-    }
-
-    /**
-     * @return Streamer configuration.
-     */
-    private StreamerConfiguration streamerConfiguration() {
-        StreamerConfiguration cfg = new StreamerConfiguration();
-
-        cfg.setRouter(router);
-
-        StreamerBoundedTimeWindow window = new StreamerBoundedTimeWindow();
-
-        window.setName("window1");
-        window.setTimeInterval(60000);
-
-        cfg.setWindows(F.asList((StreamerWindow)window));
-
-        cfg.setStages(stages);
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testContextNextStage() throws Exception {
-        router = new GridTestStreamerEventRouter();
-
-        final CountDownLatch finishLatch = new CountDownLatch(EVENTS_COUNT);
-        final AtomicReference<AssertionError> err = new AtomicReference<>();
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                assert evts.size() == 1;
-
-                if (ctx.nextStageName() == null) {
-                    finishLatch.countDown();
-
-                    return null;
-                }
-
-                StreamerWindow win = ctx.window("window1");
-
-                // Add new events to the window.
-                win.enqueueAll(evts);
-
-                try {
-                    assertEquals(0, win.evictionQueueSize());
-                }
-                catch (AssertionError e) {
-                    err.compareAndSet(null, e);
-                }
-
-                // Evict outdated events from the window.
-                Collection evictedEvts = win.pollEvictedAll();
-
-                try {
-                    assertEquals(0, evictedEvts.size());
-                }
-                catch (AssertionError e) {
-                    err.compareAndSet(null, e);
-                }
-
-                Integer val = (Integer)F.first(evts);
-
-                return (Map)F.asMap(ctx.nextStageName(), F.asList(++val));
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
-
-        startGrids(2);
-
-        try {
-            GridTestStreamerEventRouter router = (GridTestStreamerEventRouter)this.router;
-
-            router.put("0", grid(0).localNode().id());
-            router.put("1", grid(1).localNode().id());
-
-            for (int i = 0; i < EVENTS_COUNT; i++)
-                grid(0).streamer(null).addEvent(i);
-
-            boolean await = finishLatch.await(5, SECONDS);
-
-            if (err.get() != null)
-                throw err.get();
-
-            if (!await)
-                fail("Some events didn't finished.");
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java
deleted file mode 100644
index a7f99d8..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.window.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- *
- */
-public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Event router. */
-    private TestRandomRouter router;
-
-    /** Maximum number of concurrent sessions for test. */
-    private int maxConcurrentSess;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration conf = super.getConfiguration(gridName);
-
-        conf.setStreamerConfiguration(streamerConfiguration());
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        conf.setDiscoverySpi(disco);
-
-        conf.setPeerClassLoadingEnabled(false);
-
-        return conf;
-    }
-
-    /**
-     * @return Streamer configuration.
-     */
-    private StreamerConfiguration streamerConfiguration() {
-        StreamerConfiguration conf = new StreamerConfiguration();
-
-        conf.setAtLeastOnce(true);
-
-        conf.setRouter(router);
-
-        StreamerBoundedSizeWindow sizeWin = new StreamerBoundedSizeWindow();
-
-        sizeWin.setMaximumSize(100);
-
-        conf.setWindows(F.asList((StreamerWindow)sizeWin));
-
-        conf.setMaximumConcurrentSessions(maxConcurrentSess);
-
-        SC relay = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> objects) {
-                assert ctx.nextStageName() != null;
-
-                // Hand the batch over to the following stage untouched.
-                return (Map)F.asMap(ctx.nextStageName(), objects);
-            }
-        };
-
-        SC tally = new SC() {
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                ConcurrentMap<Object, AtomicInteger> hits = ctx.localSpace();
-
-                for (Object evt : evts) {
-                    AtomicInteger hit = hits.get(evt);
-
-                    if (hit == null)
-                        hit = F.addIfAbsent(hits, evt, new AtomicInteger());
-
-                    hit.incrementAndGet();
-                }
-
-                return null;
-            }
-        };
-
-        conf.setStages(F.asList((StreamerStage)new GridTestStage("pass", relay), new GridTestStage("put", tally)));
-
-        return conf;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEventFailover() throws Exception {
-        checkEventFailover(500); // Argument becomes the streamer's maximum number of concurrent sessions.
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkEventFailover(int max) throws Exception {
-        router = new TestRandomRouter();
-        maxConcurrentSess = max;
-
-        startGrids(6);
-
-        try {
-            router.sourceNodeId(grid(0).localNode().id());
-            router.destinationNodeId(grid(5).localNode().id());
-
-            final AtomicBoolean done = new AtomicBoolean(false);
-
-            IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Random rnd = new Random();
-
-                    while (!done.get()) {
-                        // Pick a random grid to restart.
-                        int idx = rnd.nextInt(4) + 1;
-
-                        info(">>>>> Stopping grid " + grid(idx).localNode().id());
-
-                        stopGrid(idx, true);
-
-                        U.sleep(1000);
-
-                        startGrid(idx);
-
-                        info(">>>>>> Started grid " + grid(idx).localNode().id());
-
-                        U.sleep(500);
-                    }
-
-                    return null;
-                }
-            }, 1, "restarter");
-
-            final Collection<Object> failed = new ConcurrentLinkedQueue<>();
-
-            IgniteStreamer streamer = grid(0).streamer(null);
-
-            streamer.addStreamerFailureListener(new StreamerFailureListener() {
-                @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
-                    info("Unable to failover events [stageName=" + stageName + ", err=" + err + ']');
-
-                    failed.addAll(evts);
-                }
-            });
-
-            final int evtsCnt = 300000;
-
-            // Now we are ready to process events.
-            for (int i = 0; i < evtsCnt; i++) {
-                if (i > 0 && i % 10000 == 0)
-                    info("Processed: " + i);
-
-                streamer.addEvent(i);
-            }
-
-            done.set(true);
-
-            fut.get();
-
-            // Do not cancel and wait for all tasks to finish.
-            G.stop(getTestGridName(0), false);
-
-            ConcurrentMap<Integer, AtomicInteger> finSpace = grid(5).streamer(null).context().localSpace();
-
-            for (int i = 0; i < evtsCnt; i++) {
-                AtomicInteger cnt = finSpace.get(i);
-
-                if (cnt == null) {
-                    assertTrue("Missing counter for key both in result map and in failover failed map: " + i,
-                        failed.contains(i));
-                }
-                else
-                    assertTrue(cnt.get() > 0);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Test random router.
-     */
-    private static class TestRandomRouter extends StreamerEventRouterAdapter {
-        /** Source node ID. */
-        private UUID srcNodeId;
-
-        /** Destination node ID. */
-        private UUID destNodeId;
-
-        /** {@inheritDoc} */
-        @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-            if ("put".equals(stageName))
-                return ctx.projection().node(destNodeId);
-
-            // Route to random node different from srcNodeId.
-            Collection<ClusterNode> nodes = ctx.projection().forPredicate(new P1<ClusterNode>() {
-                @Override public boolean apply(ClusterNode n) {
-                    return !srcNodeId.equals(n.id()) && !destNodeId.equals(n.id());
-                }
-            }).nodes();
-
-            int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
-
-            int i = 0;
-
-            Iterator<ClusterNode> iter = nodes.iterator();
-
-            while (true) {
-                if (!iter.hasNext())
-                    iter = nodes.iterator();
-
-                ClusterNode node = iter.next();
-
-                if (idx == i++)
-                    return node;
-            }
-        }
-
-        /**
-         * @param srcNodeId New source node ID.
-         */
-        public void sourceNodeId(UUID srcNodeId) {
-            this.srcNodeId = srcNodeId;
-        }
-
-        /**
-         * @param destNodeId New destination node ID.
-         */
-        public void destinationNodeId(UUID destNodeId) {
-            this.destNodeId = destNodeId;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
deleted file mode 100644
index 4141948..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.index.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.apache.ignite.streamer.StreamerConfiguration}.
- */
-public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest {
-    /**
-     */
-    private static class TestEventRouter extends TestLifecycleAware implements StreamerEventRouter {
-        /**
-         */
-        TestEventRouter() {
-            super(null);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx,
-            String stageName, Collection<T> evts) {
-            return null;
-        }
-    }
-
-    /**
-     */
-    private static class TestStage extends TestLifecycleAware implements StreamerStage {
-        /**
-         */
-        TestStage() {
-            super(null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "dummy";
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
-            return null;
-        }
-    }
-
-    /**
-     */
-    private static class TestWindow extends TestLifecycleAware implements StreamerWindow {
-        /**
-         */
-        TestWindow() {
-            super(null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "dummy";
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndex index() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public StreamerIndex index(@Nullable String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<StreamerIndex> indexes() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void reset() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int evictionQueueSize() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean enqueue(Object evt) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean enqueue(Object... evts) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean enqueueAll(Collection evts) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object dequeue() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection dequeue(int cnt) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection dequeueAll() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object pollEvicted() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection pollEvicted(int cnt) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection pollEvictedBatch() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection pollEvictedAll() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void clearEvicted() {
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection snapshot(boolean includeIvicted) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator iterator() {
-            return null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        StreamerConfiguration streamerCfg = new StreamerConfiguration();
-
-        TestEventRouter router = new TestEventRouter();
-
-        streamerCfg.setRouter(router);
-
-        lifecycleAwares.add(router);
-
-        TestStage stage = new TestStage();
-
-        streamerCfg.setStages(F.asList((StreamerStage)stage));
-
-        lifecycleAwares.add(stage);
-
-        TestWindow window = new TestWindow();
-
-        streamerCfg.setWindows(F.asList((StreamerWindow)window));
-
-        lifecycleAwares.add(window);
-
-        cfg.setStreamerConfiguration(streamerCfg);
-
-        return cfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java
deleted file mode 100644
index 19b9c67..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridStreamerSelfTest.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.router.*;
-import org.apache.ignite.streamer.window.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.config.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.configuration.DeploymentMode.*;
-
-/**
- * Basic streamer test.
- */
-public class GridStreamerSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private boolean atLeastOnce = true;
-
-    /** Test stages. */
-    private Collection<StreamerStage> stages;
-
-    /** Event router. */
-    private StreamerEventRouter router;
-
-    /** P2P enabled flag. */
-    private boolean p2pEnabled;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setStreamerConfiguration(streamerConfiguration());
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        cfg.setPeerClassLoadingEnabled(p2pEnabled);
-
-        if (p2pEnabled)
-            cfg.setDeploymentMode(SHARED);
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-
-        return cfg;
-    }
-
-    /**
-     * @return Streamer configuration.
-     */
-    private StreamerConfiguration streamerConfiguration() {
-        StreamerConfiguration cfg = new StreamerConfiguration();
-
-        cfg.setAtLeastOnce(atLeastOnce);
-
-        cfg.setRouter(router);
-
-        cfg.setWindows(F.asList((StreamerWindow)new StreamerUnboundedWindow()));
-
-        cfg.setStages(stages);
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testInjections() throws Exception {
-        final int evtCnt = 100;
-
-        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);
-
-        stages = F.<StreamerStage>asList(new StreamerStage() {
-            @IgniteInstanceResource
-            private Ignite g;
-
-            @LoggerResource
-            private IgniteLogger log;
-
-            @Override public String name() {
-                return "name";
-            }
-
-            @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
-                assert g != null;
-                assert log != null;
-
-                log.info("Processing events: " + evts);
-
-                finishLatch.countDown();
-
-                return null;
-            }
-        });
-
-        try {
-            final Ignite ignite0 = startGrid(0);
-
-            IgniteStreamer streamer = ignite0.streamer(null);
-
-            for (int i = 0; i < evtCnt; i++)
-                streamer.addEvent("event1");
-
-            assert finishLatch.await(10, SECONDS);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStreamerMetrics() throws Exception {
-        atLeastOnce = true;
-        p2pEnabled = false;
-        router = new GridTestStreamerEventRouter();
-
-        final int evtCnt = 100;
-
-        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts)
-                throws IgniteCheckedException {
-                String nextStage = ctx.nextStageName();
-
-                U.sleep(50);
-
-                if (nextStage == null) {
-                    finishLatch.countDown();
-
-                    return null;
-                }
-
-                return (Map)F.asMap(nextStage, evts);
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
-            new GridTestStage("c", stage));
-
-        startGrids(4);
-
-        try {
-            final Ignite ignite0 = grid(0);
-            final Ignite ignite1 = grid(1);
-            final Ignite ignite2 = grid(2);
-            final Ignite ignite3 = grid(3);
-
-            System.out.println("Grid 0: " + ignite0.cluster().localNode().id());
-            System.out.println("Grid 1: " + ignite1.cluster().localNode().id());
-            System.out.println("Grid 2: " + ignite2.cluster().localNode().id());
-            System.out.println("Grid 3: " + ignite3.cluster().localNode().id());
-
-            GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router;
-
-            router0.put("a", ignite1.cluster().localNode().id());
-            router0.put("b", ignite2.cluster().localNode().id());
-            router0.put("c", ignite3.cluster().localNode().id());
-
-            IgniteStreamer streamer = ignite0.streamer(null);
-
-            for (int i = 0; i < evtCnt; i++)
-                streamer.addEvent("event1");
-
-            finishLatch.await();
-
-            // No stages should be executed on grid0.
-            checkZeroMetrics(ignite0, "a", "b", "c");
-            checkZeroMetrics(ignite1, "b", "c");
-            checkZeroMetrics(ignite2, "a", "c");
-            checkZeroMetrics(ignite3, "a", "b");
-
-            checkMetrics(ignite1, "a", evtCnt, false);
-            checkMetrics(ignite2, "b", evtCnt, false);
-            checkMetrics(ignite3, "c", evtCnt, true);
-
-            // Wait until all acks are received.
-            GridTestUtils.retryAssert(log, 100, 50, new CA() {
-                @Override public void apply() {
-                    StreamerMetrics metrics = ignite0.streamer(null).metrics();
-
-                    assertEquals(0, metrics.currentActiveSessions());
-                }
-            });
-
-            StreamerMetrics metrics = ignite0.streamer(null).metrics();
-
-            assertTrue(metrics.maximumActiveSessions() > 0);
-
-            ignite0.streamer(null).context().query(new IgniteClosure<StreamerContext, Object>() {
-                @Override public Object apply(StreamerContext ctx) {
-                    try {
-                        U.sleep(1000);
-                    }
-                    catch (IgniteInterruptedCheckedException ignore) {
-                        // No-op.
-                    }
-
-                    return null;
-                }
-            });
-
-            metrics = ignite0.streamer(null).metrics();
-
-            assert metrics.queryMaximumExecutionNodes() == 4;
-            assert metrics.queryMinimumExecutionNodes() == 4;
-            assert metrics.queryAverageExecutionNodes() == 4;
-
-            assert metrics.queryMaximumExecutionTime() > 0;
-            assert metrics.queryMinimumExecutionTime() > 0;
-            assert metrics.queryAverageExecutionTime() > 0;
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testContextNextStage() throws Exception {
-        atLeastOnce = true;
-        router = new GridTestStreamerEventRouter();
-        p2pEnabled = false;
-
-        final CountDownLatch finishLatch = new CountDownLatch(1);
-        final AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                String nextStage = ctx.nextStageName();
-
-                if (nextStage == null) {
-                    finishLatch.countDown();
-
-                    return null;
-                }
-
-                assert evts.size() == 1;
-
-                Integer val = (Integer)F.first(evts);
-
-                val++;
-
-                if (!String.valueOf(val).equals(ctx.nextStageName()))
-                    err.compareAndSet(null, new IgniteCheckedException("Stage name comparison failed [exp=" + val +
-                        ", actual=" + ctx.nextStageName() + ']'));
-
-                return (Map)F.asMap(ctx.nextStageName(), F.asList(val));
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage),
-            new GridTestStage("2", stage), new GridTestStage("3", stage), new GridTestStage("4", stage));
-
-        startGrids(4);
-
-        try {
-            GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router;
-
-            router0.put("0", grid(1).localNode().id());
-            router0.put("1", grid(2).localNode().id());
-            router0.put("2", grid(3).localNode().id());
-            router0.put("3", grid(0).localNode().id());
-            router0.put("4", grid(1).localNode().id());
-
-            grid(0).streamer(null).addEvent(0);
-
-            finishLatch.await();
-
-            if (err.get() != null)
-                throw err.get();
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAddEventWithNullStageName() throws Exception {
-        atLeastOnce = true;
-        router = new GridTestStreamerEventRouter();
-        p2pEnabled = false;
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                String nextStage = ctx.nextStageName();
-
-                if (nextStage == null)
-                    return null;
-
-                Integer val = (Integer)F.first(evts);
-
-                return (Map)F.asMap(ctx.nextStageName(), F.asList(++val));
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
-
-        startGrids(2);
-
-        try {
-            GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router;
-
-            router0.put("0", grid(0).localNode().id());
-            router0.put("1", grid(1).localNode().id());
-
-            try {
-                grid(0).streamer(null).addEventToStage(null, 0);
-
-                fail();
-            }
-            catch (NullPointerException e) {
-                assertTrue(e.getMessage().contains("Argument cannot be null: stageName"));
-
-                info("Caught expected exception: " + e.getMessage());
-            }
-
-            try {
-                grid(0).streamer(null).addEventsToStage(null, Collections.singletonList(0));
-
-                fail();
-            }
-            catch (NullPointerException e) {
-                assertTrue(e.getMessage().contains("Argument cannot be null: stageName"));
-
-                info("Caught expected exception: " + e.getMessage());
-            }
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullStageNameInResultMap() throws Exception {
-        atLeastOnce = true;
-        router = new GridTestStreamerEventRouter();
-        p2pEnabled = false;
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                String nextStage = ctx.nextStageName();
-
-                if (nextStage == null)
-                    return null;
-
-                Integer val = (Integer)F.first(evts);
-
-                Map<String, Collection<?>> res = new HashMap<>();
-
-                res.put(null, F.asList(++val));
-
-                return res;
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
-
-        startGrids(2);
-
-        try {
-            GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router;
-
-            final CountDownLatch errLatch = new CountDownLatch(1);
-
-            grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
-                @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
-                    info("Expected failure: " + err.getMessage());
-
-                    errLatch.countDown();
-                }
-            });
-
-            router0.put("0", grid(0).localNode().id());
-            router0.put("1", grid(1).localNode().id());
-
-            grid(0).streamer(null).addEvent(0);
-
-            assert errLatch.await(5, TimeUnit.SECONDS);
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPeerDeployment() throws Exception {
-        URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
-
-        GridTestExternalClassLoader ldr = new GridTestExternalClassLoader(urls);
-
-        Class<?> cls = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestKey");
-
-        assert cls != null;
-
-        final int evtCnt = 100;
-
-        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                String nextStage = ctx.nextStageName();
-
-                ctx.window().enqueueAll(evts);
-
-                if (nextStage == null) {
-                    finishLatch.countDown();
-
-                    return null;
-                }
-
-                return (Map)F.asMap(nextStage, evts);
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
-            new GridTestStage("c", stage));
-        router = new GridTestStreamerEventRouter();
-        atLeastOnce = true;
-        p2pEnabled = true;
-
-        startGrids(4);
-
-        try {
-            final Ignite ignite0 = grid(0);
-            final Ignite ignite1 = grid(1);
-            final Ignite ignite2 = grid(2);
-            final Ignite ignite3 = grid(3);
-
-            System.out.println("Grid 0: " + ignite0.cluster().localNode().id());
-            System.out.println("Grid 1: " + ignite1.cluster().localNode().id());
-            System.out.println("Grid 2: " + ignite2.cluster().localNode().id());
-            System.out.println("Grid 3: " + ignite3.cluster().localNode().id());
-
-            GridTestStreamerEventRouter router0 = (GridTestStreamerEventRouter)router;
-
-            router0.put("a", ignite1.cluster().localNode().id());
-            router0.put("b", ignite2.cluster().localNode().id());
-            router0.put("c", ignite3.cluster().localNode().id());
-
-            IgniteStreamer streamer = ignite0.streamer(null);
-
-            for (int i = 0; i < evtCnt; i++)
-                streamer.addEvent(cls.newInstance());
-
-            // Wait for all events to be processed.
-            finishLatch.await();
-
-            for (int i = 1; i < 4; i++)
-                assertEquals(evtCnt, grid(i).streamer(null).context().window().size());
-
-            // Check undeploy.
-            stopGrid(0, false);
-
-            GridTestUtils.retryAssert(log, 50, 50, new CA() {
-                @Override public void apply() {
-                    for (int i = 1; i < 4; i++)
-                        assertEquals(0, grid(i).streamer(null).context().window().size());
-                }
-            });
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testQuery() throws Exception {
-        atLeastOnce = true;
-        router = new StreamerRandomEventRouter();
-        p2pEnabled = false;
-
-        final int evtCnt = 1000;
-
-        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
-
-                AtomicInteger cntr = space.get(stageName);
-
-                if (cntr == null)
-                    cntr = F.addIfAbsent(space, stageName, new AtomicInteger());
-
-                for (Object val : evts)
-                    cntr.addAndGet((Integer)val);
-
-                String next = ctx.nextStageName();
-
-                if (next == null) {
-                    finishLatch.countDown();
-
-                    return null;
-                }
-
-                return (Map)F.asMap(next, evts);
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
-            new GridTestStage("c", stage), new GridTestStage("d", stage));
-
-        startGrids(4);
-
-        try {
-            int sum = 0;
-
-            int range = 1000;
-
-            Random rnd = new Random();
-
-            for (int i = 0; i < evtCnt; i++) {
-                int val = rnd.nextInt(range);
-
-                grid(0).streamer(null).addEvent(val);
-
-                sum += val;
-            }
-
-            finishLatch.await();
-
-            Map<String, Integer> stagesSum = new HashMap<>(4);
-
-            final String[] stages = {"a", "b", "c", "d"};
-
-            // Check all stages local map.
-            for (int i = 0; i < 4; i++) {
-                Ignite ignite = grid(i);
-
-                ConcurrentMap<String, AtomicInteger> locSpace = ignite.streamer(null).context().localSpace();
-
-                for (String stageName : stages) {
-                    AtomicInteger val = locSpace.get(stageName);
-
-                    assertNotNull(val);
-
-                    info(">>>>> grid=" + ignite.cluster().localNode().id() + ", s=" + stageName + ", val=" + val.get());
-
-                    Integer old = stagesSum.get(stageName);
-
-                    if (old == null)
-                        stagesSum.put(stageName, val.get());
-                    else
-                        stagesSum.put(stageName, old + val.get());
-                }
-            }
-
-            for (String s : stages)
-                assertEquals((Integer)sum, stagesSum.get(s));
-
-            StreamerContext streamerCtx = grid(0).streamer(null).context();
-
-            // Check query.
-            for (final String s : stages) {
-                Collection<Integer> res = streamerCtx.query(new C1<StreamerContext, Integer>() {
-                    @Override public Integer apply(StreamerContext ctx) {
-                        AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
-
-                        return cntr.get();
-                    }
-                });
-
-                assertEquals(sum, F.sumInt(res));
-            }
-
-            // Check broadcast.
-            streamerCtx.broadcast(new CI1<StreamerContext>() {
-                @Override public void apply(StreamerContext ctx) {
-                    int sum = 0;
-
-                    ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
-
-                    for (String s : stages) {
-                        AtomicInteger cntr = space.get(s);
-
-                        sum += cntr.get();
-                    }
-
-                    space.put("bcast", new AtomicInteger(sum));
-                }
-            });
-
-            int bcastSum = 0;
-
-            for (int i = 0; i < 4; i++) {
-                Ignite ignite = grid(i);
-
-                ConcurrentMap<String, AtomicInteger> locSpace = ignite.streamer(null).context().localSpace();
-
-                bcastSum += locSpace.get("bcast").get();
-            }
-
-            assertEquals(sum * stages.length, bcastSum);
-
-            // Check reduce.
-            for (final String s : stages) {
-                Integer res = streamerCtx.reduce(
-                    new C1<StreamerContext, Integer>() {
-                        @Override public Integer apply(StreamerContext ctx) {
-                            AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
-
-                            return cntr.get();
-                        }
-                    },
-                    F.sumIntReducer());
-
-                assertEquals((Integer)sum, res);
-            }
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRandomRouterWithEmptyTopology() throws Exception {
-        atLeastOnce = true;
-        router = new StreamerRandomEventRouter(new IgnitePredicate<ClusterNode>() {
-            @Override public boolean apply(ClusterNode node) {
-                return false;
-            }
-        });
-        p2pEnabled = false;
-
-        SC stage = new SC() {
-            @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
-                Collection<Object> evts) {
-                return ctx.nextStageName() == null ? null : (Map)F.asMap(ctx.nextStageName(), F.asList(0));
-            }
-        };
-
-        stages = F.asList((StreamerStage)new GridTestStage("0", stage),new GridTestStage("1", stage),
-            new GridTestStage("2", stage));
-
-        startGrids(1);
-
-        try {
-            final int errCnt = 10;
-
-            final CountDownLatch errLatch = new CountDownLatch(errCnt);
-
-            grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
-                @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
-                    info("Expected failure: " + err.getMessage());
-
-                    errLatch.countDown();
-                }
-            });
-
-            for (int i = 0; i < errCnt; i++)
-                grid(0).streamer(null).addEvent(0);
-
-            assert errLatch.await(5, TimeUnit.SECONDS);
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * @param ignite Grid to check metrics on.
-     * @param stage Stage name.
-     * @param evtCnt Event count.
-     * @param pipeline Pipeline.
-     */
-    private void checkMetrics(Ignite ignite, String stage, int evtCnt, boolean pipeline) {
-        IgniteStreamer streamer = ignite.streamer(null);
-
-        StreamerMetrics metrics = streamer.metrics();
-
-        assertEquals(evtCnt, metrics.stageTotalExecutionCount());
-        assertEquals(0, metrics.stageWaitingExecutionCount());
-        assertEquals(0, metrics.currentActiveSessions());
-        assertEquals(0, metrics.maximumActiveSessions());
-        assertEquals(0, metrics.failuresCount());
-
-        if (pipeline) {
-            assertEquals(4, metrics.pipelineMaximumExecutionNodes());
-            assertEquals(4, metrics.pipelineMinimumExecutionNodes());
-            assertEquals(4, metrics.pipelineAverageExecutionNodes());
-
-            assertTrue(metrics.pipelineMaximumExecutionTime() > 0);
-            assertTrue(metrics.pipelineMinimumExecutionTime() > 0);
-            assertTrue(metrics.pipelineAverageExecutionTime() > 0);
-        }
-        else {
-            assertEquals(0, metrics.pipelineMaximumExecutionNodes());
-            assertEquals(0, metrics.pipelineMinimumExecutionNodes());
-            assertEquals(0, metrics.pipelineAverageExecutionNodes());
-
-            assertEquals(0, metrics.pipelineMaximumExecutionTime());
-            assertEquals(0, metrics.pipelineMinimumExecutionTime());
-            assertEquals(0, metrics.pipelineAverageExecutionTime());
-        }
-
-        StreamerStageMetrics stageMetrics = streamer.metrics().stageMetrics(stage);
-
-        assertNotNull(stageMetrics);
-
-        assertTrue(stageMetrics.averageExecutionTime() > 0);
-        assertTrue(stageMetrics.minimumExecutionTime() > 0);
-        assertTrue(stageMetrics.maximumExecutionTime() > 0);
-        assertEquals(evtCnt, stageMetrics.totalExecutionCount());
-        assertEquals(0, stageMetrics.failuresCount());
-        assertFalse(stageMetrics.executing());
-    }
-
-    /**
-     * @param ignite Grid to check streamer on.
-     * @param stages Stages to check.
-     */
-    private void checkZeroMetrics(Ignite ignite, String... stages) {
-        for (String stage : stages) {
-            IgniteStreamer streamer = ignite.streamer(null);
-
-            StreamerStageMetrics metrics = streamer.metrics().stageMetrics(stage);
-
-            assertNotNull(metrics);
-
-            assertEquals(0, metrics.failuresCount());
-            assertEquals(0, metrics.averageExecutionTime());
-            assertEquals(0, metrics.minimumExecutionTime());
-            assertEquals(0, metrics.maximumExecutionTime());
-            assertFalse(metrics.executing());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java
deleted file mode 100644
index b4968c8..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-
-/**
- * Test stage.
- */
-class GridTestStage implements StreamerStage<Object> {
-    /** Stage name. */
-    private String name;
-
-    /** Stage closure. */
-    private SC stageClos;
-
-    /**
-     * @param name Stage name.
-     * @param stageClos Stage closure to execute.
-     */
-    GridTestStage(String name, SC stageClos) {
-        this.name = name;
-        this.stageClos = stageClos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts) {
-        return stageClos.apply(name(), ctx, evts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java
deleted file mode 100644
index ac81086..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/GridTestStreamerEventRouter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-
-/**
- * Test router.
- */
-class GridTestStreamerEventRouter extends StreamerEventRouterAdapter {
-    /** Route table. */
-    private Map<String, UUID> routeTbl = new HashMap<>();
-
-    /**
-     * @param stageName Stage name.
-     * @param nodeId Node id.
-     */
-    public void put(String stageName, UUID nodeId) {
-        routeTbl.put(stageName, nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        UUID nodeId = routeTbl.get(stageName);
-
-        if (nodeId == null)
-            return null;
-
-        return ctx.projection().node(nodeId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java
deleted file mode 100644
index ff457dd..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/streamer/SC.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-
-/**
- * Typedef for generic closure.
- */
-abstract class SC
-    extends GridClosure3X<String, StreamerContext, Collection<Object>, Map<String, Collection<?>>> {
-    // No-op.
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java
deleted file mode 100644
index a2e2a04..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/EventClosure.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-
-/**
- * Closure for events generation.
- */
-class EventClosure implements IgniteInClosure<IgniteStreamer> {
-    /** Random range. */
-    private int rndRange = 100;
-
-    /** {@inheritDoc} */
-    @Override public void apply(IgniteStreamer streamer) {
-        Random rnd = new Random();
-
-        while (!Thread.interrupted()) {
-            try {
-                streamer.addEvent(rnd.nextInt(rndRange));
-            }
-            catch (IgniteException e) {
-                X.println("Failed to add streamer event: " + e);
-            }
-        }
-    }
-
-    /**
-     * @return Random range.
-     */
-    public int getRandomRange() {
-        return rndRange;
-    }
-
-    /**
-     * @param rndRange Random range.
-     */
-    public void setRandomRange(int rndRange) {
-        this.rndRange = rndRange;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java
deleted file mode 100644
index 432ef47..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerBenchmark.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.springframework.beans.factory.xml.*;
-import org.springframework.context.support.*;
-import org.springframework.core.io.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Streamer benchmark.
- */
-public class GridStreamerBenchmark {
-
-    /**
-     * Entry point. Expects configuration URL to be provided.
-     *
-     * @param args Arguments. First argument is grid configuration. Second optional argument "-w" - stands for
-     *    "worker", in this case no load will be generated on the node.
-     * @throws Exception In case of any error.
-     */
-    public static void main(String[] args) throws Exception{
-        if (args.length == 0)
-            throw new IllegalArgumentException("Configuration path is not provided.");
-
-        String cfgPath = args.length > 0 ? args[0] :
-            "modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml";
-
-        boolean worker = args.length > 1 && "-w".equalsIgnoreCase(args[1]);
-
-        // Get load definitions.
-        Collection<GridStreamerLoad> loads = worker ? null : loads(cfgPath);
-
-        // Start the grid.
-        Ignite ignite = G.start(cfgPath);
-
-        // Start load threads.
-        Collection<Thread> loadThreads = new HashSet<>();
-
-        if (loads != null && !loads.isEmpty()) {
-            for (GridStreamerLoad load : loads) {
-                final IgniteStreamer streamer = ignite.streamer(load.getName());
-
-                if (streamer == null)
-                    throw new Exception("Steamer is not found: " + load.getName());
-
-                List<IgniteInClosure<IgniteStreamer>> clos = load.getClosures();
-
-                if (clos != null && !clos.isEmpty()) {
-                    for (final IgniteInClosure<IgniteStreamer> clo : clos) {
-                        Thread t = new Thread(new Runnable() {
-                            @Override public void run() {
-                                try {
-                                    clo.apply(streamer);
-                                }
-                                catch (Exception e) {
-                                    X.println("Exception during execution of closure for streamer " +
-                                        "[streamer=" + streamer.name() + ", closure=" + clo + ", err=" +
-                                        e.getMessage() + ']');
-
-                                    e.printStackTrace();
-                                }
-                            }
-                        });
-
-                        loadThreads.add(t);
-
-                        t.start();
-                    }
-                }
-            }
-        }
-
-        // Once all loads are started, simply join them.
-        System.out.println("Press enter to stop running benchmark.");
-
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
-            in.readLine();
-        }
-
-        for (Thread t : loadThreads)
-            t.interrupt();
-
-        for (Thread t : loadThreads)
-            t.join();
-    }
-
-    /**
-     * Get loads from the Spring context.
-     *
-     * @param cfgPath Configuration path.
-     * @return Collection of loads, if any.
-     * @throws Exception If failed.
-     */
-    private static Collection<GridStreamerLoad> loads(String cfgPath) throws Exception {
-        URL cfgUrl;
-
-        try {
-            cfgUrl = new URL(cfgPath);
-        }
-        catch (MalformedURLException ignore) {
-            cfgUrl = U.resolveIgniteUrl(cfgPath);
-        }
-
-        if (cfgUrl == null)
-            throw new Exception("Spring XML configuration path is invalid: " + cfgPath);
-
-        GenericApplicationContext springCtx = new GenericApplicationContext();
-
-        new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl));
-
-        springCtx.refresh();
-
-        Map<String, GridStreamerLoad> cfgMap = springCtx.getBeansOfType(GridStreamerLoad.class);
-
-        return cfgMap.values();
-    }
-}


Mime
View raw message