ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [55/65] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 21:27:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java
deleted file mode 100644
index 564c876..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridEventConsumeSelfTest.java
+++ /dev/null
@@ -1,1079 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.gridgain.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.gridgain.grid.kernal.processors.continuous.GridContinuousProcessor.*;
-
-/**
- * Event consume test.
- */
-public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static final String PRJ_PRED_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeProjectionPredicate";
-
-    /** */
-    private static final String FILTER_CLS_NAME = "org.gridgain.grid.tests.p2p.GridEventConsumeFilter";
-
-    /** Grids count. */
-    private static final int GRID_CNT = 3;
-
-    /** Number of created consumes per thread in multithreaded test. */
-    private static final int CONSUME_CNT = 500;
-
-    /** Consume latch. */
-    private static volatile CountDownLatch consumeLatch;
-
-    /** Consume counter. */
-    private static volatile AtomicInteger consumeCnt;
-
-    /** Include node flag. */
-    private boolean include;
-
-    /** No automatic unsubscribe flag. */
-    private boolean noAutoUnsubscribe;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        if (include)
-            cfg.setUserAttributes(F.asMap("include", true));
-
-        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        assertTrue(GRID_CNT > 1);
-
-        include = true;
-
-        startGridsMultiThreaded(GRID_CNT - 1);
-
-        include = false;
-
-        startGrid(GRID_CNT - 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        assertEquals(GRID_CNT, grid(0).nodes().size());
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            GridKernal grid = (GridKernal)grid(i);
-
-            GridContinuousProcessor proc = grid.context().continuous();
-
-            if (noAutoUnsubscribe) {
-                localRoutines(proc).clear();
-
-                U.<Map>field(proc, "rmtInfos").clear();
-            }
-
-            assertEquals(0, localRoutines(proc).size());
-            assertEquals(0, U.<Map>field(proc, "rmtInfos").size());
-            assertEquals(0, U.<Map>field(proc, "startFuts").size());
-            assertEquals(0, U.<Map>field(proc, "waitForStartAck").size());
-            assertEquals(0, U.<Map>field(proc, "stopFuts").size());
-            assertEquals(0, U.<Map>field(proc, "waitForStopAck").size());
-            assertEquals(0, U.<Map>field(proc, "pending").size());
-        }
-    }
-
-    /**
-     * @param proc Continuous processor.
-     * @return Local event routines.
-     */
-    private Collection<LocalRoutineInfo> localRoutines(GridContinuousProcessor proc) {
-        return F.view(U.<Map<UUID, LocalRoutineInfo>>field(proc, "locInfos").values(),
-            new IgnitePredicate<LocalRoutineInfo>() {
-                @Override public boolean apply(LocalRoutineInfo info) {
-                    return info.handler().isForEvents();
-                }
-            });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testApi() throws Exception {
-        try {
-            grid(0).events().stopRemoteListen(null);
-        }
-        catch (NullPointerException ignored) {
-            // No-op.
-        }
-
-        grid(0).events().stopRemoteListen(UUID.randomUUID());
-
-        UUID consumeId = null;
-
-        try {
-            consumeId = grid(0).events().remoteListen(
-                new P2<UUID, IgniteDiscoveryEvent>() {
-                    @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) {
-                        return false;
-                    }
-                },
-                new P1<IgniteDiscoveryEvent>() {
-                    @Override public boolean apply(IgniteDiscoveryEvent e) {
-                        return false;
-                    }
-                },
-                EVTS_DISCOVERY
-            );
-
-            assertNotNull(consumeId);
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-
-        try {
-            consumeId = grid(0).events().remoteListen(
-                new P2<UUID, IgniteDiscoveryEvent>() {
-                    @Override public boolean apply(UUID uuid, IgniteDiscoveryEvent evt) {
-                        return false;
-                    }
-                },
-                new P1<IgniteDiscoveryEvent>() {
-                    @Override public boolean apply(IgniteDiscoveryEvent e) {
-                        return false;
-                    }
-                }
-            );
-
-            assertNotNull(consumeId);
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-
-        try {
-            consumeId = grid(0).events().remoteListen(
-                new P2<UUID, IgniteEvent>() {
-                    @Override public boolean apply(UUID uuid, IgniteEvent evt) {
-                        return false;
-                    }
-                },
-                new P1<IgniteEvent>() {
-                    @Override public boolean apply(IgniteEvent e) {
-                        return false;
-                    }
-                }
-            );
-
-            assertNotNull(consumeId);
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAllEvents() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    if (evt.type() == EVT_JOB_STARTED) {
-                        nodeIds.add(nodeId);
-                        cnt.incrementAndGet();
-                        latch.countDown();
-                    }
-
-                    return true;
-                }
-            },
-            null
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEventsByType() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEventsByFilter() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            new P1<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    return evt.type() == EVT_JOB_STARTED;
-                }
-            }
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEventsByTypeAndFilter() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteJobEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteJobEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            new P1<IgniteJobEvent>() {
-                @Override public boolean apply(IgniteJobEvent evt) {
-                    return !"exclude".equals(evt.taskName());
-                }
-            },
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-            grid(0).compute().withName("exclude").run(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRemoteProjection() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
-
-        UUID consumeId = events(grid(0).forRemotes()).remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT - 1, nodeIds.size());
-            assertEquals(GRID_CNT - 1, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testProjectionWithLocalNode() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
-
-        UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT - 1, nodeIds.size());
-            assertEquals(GRID_CNT - 1, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocalNodeOnly() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        UUID consumeId = events(grid(0).forLocal()).remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(1, nodeIds.size());
-            assertEquals(1, cnt.get());
-
-            assertEquals(grid(0).localNode().id(), F.first(nodeIds));
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEmptyProjection() throws Exception {
-        try {
-            events(grid(0).forPredicate(F.<ClusterNode>alwaysFalse())).remoteListen(
-                new P2<UUID, IgniteEvent>() {
-                    @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                        return true;
-                    }
-                },
-                null
-            );
-
-            assert false : "Exception was not thrown.";
-        }
-        catch (IgniteCheckedException e) {
-            assertTrue(e.getMessage().startsWith(
-                "Failed to register remote continuous listener (projection is empty)."));
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopByCallback() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return false;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(1, nodeIds.size());
-            assertEquals(1, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopRemoteListen() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().run(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(1, nodeIds.size());
-            assertEquals(1, cnt.get());
-
-            grid(0).events().stopRemoteListen(consumeId);
-
-            grid(0).compute().run(F.noop());
-
-            U.sleep(500);
-
-            assertEquals(1, nodeIds.size());
-            assertEquals(1, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopLocalListenByCallback() throws Exception {
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        grid(0).events().localListen(
-            new P1<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    info("Local event [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return false;
-                }
-            },
-            EVT_JOB_STARTED);
-
-        compute(grid(0).forLocal()).run(F.noop());
-
-        assert latch.await(2, SECONDS);
-
-        assertEquals(1, cnt.get());
-
-        compute(grid(0).forLocal()).run(F.noop());
-
-        U.sleep(500);
-
-        assertEquals(1, cnt.get());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNodeJoin() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            new P1<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    return evt.type() == EVT_JOB_STARTED;
-                }
-            },
-            EVT_JOB_STARTED, EVT_JOB_FINISHED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            include = true;
-
-            startGrid("anotherGrid");
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT + 1, nodeIds.size());
-            assertEquals(GRID_CNT + 1, cnt.get());
-        }
-        finally {
-            stopGrid("anotherGrid");
-
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNodeJoinWithProjection() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = events(grid(0).forAttribute("include", null)).remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            null,
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            include = true;
-
-            startGrid("anotherGrid1");
-
-            include = false;
-
-            startGrid("anotherGrid2");
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            stopGrid("anotherGrid1");
-            stopGrid("anotherGrid2");
-
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    // TODO: GG-6730
-    public void _testNodeJoinWithP2P() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT + 1);
-
-        ClassLoader ldr = getExternalClassLoader();
-
-        IgnitePredicate<ClusterNode> prjPred = (IgnitePredicate<ClusterNode>)ldr.loadClass(PRJ_PRED_CLS_NAME).newInstance();
-        IgnitePredicate<IgniteEvent> filter = (IgnitePredicate<IgniteEvent>)ldr.loadClass(FILTER_CLS_NAME).newInstance();
-
-        UUID consumeId = events(grid(0).forPredicate(prjPred)).remoteListen(new P2<UUID, IgniteEvent>() {
-            @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                assertEquals(EVT_JOB_STARTED, evt.type());
-
-                nodeIds.add(nodeId);
-                cnt.incrementAndGet();
-                latch.countDown();
-
-                return true;
-            }
-        }, filter, EVT_JOB_STARTED);
-
-        try {
-            assertNotNull(consumeId);
-
-            startGrid("anotherGrid");
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT + 1, nodeIds.size());
-            assertEquals(GRID_CNT + 1, cnt.get());
-        }
-        finally {
-            stopGrid("anotherGrid1");
-            stopGrid("anotherGrid2");
-
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testResources() throws Exception {
-        final Collection<UUID> nodeIds = new HashSet<>();
-        final AtomicInteger cnt = new AtomicInteger();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        UUID consumeId = grid(0).events().remoteListen(
-            new P2<UUID, IgniteEvent>() {
-                @IgniteInstanceResource
-                private Ignite grid;
-
-                @Override public boolean apply(UUID nodeId, IgniteEvent evt) {
-                    info("Event from " + nodeId + " [" + evt.shortDisplay() + ']');
-
-                    assertEquals(EVT_JOB_STARTED, evt.type());
-                    assertNotNull(grid);
-
-                    nodeIds.add(nodeId);
-                    cnt.incrementAndGet();
-                    latch.countDown();
-
-                    return true;
-                }
-            },
-            new P1<IgniteEvent>() {
-                @IgniteInstanceResource
-                private Ignite grid;
-
-                @Override public boolean apply(IgniteEvent evt) {
-                    assertNotNull(grid);
-
-                    return true;
-                }
-            },
-            EVT_JOB_STARTED
-        );
-
-        try {
-            assertNotNull(consumeId);
-
-            grid(0).compute().broadcast(F.noop());
-
-            assert latch.await(2, SECONDS);
-
-            assertEquals(GRID_CNT, nodeIds.size());
-            assertEquals(GRID_CNT, cnt.get());
-        }
-        finally {
-            grid(0).events().stopRemoteListen(consumeId);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMasterNodeLeave() throws Exception {
-        Ignite g = startGrid("anotherGrid");
-
-        final UUID nodeId = g.cluster().localNode().id();
-        final CountDownLatch latch = new CountDownLatch(GRID_CNT);
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id()))
-                        latch.countDown();
-
-                    return true;
-                }
-            }, EVT_NODE_LEFT);
-        }
-
-        g.events().remoteListen(
-            null,
-            new P1<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    return true;
-                }
-            },
-            EVTS_ALL
-        );
-
-        stopGrid("anotherGrid");
-
-        latch.await();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMasterNodeLeaveNoAutoUnsubscribe() throws Exception {
-        Ignite g = startGrid("anotherGrid");
-
-        final UUID nodeId = g.cluster().localNode().id();
-        final CountDownLatch discoLatch = new CountDownLatch(GRID_CNT);
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    if (nodeId.equals(((IgniteDiscoveryEvent) evt).eventNode().id()))
-                        discoLatch.countDown();
-
-                    return true;
-                }
-            }, EVT_NODE_LEFT);
-        }
-
-        consumeLatch = new CountDownLatch(GRID_CNT * 2 + 1);
-        consumeCnt = new AtomicInteger();
-
-        noAutoUnsubscribe = true;
-
-        g.events().remoteListen(
-            1, 0, false,
-            null,
-            new P1<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    consumeLatch.countDown();
-                    consumeCnt.incrementAndGet();
-
-                    return true;
-                }
-            },
-            EVT_JOB_STARTED
-        );
-
-        grid(0).compute().broadcast(F.noop());
-
-        stopGrid("anotherGrid");
-
-        discoLatch.await();
-
-        grid(0).compute().broadcast(F.noop());
-
-        assert consumeLatch.await(2, SECONDS);
-
-        assertEquals(GRID_CNT * 2 + 1, consumeCnt.get());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultithreadedWithNodeRestart() throws Exception {
-        final AtomicBoolean stop = new AtomicBoolean();
-        final BlockingQueue<IgniteBiTuple<Integer, UUID>> queue = new LinkedBlockingQueue<>();
-        final Collection<UUID> started = new GridConcurrentHashSet<>();
-        final Collection<UUID> stopped = new GridConcurrentHashSet<>();
-
-        final Random rnd = new Random();
-
-        IgniteFuture<?> starterFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                for (int i = 0; i < CONSUME_CNT; i++) {
-                    int idx = rnd.nextInt(GRID_CNT);
-
-                    try {
-                        IgniteEvents evts = grid(idx).events().enableAsync();
-
-                        evts.remoteListen(new P2<UUID, IgniteEvent>() {
-                            @Override public boolean apply(UUID uuid, IgniteEvent evt) {
-                                return true;
-                            }
-                        }, null, EVT_JOB_STARTED);
-
-                        UUID consumeId = evts.<UUID>future().get(3000);
-
-                        started.add(consumeId);
-
-                        queue.add(F.t(idx, consumeId));
-                    }
-                    catch (ClusterTopologyException ignored) {
-                        // No-op.
-                    }
-
-                    U.sleep(10);
-                }
-
-                stop.set(true);
-
-                return null;
-            }
-        }, 8, "consume-starter");
-
-        IgniteFuture<?> stopperFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                while (!stop.get()) {
-                    IgniteBiTuple<Integer, UUID> t = queue.poll(1, SECONDS);
-
-                    if (t == null)
-                        continue;
-
-                    int idx = t.get1();
-                    UUID consumeId = t.get2();
-
-                    try {
-                        IgniteEvents evts = grid(idx).events().enableAsync();
-
-                        evts.stopRemoteListen(consumeId);
-
-                        evts.future().get(3000);
-
-                        stopped.add(consumeId);
-                    }
-                    catch (ClusterTopologyException ignored) {
-                        // No-op.
-                    }
-                }
-
-                return null;
-            }
-        }, 4, "consume-stopper");
-
-        IgniteFuture<?> nodeRestarterFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                while (!stop.get()) {
-                    startGrid("anotherGrid");
-                    stopGrid("anotherGrid");
-                }
-
-                return null;
-            }
-        }, 1, "node-restarter");
-
-        IgniteFuture<?> jobRunnerFut = multithreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                while (!stop.get()) {
-                    int idx = rnd.nextInt(GRID_CNT);
-
-                    try {
-                        IgniteCompute comp = grid(idx).compute().enableAsync();
-
-                        comp.run(F.noop());
-
-                        comp.future().get(3000);
-                    }
-                    catch (IgniteCheckedException ignored) {
-                        // Ignore all job execution related errors.
-                    }
-                }
-
-                return null;
-            }
-        }, 1, "job-runner");
-
-        starterFut.get();
-        stopperFut.get();
-        nodeRestarterFut.get();
-        jobRunnerFut.get();
-
-        IgniteBiTuple<Integer, UUID> t;
-
-        while ((t = queue.poll()) != null) {
-            int idx = t.get1();
-            UUID consumeId = t.get2();
-
-            IgniteEvents evts = grid(idx).events().enableAsync();
-
-            evts.stopRemoteListen(consumeId);
-
-            evts.future().get(3000);
-
-            stopped.add(consumeId);
-        }
-
-        Collection<UUID> notStopped = F.lose(started, true, stopped);
-
-        assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java
deleted file mode 100644
index 5b986e1..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/continuous/GridMessageListenSelfTest.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.continuous;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.messaging.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.gridgain.testframework.*;
-import org.gridgain.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Message listen test.
- */
-public class GridMessageListenSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final String INC_ATTR = "include";
-
-    /** */
-    private static final String MSG = "Message";
-
-    /** */
-    private static final String TOPIC = "Topic";
-
-    /** */
-    private static final int MSG_CNT = 3;
-
-    /** */
-    private static final String TOPIC_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageTopic";
-
-    /** */
-    private static final String LSNR_CLS_NAME = "org.gridgain.grid.tests.p2p.GridTestMessageListener";
-
-    /** */
-    private static boolean include;
-
-    /** */
-    private static final List<UUID> allNodes = new ArrayList<>();
-
-    /** */
-    private static final List<UUID> rmtNodes = new ArrayList<>();
-
-    /** */
-    private static final List<UUID> incNodes = new ArrayList<>();
-
-    /** */
-    private static final Collection<UUID> nodes = new GridConcurrentHashSet<>();
-
-    /** */
-    private static final AtomicInteger cnt = new AtomicInteger();
-
-    /** */
-    private static CountDownLatch latch;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        if (include)
-            cfg.setUserAttributes(F.asMap(INC_ATTR, true));
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        nodes.clear();
-        cnt.set(0);
-
-        include = true;
-
-        startGridsMultiThreaded(GRID_CNT - 1);
-
-        include = false;
-
-        Thread.sleep(500);
-
-        startGrid(GRID_CNT - 1);
-
-        allNodes.clear();
-        rmtNodes.clear();
-        incNodes.clear();
-
-        for (int i = 0; i < GRID_CNT; i++) {
-            UUID id = grid(i).localNode().id();
-
-            allNodes.add(id);
-
-            if (i != 0)
-                rmtNodes.add(id);
-
-            if (i != GRID_CNT - 1)
-                incNodes.add(id);
-        }
-
-        Collections.sort(allNodes);
-        Collections.sort(rmtNodes);
-        Collections.sort(incNodes);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullTopic() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
-
-        listen(grid(0), null, true);
-
-        send();
-
-        assert latch.await(2, SECONDS);
-
-        Thread.sleep(500);
-
-        assertEquals(MSG_CNT * GRID_CNT, cnt.get());
-
-        checkNodes(allNodes);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNonNullTopic() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
-
-        listen(grid(0), null, true);
-
-        send();
-
-        assert latch.await(2, SECONDS);
-
-        Thread.sleep(500);
-
-        assertEquals(MSG_CNT * GRID_CNT, cnt.get());
-
-        checkNodes(allNodes);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStopListen() throws Exception {
-        latch = new CountDownLatch(GRID_CNT);
-
-        listen(grid(0), null, false);
-
-        send();
-
-        assert latch.await(2, SECONDS);
-
-        Thread.sleep(500);
-
-        int expCnt = cnt.get();
-
-        send();
-
-        Thread.sleep(1000);
-
-        assertEquals(expCnt, cnt.get());
-
-        checkNodes(allNodes);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testProjection() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * (GRID_CNT - 1));
-
-        listen(grid(0).forRemotes(), null, true);
-
-        send();
-
-        assert latch.await(2, SECONDS);
-
-        Thread.sleep(500);
-
-        assertEquals(MSG_CNT * (GRID_CNT - 1), cnt.get());
-
-        checkNodes(rmtNodes);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNodeJoin() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
-
-        listen(grid(0), null, true);
-
-        try {
-            Ignite g = startGrid("anotherGrid");
-
-            send();
-
-            assert latch.await(2, SECONDS);
-
-            Thread.sleep(500);
-
-            assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
-
-            List<UUID> allNodes0 = new ArrayList<>(allNodes);
-
-            allNodes0.add(g.cluster().localNode().id());
-
-            Collections.sort(allNodes0);
-
-            checkNodes(allNodes0);
-        }
-        finally {
-            stopGrid("anotherGrid");
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNodeJoinWithProjection() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * GRID_CNT);
-
-        listen(grid(0).forAttribute(INC_ATTR, null), null, true);
-
-        try {
-            include = true;
-
-            Ignite g = startGrid("anotherGrid1");
-
-            include = false;
-
-            startGrid("anotherGrid2");
-
-            send();
-
-            assert latch.await(2, SECONDS);
-
-            Thread.sleep(500);
-
-            assertEquals(MSG_CNT * GRID_CNT, cnt.get());
-
-            List<UUID> incNodes0 = new ArrayList<>(incNodes);
-
-            incNodes0.add(g.cluster().localNode().id());
-
-            Collections.sort(incNodes0);
-
-            checkNodes(incNodes0);
-        }
-        finally {
-            stopGrid("anotherGrid1");
-            stopGrid("anotherGrid2");
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullTopicWithDeployment() throws Exception {
-        Class<?> cls = getExternalClassLoader().loadClass(LSNR_CLS_NAME);
-
-        grid(0).message().remoteListen(null, (IgniteBiPredicate<UUID, Object>)cls.newInstance());
-
-        send();
-
-        boolean s = GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return checkDeployedListeners(GRID_CNT);
-            }
-        }, 2000);
-
-        assertTrue(s);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNonNullTopicWithDeployment() throws Exception {
-        ClassLoader ldr = getExternalClassLoader();
-
-        Class<?> topicCls = ldr.loadClass(TOPIC_CLS_NAME);
-        Class<?> lsnrCls = ldr.loadClass(LSNR_CLS_NAME);
-
-        Object topic = topicCls.newInstance();
-
-        grid(0).message().remoteListen(topic, (IgniteBiPredicate<UUID, Object>)lsnrCls.newInstance());
-
-        send(topic);
-
-        boolean s = GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return checkDeployedListeners(GRID_CNT);
-            }
-        }, 2000);
-
-        assertTrue(s);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testListenActor() throws Exception {
-        latch = new CountDownLatch(MSG_CNT * (GRID_CNT + 1));
-
-        grid(0).message().remoteListen(null, new Actor(grid(0)));
-
-        try {
-            Ignite g = startGrid("anotherGrid");
-
-            send();
-
-            assert latch.await(2, SECONDS);
-
-            Thread.sleep(500);
-
-            assertEquals(MSG_CNT * (GRID_CNT + 1), cnt.get());
-
-            List<UUID> allNodes0 = new ArrayList<>(allNodes);
-
-            allNodes0.add(g.cluster().localNode().id());
-
-            Collections.sort(allNodes0);
-
-            checkNodes(allNodes0);
-        }
-        finally {
-            stopGrid("anotherGrid");
-        }
-    }
-
-    /**
-     * @param prj Projection.
-     * @param topic Topic.
-     * @param ret Value returned from listener.
-     * @throws Exception In case of error.
-     */
-    private void listen(final ClusterGroup prj, @Nullable Object topic, final boolean ret) throws Exception {
-        assert prj != null;
-
-        message(prj).remoteListen(topic, new Listener(prj, ret));
-    }
-
-    /**
-     * @throws Exception In case of error.
-     */
-    private void send() throws Exception {
-        send(TOPIC);
-    }
-
-    /**
-     * @param topic Non-null topic.
-     * @throws Exception In case of error.
-     */
-    private void send(Object topic) throws Exception {
-        assert topic != null;
-
-        for (int i = 0; i < MSG_CNT; i++)
-            grid(0).message().send(null, MSG);
-
-        for (int i = 0; i < MSG_CNT; i++)
-            grid(0).message().send(topic, MSG);
-    }
-
-    /**
-     * @param expCnt Expected messages count.
-     * @return If check passed.
-     */
-    private boolean checkDeployedListeners(int expCnt) {
-        for (Ignite g : G.allGrids()) {
-            AtomicInteger cnt = g.cluster().<String, AtomicInteger>nodeLocalMap().get("msgCnt");
-
-            if (cnt == null || cnt.get() != expCnt)
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * @param expNodes Expected nodes.
-     */
-    private void checkNodes(List<UUID> expNodes) {
-        List<UUID> nodes0 = new ArrayList<>(nodes);
-
-        Collections.sort(nodes0);
-
-        assertEquals(expNodes, nodes0);
-    }
-
-    /** */
-    private static class Listener implements P2<UUID, Object> {
-        /** */
-        private final ClusterGroup prj;
-
-        /** */
-        private final boolean ret;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /**
-         * @param prj Projection.
-         * @param ret Return value.
-         */
-        private Listener(ClusterGroup prj, boolean ret) {
-            this.prj = prj;
-            this.ret = ret;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(UUID nodeId, Object msg) {
-            assertNotNull(ignite);
-            assertNotNull(ignite.configuration().getNodeId());
-
-            X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + ignite.cluster().localNode().id() + ']');
-
-            assertEquals(prj.ignite().cluster().localNode().id(), nodeId);
-            assertEquals(MSG, msg);
-
-            nodes.add(ignite.configuration().getNodeId());
-            cnt.incrementAndGet();
-            latch.countDown();
-
-            return ret;
-        }
-    }
-
-    /** */
-    private static class Actor extends MessagingListenActor<Object> {
-        /** */
-        private final ClusterGroup prj;
-
-        /**
-         * @param prj Projection.
-         */
-        private Actor(ClusterGroup prj) {
-            this.prj = prj;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void receive(UUID nodeId, Object msg) throws Throwable {
-            assertNotNull(ignite());
-
-            UUID locNodeId = ignite().cluster().localNode().id();
-
-            X.println("Received message [nodeId=" + nodeId + ", locNodeId=" + locNodeId + ']');
-
-            assertEquals(prj.ignite().cluster().localNode().id(), nodeId);
-            assertEquals(MSG, msg);
-
-            nodes.add(locNodeId);
-            cnt.incrementAndGet();
-            latch.countDown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java
deleted file mode 100644
index 0f6576b..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.gridgain.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.GridCacheMode.*;
-import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
-
-/**
- * Tests for {@code GridDataLoaderImpl}.
- */
-public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of keys to load via data loader. */
-    private static final int KEYS_COUNT = 1000;
-
-    /** Started grid counter. */
-    private static int cnt;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        // Forth node goes without cache.
-        if (cnt < 4)
-            cfg.setCacheConfiguration(cacheConfiguration());
-
-        cnt++;
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullPointerExceptionUponDataLoaderClosing() throws Exception {
-        try {
-            startGrids(5);
-
-            final CyclicBarrier barrier = new CyclicBarrier(2);
-
-            multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    U.awaitQuiet(barrier);
-
-                    G.stopAll(true);
-
-                    return null;
-                }
-            }, 1);
-
-            Ignite g4 = grid(4);
-
-            IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null);
-
-            dataLdr.perNodeBufferSize(32);
-
-            for (int i = 0; i < 100000; i += 2) {
-                dataLdr.addData(i, i);
-                dataLdr.removeData(i + 1);
-            }
-
-            U.awaitQuiet(barrier);
-
-            info("Closing data loader.");
-
-            try {
-                dataLdr.close(true);
-            }
-            catch (IllegalStateException ignore) {
-                // This is ok to ignore this exception as test is racy by it's nature -
-                // grid is stopping in different thread.
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Data loader should correctly load entries from HashMap in case of grids with more than one node
-     *  and with GridOptimizedMarshaller that requires serializable.
-     *
-     * @throws Exception If failed.
-     */
-    public void testAddDataFromMap() throws Exception {
-        try {
-            cnt = 0;
-
-            startGrids(2);
-
-            Ignite g0 = grid(0);
-
-            IgniteMarshaller marsh = g0.configuration().getMarshaller();
-
-            if (marsh instanceof IgniteOptimizedMarshaller)
-                assertTrue(((IgniteOptimizedMarshaller)marsh).isRequireSerializable());
-            else
-                fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
-
-            IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null);
-
-            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
-
-            for (int i = 0; i < KEYS_COUNT; i ++)
-                map.put(i, String.valueOf(i));
-
-            dataLdr.addData(map);
-
-            dataLdr.close();
-
-            Random rnd = new Random();
-
-            GridCache<Integer, String> c = g0.cache(null);
-
-            for (int i = 0; i < KEYS_COUNT; i ++) {
-                Integer k = rnd.nextInt(KEYS_COUNT);
-
-                String v = c.get(k);
-
-                assertEquals(k.toString(), v);
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Gets cache configuration.
-     *
-     * @return Cache configuration.
-     */
-    private CacheConfiguration cacheConfiguration() {
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(1);
-        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        return cacheCfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestObject implements Serializable {
-        /** */
-        private int val;
-
-        /**
-         */
-        private TestObject() {
-            // No-op.
-        }
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        public Integer val() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof TestObject && ((TestObject)obj).val == val;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java
deleted file mode 100644
index 57a2e2e..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderPerformanceTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.gridgain.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.apache.ignite.cache.GridCacheMode.*;
-import static org.apache.ignite.cache.GridCacheDistributionMode.*;
-import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
-
-/**
- * Data loader performance test. Compares group lock data loader to traditional lock.
- * <p>
- * Disable assertions and give at least 2 GB heap to run this test.
- */
-public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final int ENTRY_CNT = 80000;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private boolean useGrpLock;
-
-    /** */
-    private String[] vals = new String[2048];
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
-        cfg.setRestEnabled(false);
-
-        cfg.setPeerClassLoadingEnabled(true);
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(PARTITIONED);
-
-            cc.setDistributionMode(PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-            cc.setStartSize(ENTRY_CNT / GRID_CNT);
-            cc.setSwapEnabled(false);
-
-            cc.setBackups(1);
-
-            cc.setStoreValueBytes(true);
-
-            cfg.setCacheSanityCheckEnabled(false);
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        for (int i = 0; i < vals.length; i++) {
-            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
-
-            StringBuilder sb = new StringBuilder();
-
-            for (int j = 0; j < valLen; j++)
-                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
-
-            vals[i] = sb.toString();
-
-            info("Value: " + vals[i]);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPerformance() throws Exception {
-        useGrpLock = false;
-
-        doTest();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPerformanceGroupLock() throws Exception {
-        useGrpLock = true;
-
-        doTest();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTest() throws Exception {
-        System.gc();
-        System.gc();
-        System.gc();
-
-        try {
-            useCache = true;
-
-            startGridsMultiThreaded(GRID_CNT);
-
-            useCache = false;
-
-            Ignite ignite = startGrid();
-
-            final IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(null);
-
-            ldr.perNodeBufferSize(8192);
-            ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, String>groupLocked() :
-                GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
-            ldr.autoFlushFrequency(0);
-
-            final LongAdder cnt = new LongAdder();
-
-            long start = U.currentTimeMillis();
-
-            Thread t = new Thread(new Runnable() {
-                @SuppressWarnings("BusyWait")
-                @Override public void run() {
-                    while (true) {
-                        try {
-                            Thread.sleep(10000);
-                        }
-                        catch (InterruptedException ignored) {
-                            break;
-                        }
-
-                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-
-            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
-
-            multithreaded(new Callable<Object>() {
-                @SuppressWarnings("InfiniteLoopStatement")
-                @Override public Object call() throws Exception {
-                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
-
-                    while (true) {
-                        int i = rnd.nextInt(ENTRY_CNT);
-
-                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
-
-                        cnt.increment();
-                    }
-                }
-            }, threadNum, "loader");
-
-            info("Closing loader...");
-
-            ldr.close(false);
-
-            long duration = U.currentTimeMillis() - start;
-
-            info("Finished performance test. Duration: " + duration + "ms.");
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
deleted file mode 100644
index 19b8ee3..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ /dev/null
@@ -1,883 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.eviction.fifo.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.gridgain.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
-import static org.apache.ignite.cache.GridCacheDistributionMode.*;
-import static org.apache.ignite.cache.GridCacheMode.*;
-import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-
-/**
- *
- */
-public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static ConcurrentHashMap<Object, Object> storeMap;
-
-    /** */
-    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private GridCacheMode mode = PARTITIONED;
-
-    /** */
-    private boolean nearEnabled = true;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private boolean useGrpLock;
-
-    /** */
-    private TestStore store;
-
-    /** {@inheritDoc} */
-    @Override public void afterTest() throws Exception {
-        super.afterTest();
-
-        useCache = false;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(mode);
-            cc.setAtomicityMode(TRANSACTIONAL);
-            cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-
-            cc.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000));
-
-            cc.setEvictSynchronized(false);
-            cc.setEvictNearSynchronized(false);
-
-            if (store != null) {
-                cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
-                cc.setReadThrough(true);
-                cc.setWriteThrough(true);
-            }
-
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitioned() throws Exception {
-        mode = PARTITIONED;
-
-        checkDataLoader();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testColocated() throws Exception {
-        mode = PARTITIONED;
-        nearEnabled = false;
-
-        checkDataLoader();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedGroupLock() throws Exception {
-        mode = PARTITIONED;
-        useGrpLock = true;
-
-        checkDataLoader();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicated() throws Exception {
-        mode = REPLICATED;
-
-        checkDataLoader();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedGroupLock() throws Exception {
-        mode = REPLICATED;
-        useGrpLock = true;
-
-        checkDataLoader();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocal() throws Exception {
-        mode = LOCAL;
-
-        try {
-            checkDataLoader();
-
-            assert false;
-        }
-        catch (IgniteCheckedException e) {
-            // Cannot load local cache configured remotely.
-            info("Caught expected exception: " + e);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ErrorNotRethrown")
-    private void checkDataLoader() throws Exception {
-        try {
-            Ignite g1 = startGrid(1);
-
-            useCache = true;
-
-            Ignite g2 = startGrid(2);
-            Ignite g3 = startGrid(3);
-
-            final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
-
-            ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() :
-                GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final AtomicInteger idxGen = new AtomicInteger();
-            final int cnt = 400;
-            final int threads = 10;
-
-            final CountDownLatch l1 = new CountDownLatch(threads);
-
-            IgniteFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        int idx = idxGen.getAndIncrement();
-
-                        futs.add(ldr.addData(idx, idx));
-                    }
-
-                    l1.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l1.await();
-
-            // This will wait until data loader finishes loading.
-            stopGrid(getTestGridName(1), false);
-
-            f1.get();
-
-            int s2 = g2.cache(null).primaryKeySet().size();
-            int s3 = g3.cache(null).primaryKeySet().size();
-            int total = threads * cnt;
-
-            assertEquals(total, s2 + s3);
-
-            final IgniteDataLoader<Integer, Integer> rmvLdr = g2.dataLoader(null);
-
-            rmvLdr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() :
-                GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final CountDownLatch l2 = new CountDownLatch(threads);
-
-            IgniteFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        final int key = idxGen.decrementAndGet();
-
-                        futs.add(rmvLdr.removeData(key));
-                    }
-
-                    l2.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l2.await();
-
-            rmvLdr.close(false);
-
-            f2.get();
-
-            s2 = g2.cache(null).primaryKeySet().size();
-            s3 = g3.cache(null).primaryKeySet().size();
-
-            assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Test primitive arrays can be passed into data loader.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPrimitiveArrays() throws Exception {
-        try {
-            useCache = true;
-            mode = PARTITIONED;
-
-            Ignite g1 = startGrid(1);
-            startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
-
-            List<Object> arrays = Arrays.<Object>asList(
-                new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
-                new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
-
-            IgniteDataLoader<Object, Object> dataLdr = g1.dataLoader(null);
-
-            for (int i = 0, size = arrays.size(); i < 1000; i++) {
-                Object arr = arrays.get(i % size);
-
-                dataLdr.addData(i, arr);
-                dataLdr.addData(i, fixedClosure(arr));
-            }
-
-            dataLdr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedMultiThreaded() throws Exception {
-        mode = REPLICATED;
-
-        checkLoaderMultithreaded(1, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedMultiThreadedGroupLock() throws Exception {
-        mode = REPLICATED;
-        useGrpLock = true;
-
-        checkLoaderMultithreaded(1, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedMultiThreaded() throws Exception {
-        mode = PARTITIONED;
-
-        checkLoaderMultithreaded(1, 3);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedMultiThreadedGroupLock() throws Exception {
-        mode = PARTITIONED;
-        useGrpLock = true;
-
-        checkLoaderMultithreaded(1, 3);
-    }
-
-    /**
-     * Tests loader in multithreaded environment with various count of grids started.
-     *
-     * @param nodesCntNoCache How many nodes should be started without cache.
-     * @param nodesCntCache How many nodes should be started with cache.
-     * @throws Exception If failed.
-     */
-    protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
-        throws Exception {
-        try {
-            // Start all required nodes.
-            int idx = 1;
-
-            for (int i = 0; i < nodesCntNoCache; i++)
-                startGrid(idx++);
-
-            useCache = true;
-
-            for (int i = 0; i < nodesCntCache; i++)
-                startGrid(idx++);
-
-            Ignite g1 = grid(1);
-
-            // Get and configure loader.
-            final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
-
-            ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() :
-                GridDataLoadCacheUpdaters.<Integer, Integer>individual());
-            ldr.perNodeBufferSize(2);
-
-            // Define count of puts.
-            final AtomicInteger idxGen = new AtomicInteger();
-
-            final AtomicBoolean done = new AtomicBoolean();
-
-            try {
-                final int totalPutCnt = 50000;
-
-                IgniteFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
-
-                        while (!done.get()) {
-                            int idx = idxGen.getAndIncrement();
-
-                            if (idx >= totalPutCnt) {
-                                info(">>> Stopping producer thread since maximum count of puts reached.");
-
-                                break;
-                            }
-
-                            futs.add(ldr.addData(idx, idx));
-                        }
-
-                        ldr.flush();
-
-                        for (IgniteFuture<?> fut : futs)
-                            fut.get();
-
-                        return null;
-                    }
-                }, 5, "producer");
-
-                IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!done.get()) {
-                            ldr.flush();
-
-                            U.sleep(100);
-                        }
-
-                        return null;
-                    }
-                }, 1, "flusher");
-
-                // Define index of node being restarted.
-                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
-
-                IgniteFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            for (int i = 0; i < 5; i++) {
-                                Ignite g = startGrid(restartNodeIdx);
-
-                                UUID id = g.cluster().localNode().id();
-
-                                info(">>>>>>> Started node: " + id);
-
-                                U.sleep(1000);
-
-                                stopGrid(getTestGridName(restartNodeIdx), true);
-
-                                info(">>>>>>> Stopped node: " + id);
-                            }
-                        }
-                        finally {
-                            done.set(true);
-
-                            info("Start stop thread finished.");
-                        }
-
-                        return null;
-                    }
-                }, 1, "start-stop-thread");
-
-                fut1.get();
-                fut2.get();
-                fut3.get();
-            }
-            finally {
-                ldr.close(false);
-            }
-
-            info("Cache size on second grid: " + grid(nodesCntNoCache + 1).cache(null).primaryKeySet().size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLoaderApi() throws Exception {
-        useCache = true;
-
-        try {
-            Ignite g1 = startGrid(1);
-
-            IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null);
-
-            ldr.close(false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            try {
-                // Create another loader.
-                ldr = g1.dataLoader("UNKNOWN_CACHE");
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            ldr.close(true);
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            // Create another loader.
-            ldr = g1.dataLoader(null);
-
-            // Cancel with future.
-            ldr.future().cancel();
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            try {
-                ldr.future().get();
-
-                assert false;
-            }
-            catch (IgniteFutureCancelledException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            // Create another loader.
-            ldr = g1.dataLoader(null);
-
-            // This will close loader.
-            stopGrid(getTestGridName(1), false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Callable.
-     */
-    private static Callable<Integer> callable(@Nullable final Integer i) {
-        return new Callable<Integer>() {
-            @Override public Integer call() throws Exception {
-                return i;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Closure.
-     */
-    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
-        return new IgniteClosure<Integer, Integer>() {
-            @Override public Integer apply(Integer e) {
-                return e == null ? i : e + i;
-            }
-        };
-    }
-
-    /**
-     * Wraps object to closure returning it.
-     *
-     * @param obj Value to wrap.
-     * @return Closure.
-     */
-    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T e) {
-                assert e == null || obj == null || e.getClass() == obj.getClass() :
-                    "Expects the same types [e=" + e + ", obj=" + obj + ']';
-
-                return obj;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure expecting it and returning {@code null}.
-     *
-     * @param exp Expected closure value.
-     * @return Remove expected cache value closure.
-     */
-    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T act) {
-                if (exp == null ? act == null : exp.equals(act))
-                    return null;
-
-                throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
-            }
-        };
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final GridCache<Integer, Integer> c = g.cache(null);
-
-            final IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.isEmpty());
-
-            multithreaded(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    ldr.flush();
-
-                    assertEquals(9, c.size());
-
-                    return null;
-                }
-            }, 5, "flush-checker");
-
-            ldr.addData(100, 100);
-
-            ldr.flush();
-
-            assertEquals(10, c.size());
-
-            ldr.addData(200, 200);
-
-            ldr.close(false);
-
-            ldr.future().get();
-
-            assertEquals(11, c.size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTryFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            GridCache<Integer, Integer> c = g.cache(null);
-
-            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.isEmpty());
-
-            ldr.tryFlush();
-
-            Thread.sleep(100);
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlushTimeout() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final CountDownLatch latch = new CountDownLatch(9);
-
-            g.events().localListen(new IgnitePredicate<IgniteEvent>() {
-                @Override public boolean apply(IgniteEvent evt) {
-                    latch.countDown();
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_PUT);
-
-            GridCache<Integer, Integer> c = g.cache(null);
-
-            assertTrue(c.isEmpty());
-
-            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
-
-            ldr.perNodeBufferSize(10);
-            ldr.autoFlushFrequency(3000);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.isEmpty());
-
-            assertFalse(latch.await(1000, MILLISECONDS));
-
-            assertTrue(c.isEmpty());
-
-            assertTrue(latch.await(3000, MILLISECONDS));
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdateStore() throws Exception {
-        storeMap = new ConcurrentHashMap<>();
-
-        try {
-            store = new TestStore();
-
-            useCache = true;
-
-            Ignite ignite = startGrid(1);
-
-            startGrid(2);
-            startGrid(3);
-
-            for (int i = 0; i < 1000; i++)
-                storeMap.put(i, i);
-
-            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
-                assertFalse(ldr.skipStore());
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.removeData(i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.addData(i, i);
-            }
-
-            for (int i = 0; i < 1000; i++)
-                assertNull(storeMap.get(i));
-
-            for (int i = 1000; i < 2000; i++)
-                assertEquals(i, storeMap.get(i));
-
-            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
-                ldr.skipStore(true);
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.addData(i, i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.removeData(i);
-            }
-
-            IgniteCache<Object, Object> cache = ignite.jcache(null);
-
-            for (int i = 0; i < 1000; i++) {
-                assertNull(storeMap.get(i));
-
-                assertEquals(i, cache.get(i));
-            }
-
-            for (int i = 1000; i < 2000; i++) {
-                assertEquals(i, storeMap.get(i));
-
-                assertNull(cache.localPeek(i));
-            }
-        }
-        finally {
-            storeMap = null;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestObject {
-        /** Value. */
-        private final int val;
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            TestObject obj = (TestObject)o;
-
-            return val == obj.val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-    }
-
-    /**
-     *
-     */
-    private class TestStore extends CacheStoreAdapter<Object, Object> {
-        /** {@inheritDoc} */
-        @Nullable @Override public Object load(Object key) {
-            return storeMap.get(key);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) {
-            storeMap.put(entry.getKey(), entry.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) {
-            storeMap.remove(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java
index 92cda61..076f9fc 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridBasicTestSuite.java
@@ -20,9 +20,9 @@ package org.gridgain.testsuites.bamboo;
 import junit.framework.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.processors.affinity.*;
-import org.gridgain.grid.kernal.processors.closure.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.closure.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.gridgain.grid.product.*;
 import org.gridgain.grid.spi.*;
 import org.apache.ignite.internal.util.typedef.internal.*;


Mime
View raw message