ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [38/53] [abbrv] [partial] incubator-ignite git commit: # ignite-164 : GridAbstractTest -> IgniteAbstractTest - auto-renaming with all suggested options + rename all methods and fields inside AbstractTest
Date Wed, 04 Feb 2015 14:38:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteCommunicationSelfTest.java
new file mode 100644
index 0000000..65d9366
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteCommunicationSelfTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Grid basic communication test.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class IgniteCommunicationSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static Ignite ignite;
+
+    /** */
+    public IgniteCommunicationSelfTest() {
+        super(/*start grid*/true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite = G.ignite(getTestIgniteName());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSendMessageToEmptyNodes() throws Exception {
+        Collection<ClusterNode> empty = Collections.emptyList();
+
+        try {
+            sendMessage(empty, 1);
+        }
+        catch (IllegalArgumentException ignored) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @param nodes Nodes to send message to.
+     * @param cntr Counter.
+     */
+    private void sendMessage(Collection<ClusterNode> nodes, int cntr) {
+        try {
+            message(ignite.cluster().forNodes(nodes)).send(null,
+                new GridTestCommunicationMessage(cntr, ignite.cluster().localNode().id()));
+        }
+        catch (IgniteException e) {
+            error("Failed to send message.", e);
+        }
+    }
+
+    /**
+     * Test message.
+     */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class GridTestCommunicationMessage implements Serializable {
+        /** */
+        private final int msgId;
+
+        /** */
+        private final UUID sndId;
+
+        /**
+         * @param msgId Message id.
+         * @param sndId Sender id.
+         */
+        public GridTestCommunicationMessage(int msgId, UUID sndId) {
+            assert sndId != null;
+
+            this.msgId = msgId;
+            this.sndId = sndId;
+        }
+
+        /**
+         * @return Message id.
+         */
+        public int getMessageId() {
+            return msgId;
+        }
+
+        /**
+         * @return Sender id.
+         */
+        public UUID getSenderId() {
+            return sndId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            StringBuilder buf = new StringBuilder();
+
+            buf.append(getClass().getSimpleName());
+            buf.append(" [msgId=").append(msgId);
+            buf.append(']');
+
+            return buf.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
index bdbdd86..c605d2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
@@ -38,13 +38,14 @@ import static org.apache.ignite.cache.CacheMode.*;
 /**
  *
  */
-public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
+public class IgniteComputeEmptyClusterGroupTest extends IgniteCommonAbstractTest {
     /** */
     private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
+    /** {@inheritDoc}
+     * @param igniteName*/
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteName);
 
         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 
@@ -65,12 +66,12 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
-        startGrids(2);
+        startIgnites(2);
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+        stopAllIgnites();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobAnnotationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobAnnotationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobAnnotationSelfTest.java
new file mode 100644
index 0000000..e6524f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobAnnotationSelfTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test for various job callback annotations.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class IgniteContinuousJobAnnotationSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static final AtomicBoolean fail = new AtomicBoolean();
+
+    /** */
+    private static final AtomicInteger afterSendCnt = new AtomicInteger();
+
+    /** */
+    private static final AtomicInteger beforeFailoverCnt = new AtomicInteger();
+
+    /** */
+    private static final AtomicReference<Exception> err = new AtomicReference<>();
+
+    /** {@inheritDoc}
+     * @param igniteName*/
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(igniteName);
+
+        c.setMarshalLocalJobs(false);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testJobAnnotation() throws Exception {
+        testContinuousJobAnnotation(TestJob.class);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testJobChildAnnotation() throws Exception {
+        testContinuousJobAnnotation(TestJobChild.class);
+    }
+
+    /**
+     * @param jobCls Job class.
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobAnnotation(Class<?> jobCls) throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+            startIgnite(1);
+
+            fail.set(true);
+
+            ignite.compute().execute(TestTask.class, jobCls);
+
+            Exception e = err.get();
+
+            if (e != null)
+                throw e;
+        }
+        finally {
+            stopIgnite(0);
+            stopIgnite(1);
+        }
+
+        assertEquals(2, afterSendCnt.getAndSet(0));
+        assertEquals(1, beforeFailoverCnt.getAndSet(0));
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass", "unused"})
+    public static class TestTask implements ComputeTask<Object, Object> {
+        /** */
+        @IgniteTaskContinuousMapperResource
+        private ComputeTaskContinuousMapper mapper;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            try {
+                mapper.send(((Class<ComputeJob>)arg).newInstance());
+            }
+            catch (Exception e) {
+                throw new IgniteException("Job instantination failed.", e);
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received)
+            throws IgniteException {
+            if (res.getException() != null) {
+                if (res.getException() instanceof ComputeUserUndeclaredException)
+                    throw new IgniteException("Job threw unexpected exception.", res.getException());
+
+                return ComputeJobResultPolicy.FAILOVER;
+            }
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+            assert results.size() == 1 : "Unexpected result count: " + results.size();
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestJob extends ComputeJobAdapter {
+        /** */
+        private boolean flag = true;
+
+        /** */
+        TestJob() {
+            X.println("Constructing TestJob [this=" + this + ", identity=" + System.identityHashCode(this) + "]");
+        }
+
+
+        /** */
+        @ComputeJobAfterSend
+        private void afterSend() {
+            X.println("AfterSend start TestJob [this=" + this + ", identity=" + System.identityHashCode(this) +
+                ", flag=" + flag + "]");
+
+            afterSendCnt.incrementAndGet();
+
+            flag = false;
+
+            X.println("AfterSend end TestJob [this=" + this + ", identity=" + System.identityHashCode(this) +
+                ", flag=" + flag + "]");
+        }
+
+        /** */
+        @ComputeJobBeforeFailover
+        private void beforeFailover() {
+            X.println("BeforeFailover start TestJob [this=" + this + ", identity=" + System.identityHashCode(this) +
+                ", flag=" + flag + "]");
+
+            beforeFailoverCnt.incrementAndGet();
+
+            flag = true;
+
+            X.println("BeforeFailover end TestJob [this=" + this + ", identity=" + System.identityHashCode(this) +
+                ", flag=" + flag + "]");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() throws IgniteException {
+            X.println("Execute TestJob [this=" + this + ", identity=" + System.identityHashCode(this) +
+                ", flag=" + flag + "]");
+
+            if (!flag) {
+                String msg = "Flag is false on execute [this=" + this + ", identity=" + System.identityHashCode(this) +
+                    ", flag=" + flag + "]";
+
+                X.println(msg);
+
+                err.compareAndSet(null, new Exception(msg));
+            }
+
+            if (fail.get()) {
+                fail.set(false);
+
+                throw new IgniteException("Expected test exception.");
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestJobChild extends TestJob {
+        /**
+         * Required for reflectional creation.
+         */
+        TestJobChild() {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobSiblingsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobSiblingsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobSiblingsSelfTest.java
new file mode 100644
index 0000000..c48c3b0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousJobSiblingsSelfTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Test continuous mapper with siblings.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class IgniteContinuousJobSiblingsSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static final int JOB_COUNT = 10;
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobSiblings() throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+            startIgnite(1);
+
+            ignite.compute().execute(TestTask.class, null);
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobSiblingsLocalNode() throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+
+            compute(ignite.cluster().forLocal()).execute(TestTask.class, null);
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /** */
+    private static class TestTask extends ComputeTaskSplitAdapter<Object, Object> {
+        /** */
+        @IgniteTaskContinuousMapperResource
+        private ComputeTaskContinuousMapper mapper;
+
+        /** */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        private volatile int jobCnt;
+
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+            return Collections.singleton(new TestJob(++jobCnt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) {
+            if (res.getException() != null)
+                throw new IgniteException("Job resulted in error: " + res, res.getException());
+
+            assert ses.getJobSiblings().size() == jobCnt;
+
+            if (jobCnt < JOB_COUNT) {
+                mapper.send(new TestJob(++jobCnt));
+
+                assert ses.getJobSiblings().size() == jobCnt;
+            }
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            assertEquals(JOB_COUNT, results.size());
+
+            return null;
+        }
+    }
+
+    /** */
+    private static class TestJob extends ComputeJobAdapter {
+        /** */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /**
+         * @param sibCnt Siblings count to check.
+         */
+        TestJob(int sibCnt) {
+            super(sibCnt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() {
+            assert ses != null;
+            assert argument(0) != null;
+
+            Integer sibCnt = argument(0);
+
+            log.info("Executing job.");
+
+            assert sibCnt != null;
+
+            Collection<ComputeJobSibling> sibs = ses.getJobSiblings();
+
+            assert sibs != null;
+            assert sibs.size() == sibCnt : "Unexpected siblings collection [expectedSize=" + sibCnt +
+                ", siblingsCnt=" + sibs.size() + ", siblings=" + sibs + ']';
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousTaskSelfTest.java
new file mode 100644
index 0000000..5cac2f0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteContinuousTaskSelfTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Continuous task test.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class IgniteContinuousTaskSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static final int JOB_COUNT = 10;
+
+    /** */
+    private static final int THREAD_COUNT = 10;
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobsChain() throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+
+            IgniteCompute comp = ignite.compute().withAsync();
+
+            comp.execute(TestJobsChainTask.class, true);
+
+            ComputeTaskFuture<Integer> fut1 = comp.future();
+
+            comp.execute(TestJobsChainTask.class, false);
+
+            ComputeTaskFuture<Integer> fut2 = comp.future();
+
+            assert fut1.get() == 55;
+            assert fut2.get() == 55;
+        }
+        finally {
+            stopIgnite(0);
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobsChainMultiThreaded() throws Exception {
+        try {
+            final Ignite ignite = startIgnite(0);
+            startIgnite(1);
+
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                /** {@inheritDoc} */
+                @Override public void run() {
+                    try {
+                        IgniteCompute comp = ignite.compute().withAsync();
+
+                        comp.execute(TestJobsChainTask.class, true);
+
+                        ComputeTaskFuture<Integer> fut1 = comp.future();
+
+                        comp.execute(TestJobsChainTask.class, false);
+
+                        ComputeTaskFuture<Integer> fut2 = comp.future();
+
+                        assert fut1.get() == 55;
+                        assert fut2.get() == 55;
+                    }
+                    catch (IgniteException e) {
+                        assert false : "Test task failed: " + e;
+                    }
+                }
+
+            }, THREAD_COUNT, "continuous-jobs-chain");
+        }
+        finally {
+            stopIgnite(0);
+            stopIgnite(1);
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousJobsSessionChain() throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+            startIgnite(1);
+
+            ignite.compute().execute(SessionChainTestTask.class, false);
+        }
+        finally {
+            stopIgnite(0);
+            stopIgnite(1);
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testContinuousSlowMap() throws Exception {
+        try {
+            Ignite ignite = startIgnite(0);
+
+            Integer cnt = ignite.compute().execute(SlowMapTestTask.class, null);
+
+            assert cnt != null;
+            assert cnt == 2 : "Unexpected result: " + cnt;
+        }
+        finally {
+            stopIgnite(0);
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class TestJobsChainTask implements ComputeTask<Boolean, Integer> {
+        /** */
+        @IgniteTaskContinuousMapperResource
+        private ComputeTaskContinuousMapper mapper;
+
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** */
+        private int cnt;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Boolean arg) {
+            assert mapper != null;
+            assert arg != null;
+
+            ComputeJob job = new TestJob(++cnt);
+
+            if (arg) {
+                mapper.send(job, subgrid.get(0));
+
+                log.info("Sent test task by continuous mapper: " + job);
+            }
+            else {
+                log.info("Will return job as map() result: " + job);
+
+                return Collections.singletonMap(job, subgrid.get(0));
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) {
+            assert mapper != null;
+            assert res.getException() == null : "Unexpected exception: " + res.getException();
+
+            log.info("Received job result [result=" + res + ", count=" + cnt + ']');
+
+            int tmp = ++cnt;
+
+            if (tmp <= JOB_COUNT) {
+                mapper.send(new TestJob(tmp));
+
+                log.info("Sent test task by continuous mapper (from result() method).");
+            }
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer reduce(List<ComputeJobResult> results) {
+            assert results.size() == 10 : "Unexpected result count: " + results.size();
+
+            log.info("Called reduce() method [results=" + results + ']');
+
+            int res = 0;
+
+            for (ComputeJobResult result : results) {
+                assert result.getData() != null : "Unexpected result data (null): " + result;
+
+                res += (Integer)result.getData();
+            }
+
+            return res;
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class TestJob extends ComputeJobAdapter {
+        /** */
+        public TestJob() { /* No-op. */ }
+
+        /**
+         * @param arg Job argument.
+         */
+        public TestJob(Integer arg) {
+            super(arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() {
+            Integer i = argument(0);
+
+            return i != null ? i : 0;
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    @ComputeTaskSessionFullSupport
+    public static class SessionChainTestTask extends ComputeTaskSplitAdapter<Object, Object> {
+        /** */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        @IgniteTaskContinuousMapperResource
+        private ComputeTaskContinuousMapper mapper;
+
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+            ses.addAttributeListener(new ComputeTaskSessionAttributeListener() {
+                @Override public void onAttributeSet(Object key, Object val) {
+                    if (key instanceof String) {
+                        if (((String)key).startsWith("sendJob")) {
+                            assert val instanceof Integer;
+
+                            int cnt = (Integer)val;
+
+                            if (cnt < JOB_COUNT) {
+                                try {
+                                    mapper.send(new SessionChainTestJob(cnt));
+                                }
+                                catch (IgniteException e) {
+                                    log.error("Failed to send new job.", e);
+                                }
+                            }
+                        }
+                    }
+                }
+            }, true);
+
+            Collection<ComputeJob> jobs = new ArrayList<>();
+
+            for (int i = 0; i < JOB_COUNT; i++)
+                jobs.add(new SessionChainTestJob(0));
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            assertEquals(JOB_COUNT * JOB_COUNT, results.size());
+
+            return null;
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class SessionChainTestJob extends ComputeJobAdapter {
+        /** */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** */
+        @IgniteJobContextResource
+        private ComputeJobContext ctx;
+
+        /** */
+        public SessionChainTestJob() { /* No-op. */}
+
+        /**
+         * @param arg Job argument.
+         */
+        public SessionChainTestJob(Integer arg) {
+            super(arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() {
+            Integer i = argument(0);
+
+            int arg = i != null ? i : 0;
+
+            ses.setAttribute("sendJob" + ctx.getJobId(), 1 + arg);
+
+            return arg;
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class SlowMapTestTask extends ComputeTaskAdapter<Object, Integer> {
+        /** */
+        @IgniteTaskContinuousMapperResource
+        private ComputeTaskContinuousMapper mapper;
+
+        /** */
+        private int cnt;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            mapper.send(new TestJob(++cnt));
+
+            try {
+                Thread.sleep(10000);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteException("Job has been interrupted.", e);
+            }
+
+            mapper.send(new TestJob(++cnt));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
+            return results == null ? 0 : results.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentMultiThreadedSelfTest.java
new file mode 100644
index 0000000..7839f0d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentMultiThreadedSelfTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * Task deployment tests.
+ */
+public class IgniteDeploymentMultiThreadedSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static final int THREAD_CNT = 20;
+
+    /** */
+    private static final int EXEC_CNT = 30000;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeploy() throws Exception {
+        try {
+            final Ignite ignite = startIgnite(0);
+
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            final CyclicBarrier barrier = new CyclicBarrier(THREAD_CNT, new Runnable() {
+                private int iterCnt;
+
+                @Override public void run() {
+                    try {
+                        ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+                        assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) == null;
+
+                        if (++iterCnt % 100 == 0)
+                            info("Iterations count: " + iterCnt);
+                    }
+                    catch (IgniteException e) {
+                        U.error(log, "Failed to undeploy task message.", e);
+
+                        fail("See logs for details.");
+                    }
+                }
+            });
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try {
+                        for (int i = 0; i < EXEC_CNT; i++) {
+                            barrier.await(2000, MILLISECONDS);
+
+                            ignite.compute().localDeployTask(GridDeploymentTestTask.class,
+                                GridDeploymentTestTask.class.getClassLoader());
+
+                            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+                        }
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Test failed.", e);
+
+                        throw e;
+                    }
+                    finally {
+                        info("Thread finished.");
+                    }
+
+                    return null;
+                }
+            }, THREAD_CNT, "grid-load-test-thread");
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * Test task.
+     */
+    private static class GridDeploymentTestTask extends ComputeTaskAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            assert false;
+
+            return Collections.emptyMap();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentSelfTest.java
new file mode 100644
index 0000000..ec7bbd0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDeploymentSelfTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.deployment.local.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Task deployment tests.
+ */
+@SuppressWarnings("unchecked")
+@GridCommonTest(group = "Kernal Self")
+public class IgniteDeploymentSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private TestDeploymentSpi depSpi;
+
+    /** */
+    private boolean p2pEnabled = true;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        depSpi = new TestDeploymentSpi();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        depSpi = null;
+    }
+
+    /** {@inheritDoc}
+     * @param igniteName*/
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteName);
+
+        cfg.setDeploymentSpi(depSpi = new TestDeploymentSpi());
+        cfg.setPeerClassLoadingEnabled(p2pEnabled);
+
+        // Disable cache since it can deploy some classes during start process.
+        cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** */
+    public IgniteDeploymentSelfTest() {
+        super(/*start grid*/false);
+    }
+
+    /**
+     * @param ignite Grid.
+     * @param taskName Task name.
+     * @return {@code True} if task is not deployed.
+     */
+    private boolean checkUndeployed(Ignite ignite, String taskName) {
+        return ignite.compute().localTasks().get(taskName) == null;
+    }
+
+    /**
+     * @param ignite Grid.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    private void stopGrid(Ignite ignite) {
+        try {
+            if (ignite != null)
+                stopIgnite(ignite.name());
+        }
+        catch (Throwable e) {
+            error("Got error when stopping grid.", e);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeploy() throws Exception {
+        Ignite ignite = startIgnite(getTestIgniteName());
+
+        try {
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName());
+        }
+        finally {
+            stopGrid(ignite);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIgnoreDeploymentSpi() throws Exception {
+        // If peer class loading is disabled and local deployment SPI
+        // is configured, SPI should be ignored.
+        p2pEnabled = false;
+
+        Ignite ignite = startIgnite(getTestIgniteName());
+
+        try {
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert depSpi.getRegisterCount() == 0 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert depSpi.getRegisterCount() == 0 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+        }
+        finally {
+            stopGrid(ignite);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRedeploy() throws Exception {
+        Ignite ignite = startIgnite(getTestIgniteName());
+
+        try {
+            // Added to work with P2P.
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            // Check auto-deploy.
+            ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null);
+
+            fut.get();
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            // Check 2nd execute.
+            fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null);
+
+            fut.get();
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            // Redeploy, should be NO-OP for the same task.
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            // Check 2nd execute.
+            fut = executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null);
+
+            fut.get();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            // Check undeploy.
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) == null;
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            // Added to work with P2P
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            // Check auto-deploy.
+            executeAsync(ignite.compute(), GridDeploymentTestTask.class.getName(), null);
+
+            assert depSpi.getRegisterCount() == 2;
+            assert depSpi.getUnregisterCount() == 1;
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            ignite.compute().localDeployTask(GridDeploymentTestTask1.class,
+                GridDeploymentTestTask1.class.getClassLoader());
+
+            try {
+                ignite.compute().localDeployTask(GridDeploymentTestTask2.class,
+                    GridDeploymentTestTask2.class.getClassLoader());
+
+                assert false : "Should not be able to deploy 2 task with same task name";
+            }
+            catch (IgniteException e) {
+                info("Received expected grid exception: " + e);
+            }
+
+            assert depSpi.getRegisterCount() == 3 : "Invalid register count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 1 : "Invalid unregister count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get("GridDeploymentTestTask") != null;
+
+            Class<? extends ComputeTask<?, ?>> cls = ignite.compute().localTasks().get("GridDeploymentTestTask");
+
+            assert cls.getName().equals(GridDeploymentTestTask1.class.getName());
+        }
+        finally {
+            stopGrid(ignite);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"BusyWait"})
+    public void testDeployOnTwoNodes() throws Exception {
+        Ignite ignite1 = startIgnite(getTestIgniteName() + '1');
+        Ignite ignite2 = startIgnite(getTestIgniteName() + '2');
+
+        try {
+            assert !ignite1.cluster().forRemotes().nodes().isEmpty() : ignite1.cluster().forRemotes();
+            assert !ignite2.cluster().forRemotes().nodes().isEmpty() : ignite2.cluster().forRemotes();
+
+            ignite1.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+            ignite2.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert ignite1.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+            assert ignite2.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            ignite1.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert checkUndeployed(ignite1, GridDeploymentTestTask.class.getName());
+
+            int cnt = 0;
+
+            boolean taskUndeployed = false;
+
+            while (cnt++ < 10 && !taskUndeployed) {
+                taskUndeployed = checkUndeployed(ignite2, GridDeploymentTestTask.class.getName());
+
+                if (!taskUndeployed)
+                    Thread.sleep(500);
+            }
+
+            // Undeploy on one node should undeploy explicitly deployed
+            // tasks on the others
+            assert taskUndeployed;
+        }
+        finally {
+            stopGrid(ignite1);
+            stopGrid(ignite2);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployEvents() throws Exception {
+        Ignite ignite = startIgnite(getTestIgniteName());
+
+        try {
+            DeploymentEventListener evtLsnr = new DeploymentEventListener();
+
+            ignite.events().localListen(evtLsnr, EVT_TASK_DEPLOYED, EVT_TASK_UNDEPLOYED);
+
+            // Should generate 1st deployment event.
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 0 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            // Should generate 1st un-deployment event.
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert depSpi.getRegisterCount() == 1 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName());
+
+            // Should generate 2nd deployment event.
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert depSpi.getRegisterCount() == 2 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 1 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            // Should generate 2nd un-deployment event.
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert depSpi.getRegisterCount() == 2 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 2 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName());
+
+            // Should generate 3rd deployment event.
+            ignite.compute().localDeployTask(GridDeploymentTestTask.class, GridDeploymentTestTask.class.getClassLoader());
+
+            assert depSpi.getRegisterCount() == 3 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 2 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert ignite.compute().localTasks().get(GridDeploymentTestTask.class.getName()) != null;
+
+            // Should generate 3rd un-deployment event.
+            ignite.compute().undeployTask(GridDeploymentTestTask.class.getName());
+
+            assert depSpi.getRegisterCount() == 3 : "Invalid deploy count: " + depSpi.getRegisterCount();
+            assert depSpi.getUnregisterCount() == 3 : "Invalid undeploy count: " + depSpi.getUnregisterCount();
+
+            assert checkUndeployed(ignite, GridDeploymentTestTask.class.getName());
+
+            assert evtLsnr.getDeployCount() == 3 : "Invalid number of deployment events" + evtLsnr.getDeployCount();
+            assert evtLsnr.getUndeployCount() == 3 : "Invalid number of un-deployment events" + evtLsnr.getDeployCount();
+        }
+        finally {
+            stopGrid(ignite);
+        }
+    }
+
+    /**
+     * Test deployable task.
+     */
+    private static class GridDeploymentTestTask extends ComputeTaskAdapter<Object, Object> {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());
+
+            for (ClusterNode node : subgrid) {
+                map.put(new ComputeJobAdapter() {
+                    @Override public Serializable execute() {
+                        if (log.isInfoEnabled())
+                            log.info("Executing grid job: " + this);
+
+                        return null;
+                    }
+                }, node);
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            return null;
+        }
+    }
+
+    /**
+     * Test deployable named task.
+     */
+    @ComputeTaskName(value = "GridDeploymentTestTask")
+    private static class GridDeploymentTestTask1 extends ComputeTaskAdapter<Object, Object> {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());
+
+            for (ClusterNode node : subgrid) {
+                map.put(new ComputeJobAdapter() {
+                    @Override public Serializable execute() {
+                        log.info("Executing grid job: " + this);
+
+                        return null;
+                    }
+                }, node);
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            return null;
+        }
+    }
+
+    /**
+     * Test deployable named task.
+     */
+    @ComputeTaskName(value = "GridDeploymentTestTask")
+    private static class GridDeploymentTestTask2 extends ComputeTaskAdapter<Object, Object> {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object arg) {
+            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());
+
+            for (ClusterNode node : subgrid) {
+                map.put(new ComputeJobAdapter() {
+                    @Override public Serializable execute() {
+                        if (log.isInfoEnabled())
+                            log.info("Executing grid job: " + this);
+
+                        return null;
+                    }
+                }, node);
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            return null;
+        }
+    }
+
+    /**
+     *
+     * Test deployment spi.
+     */
+    private static class TestDeploymentSpi extends LocalDeploymentSpi {
+        /** */
+        private volatile int deployCnt;
+
+        /** */
+        private volatile int undeployCnt;
+
+        /** {@inheritDoc} */
+        @Override public boolean register(ClassLoader ldr, Class rsrc) throws IgniteSpiException {
+            if (super.register(ldr, rsrc)) {
+                deployCnt++;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean unregister(String rsrcName) {
+            undeployCnt++;
+
+            return super.unregister(rsrcName);
+        }
+
+        /**
+         * @return Deploy count.
+         */
+        public int getRegisterCount() {
+            return deployCnt;
+        }
+
+        /**
+         * @return Undeploy count.
+         */
+        public int getUnregisterCount() {
+            return undeployCnt;
+        }
+    }
+
+    /**
+     * Deployment listener.
+     */
+    private static class DeploymentEventListener implements IgnitePredicate<IgniteEvent> {
+        /** */
+        private int depCnt;
+
+        /** */
+        private int undepCnt;
+
+        /**
+         * Gonna process task deployment events only.
+         *
+         * @param evt local grid event.
+         */
+        @Override public boolean apply(IgniteEvent evt) {
+            if (evt.type() == EVT_TASK_DEPLOYED)
+                depCnt++;
+            else if (evt.type() == EVT_TASK_UNDEPLOYED)
+                undepCnt++;
+
+            return true;
+        }
+
+        /**
+         * @return Deploy count.
+         */
+        public int getDeployCount() {
+            return depCnt;
+        }
+
+        /**
+         * @return Undeploy count.
+         */
+        public int getUndeployCount() {
+            return undepCnt;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryEventSelfTest.java
new file mode 100644
index 0000000..38e31fd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryEventSelfTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Tests discovery event topology snapshots.
+ */
+public class IgniteDiscoveryEventSelfTest extends IgniteCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Daemon flag. */
+    private boolean daemon;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        daemon = false;
+    }
+
+    /** */
+    private static final IgniteClosure<ClusterNode, UUID> NODE_2ID = new IgniteClosure<ClusterNode, UUID>() {
+        @Override public UUID apply(ClusterNode n) {
+            return n.id();
+        }
+
+        @Override public String toString() {
+            return "Grid node shadow to node ID transformer closure.";
+        }
+    };
+
+    /** {@inheritDoc}
+     * @param igniteName*/
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(igniteName);
+
+        c.setDaemon(daemon);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        c.setRestEnabled(false);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinSequenceEvents() throws Exception {
+        try {
+            Ignite g0 = startIgnite(0);
+
+            UUID id0 = g0.cluster().localNode().id();
+
+            final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>();
+
+            g0.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                private AtomicInteger cnt = new AtomicInteger();
+
+                @Override public boolean apply(IgniteEvent evt) {
+                    assert evt.type() == EVT_NODE_JOINED;
+
+                    evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes());
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+
+            UUID id1 = startIgnite(1).cluster().localNode().id();
+            UUID id2 = startIgnite(2).cluster().localNode().id();
+            UUID id3 = startIgnite(3).cluster().localNode().id();
+
+            U.sleep(100);
+
+            assertEquals("Wrong count of events received", 3, evts.size());
+
+            Collection<ClusterNode> top0 = evts.get(0);
+
+            assertNotNull(top0);
+            assertEquals(2, top0.size());
+            assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1));
+
+            Collection<ClusterNode> top1 = evts.get(1);
+
+            assertNotNull(top1);
+            assertEquals(3, top1.size());
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2));
+
+            Collection<ClusterNode> top2 = evts.get(2);
+
+            assertNotNull(top2);
+            assertEquals(4, top2.size());
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3));
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLeaveSequenceEvents() throws Exception {
+        try {
+            Ignite g0 = startIgnite(0);
+
+            UUID id0 = g0.cluster().localNode().id();
+            UUID id1 = startIgnite(1).cluster().localNode().id();
+            UUID id2 = startIgnite(2).cluster().localNode().id();
+            UUID id3 = startIgnite(3).cluster().localNode().id();
+
+            final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>();
+
+            g0.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                private AtomicInteger cnt = new AtomicInteger();
+
+                @Override public boolean apply(IgniteEvent evt) {
+                    assert evt.type() == EVT_NODE_LEFT;
+
+                    evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes());
+
+                    return true;
+                }
+            }, EVT_NODE_LEFT);
+
+            stopIgnite(3);
+            stopIgnite(2);
+            stopIgnite(1);
+
+            U.sleep(100);
+
+            assertEquals("Wrong count of events received", 3, evts.size());
+
+            Collection<ClusterNode> top2 = evts.get(0);
+
+            assertNotNull(top2);
+            assertEquals(3, top2.size());
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top2, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top1 = evts.get(1);
+
+            assertNotNull(top1);
+            assertEquals(2, top1.size());
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top1, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top0 = evts.get(2);
+
+            assertNotNull(top0);
+            assertEquals(1, top0.size());
+            assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0));
+            assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top0, NODE_2ID).contains(id3));
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMixedSequenceEvents() throws Exception {
+        try {
+            Ignite g0 = startIgnite(0);
+
+            UUID id0 = g0.cluster().localNode().id();
+
+            final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>();
+
+            g0.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                private AtomicInteger cnt = new AtomicInteger();
+
+                @Override public boolean apply(IgniteEvent evt) {
+                    assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT;
+
+                    evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes());
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED, EVT_NODE_LEFT);
+
+            UUID id1 = startIgnite(1).cluster().localNode().id();
+            UUID id2 = startIgnite(2).cluster().localNode().id();
+            UUID id3 = startIgnite(3).cluster().localNode().id();
+
+            stopIgnite(3);
+            stopIgnite(2);
+            stopIgnite(1);
+
+            UUID id4 = startIgnite(4).cluster().localNode().id();
+
+            stopIgnite(4);
+
+            U.sleep(100);
+
+            assertEquals("Wrong count of events received", 8, evts.size());
+
+            Collection<ClusterNode> top0 = evts.get(0);
+
+            assertNotNull(top0);
+            assertEquals(2, top0.size());
+            assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top0, NODE_2ID).contains(id1));
+
+            Collection<ClusterNode> top1 = evts.get(1);
+
+            assertNotNull(top1);
+            assertEquals(3, top1.size());
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top1, NODE_2ID).contains(id2));
+
+            Collection<ClusterNode> top2 = evts.get(2);
+
+            assertNotNull(top2);
+            assertEquals(4, top2.size());
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id2));
+            assertTrue(F.viewReadOnly(top2, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top3 = evts.get(3);
+
+            assertNotNull(top3);
+            assertEquals(3, top3.size());
+            assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id1));
+            assertTrue(F.viewReadOnly(top3, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top3, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top4 = evts.get(4);
+
+            assertNotNull(top4);
+            assertEquals(2, top4.size());
+            assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top4, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top4, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top5 = evts.get(5);
+
+            assertNotNull(top5);
+            assertEquals(1, top5.size());
+            assertTrue(F.viewReadOnly(top5, NODE_2ID).contains(id0));
+            assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top5, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top6 = evts.get(6);
+
+            assertNotNull(top6);
+            assertEquals(2, top6.size());
+            assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id0));
+            assertTrue(F.viewReadOnly(top6, NODE_2ID).contains(id4));
+            assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top6, NODE_2ID).contains(id3));
+
+            Collection<ClusterNode> top7 = evts.get(7);
+
+            assertNotNull(top7);
+            assertEquals(1, top7.size());
+            assertTrue(F.viewReadOnly(top7, NODE_2ID).contains(id0));
+            assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id1));
+            assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id2));
+            assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id3));
+            assertFalse(F.viewReadOnly(top7, NODE_2ID).contains(id4));
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentJoinEvents() throws Exception {
+        try {
+            Ignite g0 = startIgnite(0);
+
+            UUID id0 = g0.cluster().localNode().id();
+
+            final ConcurrentMap<Integer, Collection<ClusterNode>> evts = new ConcurrentHashMap<>();
+
+            g0.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                private AtomicInteger cnt = new AtomicInteger();
+
+                @Override public boolean apply(IgniteEvent evt) {
+                    assert evt.type() == EVT_NODE_JOINED;
+
+                    X.println(">>>>>>> Joined " + F.viewReadOnly(((IgniteDiscoveryEvent) evt).topologyNodes(),
+                        NODE_2ID));
+
+                    evts.put(cnt.getAndIncrement(), ((IgniteDiscoveryEvent) evt).topologyNodes());
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+
+            U.sleep(100);
+
+            startIgnitesMultiThreaded(1, 10);
+
+            U.sleep(100);
+
+            assertEquals(10, evts.size());
+
+            for (int i = 0; i < 10; i++) {
+                Collection<ClusterNode> snapshot = evts.get(i);
+
+                assertEquals(2 + i, snapshot.size());
+                assertTrue(F.viewReadOnly(snapshot, NODE_2ID).contains(id0));
+
+                for (ClusterNode n : snapshot)
+                    assertTrue("Wrong node order in snapshot [i=" + i + ", node=" + n + ']', n.order() <= 2 + i);
+            }
+
+            Collection<UUID> ids = F.viewReadOnly(evts.get(9), NODE_2ID);
+
+            for (int i = 1; i <= 10; i++)
+                assertTrue(ids.contains(igniteEx(i).localNode().id()));
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDaemonNodeJoin() throws Exception {
+        try {
+            startIgnitesMultiThreaded(3);
+
+            final AtomicReference<IgniteCheckedException> err = new AtomicReference<>();
+
+            for (int i = 0; i < 3; i++) {
+                Ignite g = igniteEx(i);
+
+                g.events().localListen(new IgnitePredicate<IgniteEvent>() {
+                    @Override public boolean apply(IgniteEvent evt) {
+                        IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent) evt;
+
+                        if (discoEvt.topologyNodes().size() != 3)
+                            err.compareAndSet(null, new IgniteCheckedException("Invalid discovery event [evt=" + discoEvt +
+                                ", nodes=" + discoEvt.topologyNodes() + ']'));
+
+                        return true;
+                    }
+                }, IgniteEventType.EVT_NODE_JOINED);
+            }
+
+            daemon = true;
+
+            IgniteKernal daemon = (IgniteKernal) startIgnite(3);
+
+            IgniteDiscoveryEvent join = daemon.context().discovery().localJoinEvent();
+
+            assertEquals(3, join.topologyNodes().size());
+
+            U.sleep(100);
+
+            if (err.get() != null)
+                throw err.get();
+        }
+        finally {
+            stopAllIgnites();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6563e8a9/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoverySelfTest.java
new file mode 100644
index 0000000..6726ce2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoverySelfTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.lang.IgniteProductVersion.*;
+
+/**
+ *  GridDiscovery self test.
+ */
+public class IgniteDiscoverySelfTest extends IgniteCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static Ignite ignite;
+
+    /** Nodes count. */
+    private static final int NODES_CNT = 5;
+
+    /** Maximum timeout when remote nodes join/left the topology */
+    private static final int MAX_TIMEOUT_IN_MINS = 5;
+
+    /** */
+    public IgniteDiscoverySelfTest() {
+        super(/*start grid*/true);
+    }
+
+    /** {@inheritDoc}
+     * @param igniteName*/
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        //cacheCfg.setName(null);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite = G.ignite(getTestIgniteName());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRemoteNodes() throws Exception {
+        Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+
+        printNodes(nodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetAllNodes() throws Exception {
+        Collection<ClusterNode> nodes = ignite.cluster().nodes();
+
+        printNodes(nodes);
+
+        assert nodes != null;
+        assert !nodes.isEmpty();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetTopologyHash() throws Exception {
+        int hashCnt = 5000;
+
+        Random rand = new Random();
+
+        Collection<Long> hashes = new HashSet<>(hashCnt, 1.0f);
+
+        for (int i = 0; i < hashCnt; i++) {
+            // Max topology of 10 nodes.
+            int size = rand.nextInt(10) + 1;
+
+            Collection<ClusterNode> nodes = new ArrayList<>(size);
+
+            for (int j = 0; j < size; j++)
+                nodes.add(new GridDiscoveryTestNode());
+
+            @SuppressWarnings("deprecation")
+            long hash = ((IgniteKernal) ignite).context().discovery().topologyHash(nodes);
+
+            boolean isHashed = hashes.add(hash);
+
+            assert isHashed : "Duplicate hash [hash=" + hash + ", topSize=" + size + ", iteration=" + i + ']';
+        }
+
+        info("No duplicates found among '" + hashCnt + "' hashes.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"SuspiciousMethodCalls"})
+    public void testGetLocalNode() throws Exception {
+        ClusterNode node = ignite.cluster().localNode();
+
+        assert node != null;
+
+        Collection<ClusterNode> nodes = ignite.cluster().nodes();
+
+        assert nodes != null;
+        assert nodes.contains(node);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPingNode() throws Exception {
+        ClusterNode node = ignite.cluster().localNode();
+
+        assert node != null;
+
+        boolean pingRes = ignite.cluster().pingNode(node.id());
+
+        assert pingRes : "Failed to ping local node.";
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDiscoveryListener() throws Exception {
+        ClusterNode node = ignite.cluster().localNode();
+
+        assert node != null;
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        /** Joined nodes counter. */
+        final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT);
+
+        /** Left nodes counter. */
+        final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT);
+
+        IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() {
+            @Override public boolean apply(IgniteEvent evt) {
+                if (EVT_NODE_JOINED == evt.type()) {
+                    cnt.incrementAndGet();
+
+                    joinedCnt.countDown();
+                }
+                else if (EVT_NODE_LEFT == evt.type()) {
+                    int i = cnt.decrementAndGet();
+
+                    assert i >= 0;
+
+                    leftCnt.countDown();
+                }
+                else
+                    assert false;
+
+                return true;
+            }
+        };
+
+        ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED);
+
+        try {
+            for (int i = 0; i < NODES_CNT; i++)
+                startIgnite(i);
+
+            joinedCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES);
+
+            assert cnt.get() == NODES_CNT;
+
+            for (int i = 0; i < NODES_CNT; i++)
+                stopIgnite(i);
+
+            leftCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES);
+
+            assert cnt.get() == 0;
+
+            ignite.events().stopLocalListen(lsnr);
+
+            assert cnt.get() == 0;
+        }
+        finally {
+            for (int i = 0; i < NODES_CNT; i++)
+                stopAndCancelIgnite(i);
+        }
+    }
+
+    /**
+     * Test cache nodes resolved correctly from topology history.
+     *
+     * @throws Exception In case of any exception.
+     */
+    public void testCacheNodes() throws Exception {
+        // Validate only original node is available.
+        GridDiscoveryManager discoMgr = ((IgniteKernal) ignite).context().discovery();
+
+        Collection<ClusterNode> nodes = discoMgr.allNodes();
+
+        assert nodes.size() == 1 : "Expects only original node is available: " + nodes;
+
+        final long topVer0 = discoMgr.topologyVersion();
+
+        assert topVer0 > 0 : "Unexpected initial topology version: " + topVer0;
+
+        List<UUID> uuids = new ArrayList<>(NODES_CNT);
+
+        UUID locId = ignite.cluster().localNode().id();
+
+        try {
+            // Start nodes.
+            for (int i = 0; i < NODES_CNT; i++)
+                uuids.add(startIgnite(i).cluster().localNode().id());
+
+            // Stop nodes.
+            for (int i = 0; i < NODES_CNT; i++)
+                stopIgnite(i);
+
+            final long topVer = discoMgr.topologyVersion();
+
+            assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer;
+
+            for (long ver = topVer0; ver <= topVer; ver++) {
+                Collection<UUID> exp = new ArrayList<>();
+
+                exp.add(locId);
+
+                for (int i = 0; i < NODES_CNT && i < ver - topVer0; i++)
+                    exp.add(uuids.get(i));
+
+                for (int i = 0; i < ver - topVer0 - NODES_CNT; i++)
+                    exp.remove(uuids.get(i));
+
+                // Cache nodes by topology version (e.g. NODE_CNT == 3).
+                //            0 1 2 3 (node id)
+                // 1 (topVer) +       - only local node
+                // 2          + +
+                // 3          + + +
+                // 4          + + + +
+                // 5          +   + +
+                // 6          +     +
+                // 7          +       - only local node
+
+                Collection<ClusterNode> cacheNodes = discoMgr.cacheNodes(null, ver);
+
+                Collection<UUID> act = new ArrayList<>(F.viewReadOnly(cacheNodes, new C1<ClusterNode, UUID>() {
+                    @Override public UUID apply(ClusterNode n) {
+                        return n.id();
+                    }
+                }));
+
+                assertEquals("Expects correct cache nodes for topology version: " + ver, exp, act);
+            }
+        }
+        finally {
+            for (int i = 0; i < NODES_CNT; i++)
+                stopAndCancelIgnite(i);
+        }
+    }
+
+    /**
+     * @param nodes Nodes.
+     */
+    private void printNodes(Collection<ClusterNode> nodes) {
+        StringBuilder buf = new StringBuilder();
+
+        if (nodes != null && !nodes.isEmpty()) {
+            buf.append("Found nodes [nodes={");
+
+            int i = 0;
+
+            for (Iterator<ClusterNode> iter = nodes.iterator(); iter.hasNext(); i++) {
+                ClusterNode node = iter.next();
+
+                buf.append(node.id());
+
+                if (i + 1 != nodes.size())
+                    buf.append(", ");
+            }
+
+            buf.append("}]");
+        }
+        else
+            buf.append("Found no nodes.");
+
+        if (log().isDebugEnabled())
+            log().debug(buf.toString());
+    }
+
+    /**
+     *
+     */
+    private static class GridDiscoveryTestNode extends GridMetadataAwareAdapter implements ClusterNode {
+        /** */
+        private static AtomicInteger consistentIdCtr = new AtomicInteger();
+
+        /** */
+        private UUID nodeId = UUID.randomUUID();
+
+        /** */
+        private Object consistentId = consistentIdCtr.incrementAndGet();
+
+        /** {@inheritDoc} */
+        @Override public long order() {
+            return -1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteProductVersion version() {
+            return fromString("99.99.99");
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object consistentId() {
+            return consistentId;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Nullable @Override public <T> T attribute(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public ClusterMetrics metrics() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<String, Object> attributes() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> addresses() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isLocal() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDaemon() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClient() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hostNames() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return F.eqNodes(this, o);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id().hashCode();
+        }
+    }
+}


Mime
View raw message