ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [01/16] incubator-ignite git commit: sp-2 streaming cleanup
Date Fri, 20 Mar 2015 10:03:38 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 fbe025f68 -> 2be7cf033


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java
deleted file mode 100644
index 9c34085..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerIndexLoadTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.index.*;
-import org.apache.ignite.streamer.index.hash.*;
-import org.apache.ignite.streamer.index.tree.*;
-import org.apache.ignite.streamer.window.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.loadtests.util.GridLoadTestArgs.*;
-import static org.apache.ignite.testframework.GridTestUtils.*;
-
-/**
- * Load test for streamer index.
- */
-public class GridStreamerIndexLoadTest {
-    /**
-     * Window index configuration.
-     */
-    private enum IndexConfiguration {
-        /**
-         * Tree index with non-unique elements and no event tracking.
-         */
-        TREE_INDEX_NOT_UNIQUE {
-            /** {@inheritDoc} */
-            @Override
-            StreamerIndexProvider<Integer, Integer, Long> indexProvider() {
-                StreamerTreeIndexProvider<Integer, Integer, Long> idx = new StreamerTreeIndexProvider<>();
-
-                idx.setUpdater(new IndexUpdater());
-                idx.setUnique(false);
-                idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF);
-
-                return idx;
-            }
-        },
-
-        /**
-         * Hash index with non-unique elements and no event tracking.
-         */
-        HASH_INDEX_NOT_UNIQUE {
-            /** {@inheritDoc} */
-            @Override
-            StreamerIndexProvider<Integer, Integer, Long> indexProvider() {
-                StreamerHashIndexProvider<Integer, Integer, Long> idx = new StreamerHashIndexProvider<>();
-
-                idx.setUpdater(new IndexUpdater());
-                idx.setUnique(false);
-                idx.setPolicy(StreamerIndexPolicy.EVENT_TRACKING_OFF);
-
-                return idx;
-            }
-        };
-
-        /**
-         * @return Index provider for this index configuration.
-         */
-        abstract StreamerIndexProvider<Integer, Integer, Long> indexProvider();
-    }
-
-    /**
-     * @param args Command line arguments.
-     * @throws Exception If error occurs.
-     */
-    public static void main(String[] args) throws Exception {
-        for (IndexConfiguration idxCfg : EnumSet.allOf(IndexConfiguration.class)) {
-            X.println(">>> Running benchmark for configuration: " + idxCfg);
-
-            runBenchmark(idxCfg);
-        }
-    }
-
-    /**
-     * Runs the benchmark for the specified index configuration.
-     *
-     * @param idxCfg Index configuration.
-     * @throws Exception If error occurs.
-     */
-    public static void runBenchmark(IndexConfiguration idxCfg) throws Exception {
-        int thrCnt = getIntProperty(THREADS_CNT, 1);
-        int dur = getIntProperty(TEST_DUR_SEC, 60);
-        int winSize = getIntProperty("IGNITE_WIN_SIZE", 5000);
-
-        dumpProperties(System.out);
-
-        final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(winSize);
-        win.setIndexes(idxCfg.indexProvider());
-
-        win.start();
-
-        final AtomicLong enqueueCntr = new AtomicLong();
-
-        IgniteInternalFuture<Long> enqueueFut = runMultiThreadedAsync(new CAX() {
-            @Override public void applyx() {
-                Random rnd = new Random();
-
-                while (!Thread.currentThread().isInterrupted()) {
-                    win.enqueue(rnd.nextInt());
-
-                    enqueueCntr.incrementAndGet();
-                }
-            }
-        }, thrCnt, "generator");
-
-        final AtomicLong evictCntr = new AtomicLong();
-
-        IgniteInternalFuture<Long> evictFut = runMultiThreadedAsync(new CAX() {
-            @Override public void applyx() {
-                while (!Thread.currentThread().isInterrupted()) {
-                    win.pollEvicted();
-
-                    evictCntr.incrementAndGet();
-                }
-            }
-        }, thrCnt, "evictor");
-
-        IgniteInternalFuture<Long> collFut = runMultiThreadedAsync(new CAX() {
-            @Override public void applyx() {
-                int nSec = 0;
-                long prevEnqueue = enqueueCntr.get();
-                long prevEvict = evictCntr.get();
-
-                try {
-                    while (!Thread.currentThread().isInterrupted()) {
-                        U.sleep(1000);
-                        nSec++;
-
-                        long curEnqueue = enqueueCntr.get();
-                        long curEvict = evictCntr.get();
-
-                        X.println("Stats [enqueuePerSec=" + (curEnqueue - prevEnqueue) +
-                            ", evictPerSec=" + (curEvict - prevEvict) + ']');
-
-                        prevEnqueue = curEnqueue;
-                        prevEvict = curEvict;
-                    }
-                }
-                catch (IgniteInterruptedCheckedException ignored) {
-                    // No-op.
-                }
-
-                X.println("Final results [enqueuePerSec=" + (enqueueCntr.get() / nSec) +
-                    ", evictPerSec=" + (evictCntr.get() / nSec) + ']');
-            }
-        }, 1, "collector");
-
-        U.sleep(dur * 1000);
-
-        X.println("Finishing test.");
-
-        collFut.cancel();
-        enqueueFut.cancel();
-        evictFut.cancel();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java
deleted file mode 100644
index 583ed56..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/GridStreamerLoad.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-
-/**
- * Configurable streamer load.
- */
-public class GridStreamerLoad {
-    /** Steamer name. */
-    private String name;
-
-    /** Load closures. */
-    private List<IgniteInClosure<IgniteStreamer>> clos;
-
-    /**
-     * @return Steamer name.
-     */
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * @param name Steamer name.
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Query closure.
-     */
-    public List<IgniteInClosure<IgniteStreamer>> getClosures() {
-        return clos;
-    }
-
-    /**
-     * @param clos Query closure.
-     */
-    public void setClosures(List<IgniteInClosure<IgniteStreamer>> clos) {
-        this.clos = clos;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java
deleted file mode 100644
index 95d76f5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/IndexUpdater.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.streamer.index.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Streamer benchmark window index updater.
- */
-class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> {
-    /** {@inheritDoc} */
-    @Override public Integer indexKey(Integer evt) {
-        return evt;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
-        return entry.value() + 1;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
-        return entry.value() - 1 == 0 ? null : entry.value() - 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Long initialValue(Integer evt, Integer key) {
-        return 1L;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java
deleted file mode 100644
index a8f0d70..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/QueryClosure.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-/**
- * Closure for events generation.
- */
-class QueryClosure implements IgniteInClosure<IgniteStreamer> {
-    /** Sleep period (seconds). */
-    private static final int SLEEP_PERIOD_SEC = 3;
-
-    /** Random range. */
-    private int rndRange = 100;
-
-    /** Warmup time. */
-    private long warmup = 60000;
-
-    /** {@inheritDoc} */
-    @Override public void apply(IgniteStreamer streamer) {
-        X.println("Pefromrming warmup: " + warmup + "ms...");
-
-        try {
-            U.sleep(warmup);
-        }
-        catch (IgniteInterruptedCheckedException ignore) {
-            return;
-        }
-
-        long initTime = System.currentTimeMillis();
-        long initExecs = streamer.metrics().stageTotalExecutionCount();
-
-        long prevExecs = initExecs;
-
-        while (!Thread.interrupted()) {
-            try {
-                U.sleep(SLEEP_PERIOD_SEC * 1000);
-            }
-            catch (IgniteInterruptedCheckedException ignore) {
-                return;
-            }
-
-            long curTime = System.currentTimeMillis();
-            long curExecs = streamer.metrics().stageTotalExecutionCount();
-
-            long deltaExecs = curExecs - prevExecs;
-            long deltaThroughput = deltaExecs/SLEEP_PERIOD_SEC;
-
-            long totalTimeSec = (curTime - initTime) / 1000;
-            long totalExecs = curExecs - initExecs;
-            long totalThroughput = totalExecs/totalTimeSec;
-
-            X.println("Measurement: [throughput=" + deltaThroughput + " execs/sec, totalThroughput=" +
-                totalThroughput + " execs/sec]");
-
-            prevExecs = curExecs;
-        }
-    }
-
-    /**
-     * @return Random range.
-     */
-    public int getRandomRange() {
-        return rndRange;
-    }
-
-    /**
-     * @param rndRange Random range.
-     */
-    public void setRandomRange(int rndRange) {
-        this.rndRange = rndRange;
-    }
-
-    /**
-     * @return Warmup time (milliseconds)
-     */
-    public long getWarmup() {
-        return warmup;
-    }
-
-    /**
-     * @param warmup Warmup time (milliseconds)
-     */
-    public void setWarmup(long warmup) {
-        this.warmup = warmup;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java
deleted file mode 100644
index fdf2aa7..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestAverage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer.average;
-
-/**
- * Average helper class.
- */
-class TestAverage {
-    /** */
-    private int total;
-
-    /** */
-    private int cnt;
-
-    /**
-     * @param avg Average.
-     */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    public void increment(TestAverage avg) {
-        int total;
-        int cnt;
-
-        synchronized (avg) {
-            total = avg.total;
-            cnt = avg.cnt;
-        }
-
-        increment(total, cnt);
-    }
-
-    /**
-     * @param total Increment total.
-     * @param cnt Increment count.
-     */
-    public synchronized void increment(int total, int cnt) {
-        this.total += total;
-        this.cnt += cnt;
-    }
-
-    /**
-     * @param total Total.
-     * @param cnt Count.
-     */
-    public synchronized void set(int total, int cnt) {
-        this.total = total;
-        this.cnt = cnt;
-    }
-
-    /**
-     * @return Running average.
-     */
-    public synchronized double average() {
-        return (double)total / cnt;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java
deleted file mode 100644
index 23c30fc..0000000
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/streamer/average/TestStage.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.streamer.average;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Stage for average benchmark.
- */
-class TestStage implements StreamerStage<Integer> {
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return "stage";
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts) {
-        ConcurrentMap<String, TestAverage> loc = ctx.localSpace();
-
-        TestAverage avg = loc.get("avg");
-
-        if (avg == null)
-            avg = F.addIfAbsent(loc, "avg", new TestAverage());
-
-        for (Integer e : evts)
-            avg.increment(e, 1);
-
-        StreamerWindow<Integer> win = ctx.window();
-
-        win.enqueueAll(evts);
-
-        while (true) {
-            Integer e = win.pollEvicted();
-
-            if (e == null)
-                break;
-
-            // Subtract evicted events from running total.
-            avg.increment(-e, -1);
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java
index e4326bf..3eb2909 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java
@@ -27,13 +27,10 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.executor.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.service.*;
-import org.apache.ignite.internal.processors.streamer.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.p2p.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.window.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
@@ -102,7 +99,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
         namedCache.setAtomicityMode(TRANSACTIONAL);
 
         cfg.setMarshaller(marshaller());
-        cfg.setStreamerConfiguration(streamerConfiguration());
         cfg.setCacheConfiguration(new CacheConfiguration(), namedCache);
 
         return cfg;
@@ -113,32 +109,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
      */
     protected abstract Marshaller marshaller();
 
-    /**
-     * @return Streamer configuration.
-     */
-    private static StreamerConfiguration streamerConfiguration() {
-        Collection<StreamerStage> stages = F.<StreamerStage>asList(new StreamerStage() {
-            @Override
-            public String name() {
-                return "name";
-            }
-
-            @Nullable
-            @Override
-            public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
-                return null;
-            }
-        });
-
-        StreamerConfiguration cfg = new StreamerConfiguration();
-
-        cfg.setAtLeastOnce(true);
-        cfg.setWindows(F.asList((StreamerWindow) new StreamerUnboundedWindow()));
-        cfg.setStages(stages);
-
-        return cfg;
-    }
-
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         marsh = grid().configuration().getMarshaller();
@@ -797,32 +767,6 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testStreamer() throws Exception {
-        IgniteStreamer streamer = grid().streamer(null);
-
-        streamer.addEvent("test");
-
-        GridMarshallerTestBean inBean = newTestBean(streamer);
-
-        byte[] buf = marshal(inBean);
-
-        GridMarshallerTestBean outBean = unmarshal(buf);
-
-        assert inBean.getObjectField() != null;
-        assert outBean.getObjectField() != null;
-
-        assert inBean.getObjectField().getClass().equals(IgniteStreamerImpl.class);
-        assert outBean.getObjectField().getClass().equals(IgniteStreamerImpl.class);
-
-        assert inBean != outBean;
-        assert inBean.equals(outBean);
-
-        outBean.checkNullResources();
-    }
-
-    /**
      * @param obj Object field to use.
      * @return New test bean.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
deleted file mode 100644
index 869dd94..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streamer/index/GridStreamerIndexSelfTest.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.index;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.index.hash.*;
-import org.apache.ignite.streamer.index.tree.*;
-import org.apache.ignite.streamer.window.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.streamer.index.StreamerIndexPolicy.*;
-import static org.apache.ignite.testframework.GridTestUtils.*;
-
-/**
- * Tests for Streamer window index.
- */
-public class GridStreamerIndexSelfTest extends GridCommonAbstractTest {
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTreeIndex() throws Exception {
-        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
-            checkUniqueIndex(indexProvider(true, "idx", new UniqueStringIndexUpdater(), plc, true));
-
-            checkNonUniqueIndex(indexProvider(true, "idx", new IndexUpdater(), plc, false));
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testHashIndex() throws Exception {
-        for (StreamerIndexPolicy plc : StreamerIndexPolicy.values()) {
-            checkUniqueIndex(indexProvider(false, "idx", new UniqueStringIndexUpdater(), plc, true));
-
-            checkNonUniqueIndex(indexProvider(false, "idx", new IndexUpdater(), plc, false));
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultipleIndexUpdate() throws Exception {
-        StreamerIndexProvider<String, String, Integer> idxProvider =
-            indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON, false);
-
-        StreamerIndexProvider<String, String, String> idxProvider1 =
-            indexProvider(true, "idx1", new UniqueStringIndexUpdater(), EVENT_TRACKING_ON, true);
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idxProvider, idxProvider1);
-
-        win.start();
-
-        win.enqueue("A");
-        win.enqueue("B");
-        win.enqueue("C");
-        win.enqueue("D");
-
-        // Snapshot both indexes.
-        StreamerIndex<String, String, Integer> idx = win.index("idx");
-        StreamerIndex<String, String, String> idx1 = win.index("idx1");
-
-        info("Idx: " + idx.entries(0));
-        info("Idx1: " + idx1.entries(0));
-
-        try {
-            win.enqueue("A");
-
-            fail("Exception should have been thrown.");
-        }
-        catch (IgniteException e) {
-            info("Caught expected exception: " + e);
-        }
-
-        StreamerIndex<String, String, Integer> idxAfter = win.index("idx");
-        StreamerIndex<String, String, String> idx1After = win.index("idx1");
-
-        info("Idx (after): " + idxAfter.entries(0));
-        info("Idx1 (after): " + idx1After.entries(0));
-
-        assertEquals(4, idx.entries(0).size());
-        assertEquals(4, idx1.entries(0).size());
-
-        assertTrue(F.eqOrdered(idx.entries(0), idxAfter.entries(0)));
-        assertTrue(F.eqOrdered(idx1.entries(0), idx1After.entries(0)));
-
-        idxProvider.reset();
-
-        assertEquals(4, idx.entries(0).size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSortedIndexMultithreaded() throws Exception {
-        checkSortedIndexMultithreaded(32, 500, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSortedIndexMultithreadedWithConcurrentPollEvicted() throws Exception {
-        checkSortedIndexMultithreaded(32, 500, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUniqueHashIndexMultithreaded() throws Exception {
-        checkUniqueHashIndexMultithreaded(32, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterIndexKeyNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override public String indexKey(String evt) {
-                return "A".equals(evt) ? null : evt;
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterInitialValueNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override public Integer initialValue(String evt, String key) {
-                return "A".equals(evt) ? null : 1;
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdaterOnAddedNull() throws Exception {
-        checkIndexUpdater(new IndexUpdater() {
-            @Nullable @Override
-            public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-                return "A".equals(evt) ? null : entry.value() + 1;
-            }
-        });
-    }
-
-    /**
-     * Checks the correct behaviour of {@link StreamerIndexUpdater}, given that
-     * it discards event "A" and accepts event "B".
-     *
-     * @param updater Index updater.
-     */
-    private void checkIndexUpdater(StreamerIndexUpdater<String, String, Integer> updater) {
-        List<StreamerIndexProvider<String, String, Integer>> idxps = Arrays.asList(
-            indexProvider(true, "tree", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false),
-            indexProvider(false, "hash", updater, StreamerIndexPolicy.EVENT_TRACKING_ON, false));
-
-        for (StreamerIndexProvider<String, String, Integer> idxp : idxps) {
-            StreamerUnboundedWindow<String> win = new StreamerUnboundedWindow<>();
-
-            win.setIndexes(idxp);
-
-            win.start();
-
-            win.enqueue("A");
-            win.enqueue("A");
-            win.enqueue("B");
-
-            StreamerIndex<String, Object, Object> idx = win.index(idxp.getName());
-
-            assertNotNull(idx);
-
-            assertNull(idx.entry("A"));
-
-            assertNotNull(idx.entry("B"));
-        }
-    }
-
-    /**
-     * @param treeIdx {@code True} to create tree index.
-     * @param name Name.
-     * @param updater Updater.
-     * @param plc Policy.
-     * @param unique Unique.
-     * @return Index provider.
-     */
-    private <E, K, V> StreamerIndexProvider<E, K, V> indexProvider(boolean treeIdx, String name,
-        StreamerIndexUpdater<E, K, V> updater, StreamerIndexPolicy plc, boolean unique) {
-        if (treeIdx) {
-            StreamerTreeIndexProvider<E, K, V> idx = new StreamerTreeIndexProvider<>();
-
-            idx.setName(name);
-            idx.setUpdater(updater);
-            idx.setUnique(unique);
-            idx.setPolicy(plc);
-
-            return idx;
-        }
-        else {
-            StreamerHashIndexProvider<E, K, V> idx = new StreamerHashIndexProvider<>();
-
-            idx.setName(name);
-            idx.setUpdater(updater);
-            idx.setUnique(unique);
-            idx.setPolicy(plc);
-
-            return idx;
-        }
-    }
-
-    /**
-     * @param threadCnt Thread count.
-     * @param iters Number of iterations for each worker thread.
-     * @throws Exception If failed.
-     */
-    private void checkUniqueHashIndexMultithreaded(int threadCnt, final int iters)
-        throws Exception {
-        StreamerIndexProvider<String, String, Integer> idxProvider =
-            indexProvider(false, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, true);
-
-        for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
-            final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-            win.setMaximumSize(threadCnt * 2);
-            win.setIndexes(idxProvider);
-
-            win.start();
-
-            final String evt = "evt" + i;
-            final AtomicInteger nIdxErrors = new AtomicInteger();
-
-            // Submit the same event in multiple threads.
-            runMultiThreaded(new CAX() {
-                @Override public void applyx() {
-                    try {
-                        win.enqueue(evt);
-                    }
-                    catch (IgniteException e) {
-                        if (e.getMessage().contains("Index unique key violation"))
-                            nIdxErrors.incrementAndGet();
-                        else
-                            throw e;
-                    }
-                }
-            }, threadCnt, "put");
-
-            // Only one thread should succeed, because the index is unique.
-            assertEquals(threadCnt - 1, nIdxErrors.get());
-
-            StreamerIndex<String, String, Integer> idx = win.index("idx");
-
-            // Only one event should be present and have value 1.
-            assertEquals(1, idx.entries(0).size());
-            assertEquals((Integer)1, idx.entry(evt).value());
-        }
-    }
-
-    /**
-     * @param threadCnt Thread count.
-     * @param iters Number of iterations for each worker thread.
-     * @param pollEvicted Poll evicted events concurrently, if true.
-     * @throws Exception If failed.
-     */
-    public void checkSortedIndexMultithreaded(final int threadCnt, final int iters, final boolean pollEvicted)
-        throws Exception {
-        final StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(threadCnt * 2);
-        win.setIndexes(indexProvider(true, "idx", new IndexUpdater(), EVENT_TRACKING_ON_DEDUP, false));
-
-        win.start();
-
-        IgniteInternalFuture<Long> pollFut = null;
-
-        if (pollEvicted) {
-            // These threads poll evicted events from the window if it doesn't break
-            // the test invariant.
-            pollFut = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() {
-                    try {
-                        while (!Thread.currentThread().isInterrupted()) {
-                            StreamerIndex<String, String, Integer> idx = win.index("idx");
-
-                            boolean canPoll = F.forAll(
-                                idx.entries(-1 * threadCnt),
-                                new P1<StreamerIndexEntry<String, String, Integer>>() {
-                                    @Override public boolean apply(StreamerIndexEntry<String, String, Integer> e) {
-                                        return e.value() > 2;
-                                    }
-                                });
-
-                            if (!canPoll || win.pollEvicted() == null)
-                                U.sleep(50);
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        // No-op.
-                    }
-                }
-            }, threadCnt / 4, "test-poll");
-        }
-
-        try {
-            // Each of these threads generates a single event repeatedly and checks
-            // if it is still present in the window. In the tested index events are
-            // sorted by value and the value is a number of repeated events, so, this
-            // should be invariant.
-            IgniteInternalFuture<Long> fut1 = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() {
-                    final String evt = Thread.currentThread().getName();
-                    int cntr = 1;
-
-                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++) {
-                        win.enqueue(evt);
-
-                        StreamerIndex<String, String, Integer> idx = win.index("idx");
-                        StreamerIndexEntry<String, String, Integer> entry = idx.entry(evt);
-
-                        assertNotNull(entry);
-
-                        // If concurrent eviction is disabled, check if the
-                        // value grows each time we enqueue a new event.
-                        if (!pollEvicted)
-                            assertEquals((Integer)cntr++, entry.value());
-
-                        // If queued event more than once, the first threadCnt entries
-                        // in descending order should contain an entry with this thread's event.
-                        if (i > 0)
-                            assert idx.entries(-1 * threadCnt).contains(entry);
-                    }
-                }
-            }, threadCnt / 2, "test-multi");
-
-            // This thread generates a set of single non-repeating events from 0 to iters.
-            IgniteInternalFuture<Long> fut2 = runMultiThreadedAsync(new CAX() {
-                @Override public void applyx() {
-                    for (int i = 0; i < iters && !Thread.currentThread().isInterrupted(); i++)
-                        win.enqueue(String.valueOf(i));
-                }
-            }, 1, "test-single");
-
-            fut2.get(getTestTimeout());
-            fut1.get(getTestTimeout());
-        }
-        finally {
-            if (pollFut != null)
-                pollFut.cancel();
-        }
-    }
-
-    /**
-     * @param idx Index.
-     */
-    private void checkNonUniqueIndex(StreamerIndexProvider<String, String, Integer> idx) {
-        assert !idx.isUnique();
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idx);
-
-        win.start();
-
-        for (int i = 0; i < 20; ) {
-            win.enqueue("A" + i); i++;
-            win.enqueue("B"); i++;
-            win.enqueue("C"); i++;
-            win.enqueue("D"); i++;
-        }
-
-        StreamerIndex<String, String, Integer> idx0 = win.index("idx");
-
-        String s;
-
-        while ((s = win.pollEvicted()) != null)
-            info("Evicted String: " + s);
-
-        StreamerIndex<String, String, Integer> idx1 = win.index("idx");
-
-        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
-            assert idx0.sorted();
-
-            // Users with unique names.
-            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(1)) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                    assertEquals(1, e.events().size());
-                    assertEquals('A', F.first(e.events()).charAt(0));
-                }
-            }
-
-            assertTrue(idx0.entrySet(2).isEmpty());
-
-            for (StreamerIndexEntry<String, String, Integer> e : idx0.entrySet(5)) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON)
-                    assertEquals(5, e.events().size());
-
-                else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
-                    assertEquals(1, e.events().size());
-
-                else
-                    assertNull(e.events());
-            }
-
-            assertEquals(5, idx0.entrySet(1).size());
-
-            List<StreamerIndexEntry<String, String, Integer>> asc =
-                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<StreamerIndexEntry<String, String, Integer>> desc =
-                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
-
-            assertEquals(8, asc.size());
-            assertEquals(8, desc.size());
-
-            for (int i = 0; i < asc.size(); i++)
-                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
-
-            try {
-                idx0.entrySet(true, 10, true, -10, true);
-
-                assert false;
-            }
-            catch (IllegalArgumentException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            try {
-                idx0.entrySet(false, -10, true, 10, true);
-
-                assert false;
-            }
-            catch (IllegalArgumentException e) {
-                info("Caught expected exception: " + e);
-            }
-        }
-        else
-            assert !idx0.sorted();
-
-        assertEquals(4, idx1.size());
-
-        for (StreamerIndexEntry<String, String, Integer> e : idx1.entries(0)) {
-            Collection<String> evts = e.events();
-
-            info("Entry [e=" + e + ", evts=" + evts + ']');
-
-            if (idx.getPolicy() == EVENT_TRACKING_ON) {
-                assert evts != null;
-
-                switch (evts.size()) {
-                    case 1:
-                        assert F.containsAny(evts, "A16", "B", "C") : "Wrong tracked event: " + F.first(evts);
-
-                        break;
-
-                    case 2:
-                        Collection<String> dedup = F.dedup(evts);
-
-                        assert dedup.size() == 1 && "D".equals(F.first(dedup)) : "Wrong tracked events: " + evts;
-
-                        break;
-
-                    default:
-                        fail("Wrong tracked events: " + evts);
-                }
-            }
-            else if (idx.getPolicy() == EVENT_TRACKING_ON_DEDUP)
-                assert evts != null && evts.size() == 1 && F.containsAny(evts, "A16", "B", "C", "D") :
-                    "Wrong tracked events: " + evts;
-            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
-                assert evts == null;
-        }
-
-        // Check that idx0 is unaffected.
-        assertEquals(8, idx0.size());
-
-        idx.reset();
-
-        assertEquals(0, idx.index().size());
-        assertEquals(8, idx0.size());
-    }
-
-    /**
-     * @param idx Index.
-     */
-    private void checkUniqueIndex(StreamerIndexProvider<String, String, String> idx) {
-        assert idx.isUnique();
-
-        StreamerBoundedSizeWindow<String> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(5);
-        win.setIndexes(idx);
-
-        win.start();
-
-        for (int i = 0; i < 20; i++)
-            win.enqueue("A" + i);
-
-        for (int i = 0; i < 20; i++) {
-            try {
-                win.enqueue("A" + i);
-
-                fail("Exception should have been thrown.");
-            }
-            catch (IgniteException e) {
-                info("Caught expected exception: " + e);
-            }
-        }
-
-        StreamerIndex<String, String, String> idx0 = win.index("idx");
-
-        String s;
-
-        while ((s = win.pollEvicted()) != null)
-            info("Evicted string: " + s);
-
-        StreamerIndex<String, String, String> idx1 = win.index("idx");
-
-        if (idx instanceof StreamerTreeIndexProvider) { // Tree index.
-            assert idx0.sorted();
-
-            // Users with unique names.
-            for (StreamerIndexEntry<String, String, String> e : idx0.entrySet("A0")) {
-                info("Entry [e=" + e + ", evts=" + e.events() + ']');
-
-                if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                    assertEquals(1, e.events().size());
-                    assertEquals('A', F.first(e.events()).charAt(0));
-                }
-            }
-
-            assertTrue(idx0.entrySet("B").isEmpty());
-
-            assertEquals(1, idx0.entrySet("A0").size());
-
-            List<StreamerIndexEntry<String, String, String>> asc =
-                new ArrayList<>(idx0.entrySet(true, null, true, null, true));
-            List<StreamerIndexEntry<String, String, String>> desc =
-                new ArrayList<>(idx0.entrySet(false, null, true, null, true));
-
-            assertEquals(20, asc.size());
-            assertEquals(20, desc.size());
-
-            for (int i = 0; i < asc.size(); i++)
-                assertEquals(asc.get(i), desc.get(desc.size() - i - 1));
-        }
-        else
-            assert !idx0.sorted();
-
-        assertEquals(5, idx1.size());
-
-        for (StreamerIndexEntry<String, String, String> e : idx1.entries(0)) {
-            Collection<String> evts = e.events();
-
-            info("Entry [e=" + e + ", evts=" + evts + ']');
-
-            if (idx.getPolicy() == EVENT_TRACKING_ON || idx.getPolicy() == EVENT_TRACKING_ON_DEDUP) {
-                assert evts != null && evts.size() == 1 : "Wrong tracked events: " + evts;
-
-                int i = Integer.parseInt(F.first(evts).substring(1));
-
-                assert i >= 15 && i < 20 : "Wrong event: " + F.first(evts);
-            }
-            else if (idx.getPolicy() == EVENT_TRACKING_OFF)
-                assert evts == null;
-        }
-
-        // Check that idx0 is unaffected.
-        assertEquals(20, idx0.size());
-
-        idx.reset();
-
-        assertEquals(0, idx.index().size());
-        assertEquals(20, idx0.size());
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class IndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer initialValue(String evt, String key) {
-            return 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-            return entry.value() + 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
-            String evt) {
-            int res = entry.value() - 1;
-
-            return res == 0 ? null : res;
-        }
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class HashIndexUpdater implements StreamerIndexUpdater<String, String, Integer> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer initialValue(String evt, String key) {
-            return 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onAdded(StreamerIndexEntry<String, String, Integer> entry, String evt) {
-            return entry.value() + 1;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Integer onRemoved(StreamerIndexEntry<String, String, Integer> entry,
-            String evt) {
-            int res = entry.value() - 1;
-
-            return res == 0 ? null : res;
-        }
-    }
-
-    /**
-     * Name index updater.
-     */
-    private static class UniqueStringIndexUpdater implements StreamerIndexUpdater<String, String, String> {
-        /** {@inheritDoc} */
-        @Nullable @Override public String indexKey(String evt) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String initialValue(String evt, String key) {
-            return evt;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String onAdded(StreamerIndexEntry<String, String, String> entry, String evt) {
-            throw new IgniteException("Unique key violation: " + evt);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public String onRemoved(StreamerIndexEntry<String, String, String> entry,
-            String evt) {
-            // On remove we return null as index is unique.
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
deleted file mode 100644
index 27ce309..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java
+++ /dev/null
@@ -1,911 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer.window;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Streamer window self test.
- */
-public class GridStreamerWindowSelfTest extends GridCommonAbstractTest {
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeWindowValidation() throws Exception {
-        final StreamerBoundedSizeWindow win = new StreamerBoundedSizeWindow();
-
-        win.start();
-
-        win.setMaximumSize(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeWindowValidation() throws Exception {
-        final StreamerBoundedTimeWindow win = new StreamerBoundedTimeWindow();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setTimeInterval(1);
-
-        win.start();
-
-        win.setMaximumSize(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeBatchWindowValidation() throws Exception {
-        final StreamerBoundedSizeBatchWindow win = new StreamerBoundedSizeBatchWindow();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchSize(1);
-
-        win.start();
-
-        win.setMaximumBatches(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeBatchWindowValidation() throws Exception {
-        final StreamerBoundedTimeBatchWindow win = new StreamerBoundedTimeBatchWindow();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchSize(1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchTimeInterval(1);
-        win.setBatchSize(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchSize(1);
-
-        win.start();
-
-        win.setMaximumBatches(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedWindow() throws Exception {
-        final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(50);
-
-        win.start();
-
-        for (int i = 0; i < 50; i++)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        for(int i = 50; i < 60; i++)
-            win.enqueue(i);
-
-        for (int i = 0; i < 10; i++)
-            assert i == win.pollEvicted();
-
-        assertNull(win.pollEvicted());
-
-        checkIterator(win);
-
-        win.setMaximumSize(2);
-
-        win.start();
-
-        win.enqueue(3, 2, 1);
-
-        checkSnapshot(win.snapshot(true), 3, 2, 1);
-        checkSnapshot(win.snapshot(false), 2, 1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedWindowUnique() throws Exception {
-        final StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(50);
-        win.setUnique(true);
-
-        win.start();
-
-        for (int i = 0; i < 50; i++)
-            win.enqueue(i);
-
-        for (int i = 0; i < 50; i++)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        int idx = 0;
-
-        for (Object evt : win) {
-            Integer next = (Integer)evt;
-
-            assertEquals((Integer)idx++, next);
-        }
-
-        checkIterator(win);
-
-        win.setMaximumSize(2);
-
-        win.start();
-
-        win.enqueue(3, 2, 1, 3);
-
-        checkSnapshot(win.snapshot(true), 3, 2, 1);
-        checkSnapshot(win.snapshot(false), 2, 1);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSortedWindow() throws Exception {
-        final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
-
-        win.setMaximumSize(60);
-
-        win.start();
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        for (int i = 59; i >= 30; i--) {
-            assert i == win.pollEvicted();
-            assert i == win.pollEvicted();
-        }
-
-        assertNull(win.pollEvicted());
-
-        checkIterator(win);
-
-        win.setMaximumSize(2);
-
-        win.start();
-
-        win.enqueue(3, 2, 1, 4);
-
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4);
-        checkSnapshot(win.snapshot(false), 3, 4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSortedWindowUnique() throws Exception {
-        final StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
-
-        win.setMaximumSize(-1);
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setMaximumSize(60);
-        win.setUnique(true);
-
-        win.start();
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        for (int i = 99; i >= 60; i--)
-            win.enqueue(i);
-
-        for (int i = 99; i >= 60; i--)
-            assert i == win.pollEvicted();
-
-        assertNull(win.pollEvicted());
-
-        checkIterator(win);
-
-        win.setMaximumSize(2);
-
-        win.start();
-
-        win.enqueue(3, 2, 1, 3, 4);
-
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4);
-        checkSnapshot(win.snapshot(false), 3, 4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeBatchDequeueWindow() throws Exception {
-        final StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchSize(10);
-        win.setMaximumBatches(2);
-
-        win.start();
-
-        for (int i = 0; i < 20; i++)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-        assertEquals(0, win.pollEvictedBatch().size());
-
-        win.enqueue(20);
-
-        Collection<Integer> evicted = win.pollEvictedBatch();
-
-        assertEquals(10, evicted.size());
-
-        Iterator<Integer> it = evicted.iterator();
-
-        for (int i = 0; i < 10; i++)
-            assert i == it.next();
-
-        assertNull(win.pollEvicted());
-        assertEquals(0, win.pollEvictedBatch().size());
-
-        for (int i = 21; i < 30; i++)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-        assertEquals(0, win.pollEvictedBatch().size());
-
-        win.enqueue(30);
-
-        assert 10 == win.pollEvicted();
-
-        evicted = win.pollEvictedBatch();
-
-        assertEquals(9, evicted.size());
-
-        it = evicted.iterator();
-
-        for (int i = 11; i < 20; i++)
-            assert i == it.next();
-
-        assertNull(win.pollEvicted());
-        assertEquals(0, win.pollEvictedBatch().size());
-
-        checkIterator(win);
-
-        win.setMaximumBatches(2);
-        win.setBatchSize(2);
-
-        win.start();
-
-        win.enqueue(1, 2, 3, 4, 5, 6, 7);
-
-        // We expect that the first two batches will be evicted.
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
-        checkSnapshot(win.snapshot(false), 5, 6, 7);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeDequeueWindow() throws Exception {
-        final StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setMaximumSize(60);
-        win.setTimeInterval(40);
-
-        win.start();
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        for (int i = 59; i >= 0; i--)
-            win.enqueue(i);
-
-        for (int i = 59; i >= 0; i--)
-            assert i == win.pollEvicted();
-
-        assertNull(win.pollEvicted());
-
-        checkIterator(win);
-
-        win.setMaximumSize(2);
-        win.setTimeInterval(200);
-
-        win.start();
-
-        win.enqueue(1, 2, 3);
-
-        checkSnapshot(win.snapshot(true), 1, 2, 3);
-        checkSnapshot(win.snapshot(false), 2, 3);
-
-        U.sleep(400);
-
-        win.enqueue(4);
-
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4);
-        checkSnapshot(win.snapshot(false), 4);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeBatchDequeueWindow() throws Exception {
-        final StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                win.start();
-
-                return null;
-            }
-        }, IgniteException.class, null);
-
-        win.setBatchSize(50);
-        win.setBatchTimeInterval(500);
-        win.setMaximumBatches(2);
-
-        win.start();
-
-        for (int i = 0; i < 25; i++)
-            win.enqueue(i);
-
-        U.sleep(1000);
-
-        Collection<Integer> evicted = win.pollEvictedBatch();
-
-        assertNotNull(evicted);
-        assertEquals(25, evicted.size());
-
-        for (int i = 0; i < 101; i++)
-            win.enqueue(i);
-
-        evicted = win.pollEvictedBatch();
-
-        assertNotNull(evicted);
-        assertEquals(50, evicted.size());
-
-        U.sleep(1000);
-
-        evicted = win.pollEvictedBatch();
-
-        assertNotNull(evicted);
-        assertEquals(50, evicted.size());
-
-        evicted = win.pollEvictedBatch();
-
-        assertNotNull(evicted);
-        assertEquals(1, evicted.size());
-
-        checkIterator(win);
-
-        win.setMaximumBatches(2);
-        win.setBatchSize(2);
-        win.setBatchTimeInterval(200);
-
-        win.start();
-
-        win.enqueue(1, 2, 3, 4, 5, 6, 7);
-
-        // We expect that the first two batches will be evicted.
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
-        checkSnapshot(win.snapshot(false), 5, 6, 7);
-
-        U.sleep(400);
-
-        checkSnapshot(win.snapshot(true), 1, 2, 3, 4, 5, 6, 7);
-        checkSnapshot(win.snapshot(false));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUnboundedDequeueWindow() throws Exception {
-        final StreamerUnboundedWindow<Integer> win = new StreamerUnboundedWindow<>();
-
-        win.start();
-
-        for (int i = 0; i < 50; i++)
-            win.enqueue(i);
-
-        assertNull(win.pollEvicted());
-
-        assert win.size() == 50;
-
-        checkIterator(win);
-
-        win.reset();
-
-        win.enqueue(3, 1, 2);
-
-        checkSnapshot(win.snapshot(true), 3, 1, 2);
-        checkSnapshot(win.snapshot(false), 3, 1, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeDequeueWindowMultithreaded() throws Exception {
-        StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(500);
-        win.setUnique(false);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeDequeueWindowUniqueMultithreaded() throws Exception {
-        StreamerBoundedSizeWindow<Integer> win = new StreamerBoundedSizeWindow<>();
-
-        win.setMaximumSize(500);
-        win.setUnique(true);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeBatchDequeueWindowMultithreaded() throws Exception {
-        StreamerBoundedSizeBatchWindow<Integer> win = new StreamerBoundedSizeBatchWindow<>();
-
-        win.setMaximumBatches(10);
-        win.setBatchSize(50);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeSortedDequeueWindowMultithreaded() throws Exception {
-        StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
-
-        win.setMaximumSize(500);
-        win.setUnique(false);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedSizeSortedDequeueWindowUniqueMultithreaded() throws Exception {
-        StreamerBoundedSizeSortedWindow<Integer> win = new StreamerBoundedSizeSortedWindow<>();
-
-        win.setMaximumSize(500);
-        win.setUnique(true);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeDequeueWindowMultithreaded() throws Exception {
-        StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
-
-        win.setMaximumSize(500);
-        win.setTimeInterval(40); // 40ms time interval.
-        win.setUnique(false);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-
-        U.sleep(1000);
-
-        finalChecks(win, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeDequeueWindowUniqueMultithreaded() throws Exception {
-        StreamerBoundedTimeWindow<Integer> win = new StreamerBoundedTimeWindow<>();
-
-        win.setMaximumSize(500);
-        win.setTimeInterval(40); // 40ms time interval.
-        win.setUnique(true);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-
-        U.sleep(1000);
-
-        finalChecks(win, 0);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBoundedTimeBatchDequeueWindowMultithreaded() throws Exception {
-        StreamerBoundedTimeBatchWindow<Integer> win = new StreamerBoundedTimeBatchWindow<>();
-
-        win.setMaximumBatches(10);
-        win.setBatchTimeInterval(100);
-        win.setBatchSize(50);
-
-        win.start();
-
-        checkWindowMultithreaded(win, 100000, 10, 1000);
-
-        win.consistencyCheck();
-
-        finalChecks(win, 500);
-
-        U.sleep(1000);
-
-        finalChecks(win, 0);
-    }
-
-    /**
-     * Check iterator behaviour.
-     *
-     * @param win Window.
-     * @throws Exception If failed.
-     */
-    private void checkIterator(StreamerWindow<Integer> win) throws Exception {
-        win.reset();
-
-        assert win.size() == 0;
-
-        win.enqueue(1);
-
-        assert win.size() == 1;
-
-        final Iterator<Integer> iter = win.iterator();
-
-        win.enqueue(2);
-
-        assert win.size() == 2;
-
-        assert iter.hasNext();
-
-        GridTestUtils.assertThrows(log(), new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                iter.remove();
-
-                return null;
-            }
-        }, IllegalStateException.class, null);
-
-        assert iter.next() == 1;
-
-        iter.remove();
-
-        assert !iter.hasNext();
-
-        GridTestUtils.assertThrows(log(), new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                iter.next();
-
-                return null;
-            }
-        }, NoSuchElementException.class, null);
-
-        assert win.size() == 1;
-    }
-
-    /**
-     * Final checks.
-     *
-     * @param win Window to check.
-     * @param maxSize Max window size.
-     */
-    private void finalChecks(StreamerWindow<Integer> win, int maxSize) {
-        int evictQueueSize = win.evictionQueueSize();
-
-        info("Eviction queue size for final checks: " + evictQueueSize);
-
-        Collection<Integer> evicted = win.pollEvictedAll();
-
-        info("Evicted entries in final checks: " + evicted.size());
-
-        int winSize = win.size();
-
-        win.pollEvictedAll();
-
-        assertTrue("Unexpected window size [winSize=" + winSize + " maxSize=" + maxSize + ']', winSize <= maxSize);
-    }
-
-    /**
-     * @param win Window to check.
-     * @param iterCnt Iteration count.
-     * @param threadCnt Thread count.
-     * @param range Range for key generation.
-     * @throws Exception If failed.
-     */
-    private void checkWindowMultithreaded(
-        final StreamerWindow<Integer> win,
-        final int iterCnt,
-        int threadCnt,
-        final int range
-    ) throws Exception {
-        final AtomicInteger polled = new GridAtomicInteger();
-
-        final AtomicInteger added = new GridAtomicInteger();
-
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                Random rnd = new Random();
-
-                for (int i = 0; i < iterCnt; i++) {
-                    if (i > 0 && i % 10000 == 0)
-                        info("Finished " + i + " iterations");
-
-                    int op = rnd.nextInt(8);
-
-                    switch (op) {
-                        case 0: {
-                            // Add.
-                            for (int j = 0; j < 30; j++)
-                                win.enqueue(rnd.nextInt(range));
-
-                            added.addAndGet(30);
-
-                            break;
-                        }
-
-                        case 1: {
-                            // Add bunch.
-                            for (int j = 0; j < 10; j++)
-                                win.enqueue(rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range),
-                                    rnd.nextInt(range), rnd.nextInt(range), rnd.nextInt(range));
-
-                            added.addAndGet(10 * 6);
-
-                            break;
-                        }
-
-                        case 2: {
-                            Object o = win.pollEvicted();
-
-                            if (o != null)
-                                polled.incrementAndGet();
-
-                            break;
-                        }
-
-                        case 3: {
-                            Collection<Integer> p0 = win.pollEvicted(50);
-
-                            polled.addAndGet(p0.size());
-
-                            break;
-                        }
-
-                        case 4: {
-                            Collection<Integer> p0 = win.pollEvictedBatch();
-
-                            polled.addAndGet(p0.size());
-
-                            break;
-                        }
-
-                        case 5: {
-                            Object o = win.dequeue();
-
-                            if (o != null)
-                                polled.incrementAndGet();
-
-                            break;
-                        }
-
-                        case 6: {
-                            Collection<Integer> p0 = win.dequeue(50);
-
-                            polled.addAndGet(p0.size());
-
-                            break;
-                        }
-
-                        case 7: {
-                            Iterator<Integer> it = win.iterator();
-
-                            while (it.hasNext()) {
-                                it.next();
-
-                                if (rnd.nextInt(10) == 5) {
-                                    it.remove();
-
-                                    polled.incrementAndGet();
-                                }
-                            }
-
-                            break;
-                        }
-                    }
-                }
-
-                return null;
-            }
-        }, threadCnt);
-
-        fut.get();
-
-        // Cannot assert on added, polled and window size because iterator does not return status.
-        info("Window size: " + win.size());
-        info("Added=" + added.get() + ", polled=" + polled.get());
-    }
-
-    /**
-     * Check snapshto content.
-     *
-     * @param snapshot Snapshot.
-     * @param vals Expected values.
-     */
-    private void checkSnapshot(Collection<Integer> snapshot, Object... vals) {
-        assert snapshot.size() == vals.length;
-
-        int i = 0;
-
-        for (Object evt : snapshot)
-            assertTrue(F.eq(evt, vals[i++]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 014f229..8ae4596 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -186,16 +186,6 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteStreamer streamer(@Nullable String name) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgniteStreamer> streamers() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 6091338..7ae237f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -70,9 +70,6 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(GridLifecycleAwareSelfTest.class);
         suite.addTestSuite(GridMessageListenSelfTest.class);
 
-        // Streamer.
-        suite.addTest(IgniteStreamerSelfTestSuite.suite());
-
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java
deleted file mode 100644
index 0245b1c..0000000
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamerSelfTestSuite.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.testsuites;
-
-import junit.framework.*;
-import org.apache.ignite.internal.processors.streamer.*;
-import org.apache.ignite.streamer.index.*;
-import org.apache.ignite.streamer.window.*;
-
-/**
- * Streamer test suite.
- */
-public class IgniteStreamerSelfTestSuite {
-    /**
-     * @return Test suite.
-     * @throws Exception If failed.
-     */
-    public static TestSuite suite() throws Exception {
-        TestSuite suite = new TestSuite("Ignite Streamer Test Suite.");
-
-        // Streamer.
-        suite.addTestSuite(GridStreamerWindowSelfTest.class);
-        suite.addTestSuite(GridStreamerEvictionSelfTest.class);
-        suite.addTestSuite(GridStreamerSelfTest.class);
-        suite.addTestSuite(GridStreamerFailoverSelfTest.class);
-        suite.addTestSuite(GridStreamerIndexSelfTest.class);
-        suite.addTestSuite(GridStreamerLifecycleAwareSelfTest.class);
-
-        return suite;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 705fa27..1b9c4d2 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -135,13 +135,6 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgniteStreamer> streamers() {
-        assert g != null;
-
-        return g.streamers();
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteCompute compute() {
         assert g != null;
 
@@ -268,13 +261,6 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteStreamer streamer(@Nullable String name) {
-        assert g != null;
-
-        return g.streamer(name);
-    }
-
-    /** {@inheritDoc} */
     @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException {
         assert g != null;
 


Mime
View raw message