Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4F061175F3 for ; Mon, 29 Jun 2015 19:47:14 +0000 (UTC) Received: (qmail 24656 invoked by uid 500); 29 Jun 2015 19:47:14 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 24620 invoked by uid 500); 29 Jun 2015 19:47:14 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 24611 invoked by uid 99); 29 Jun 2015 19:47:14 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jun 2015 19:47:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 8968E1A6380 for ; Mon, 29 Jun 2015 19:47:13 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.791 X-Spam-Level: X-Spam-Status: No, score=0.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id UmiaqssEjEQA for ; Mon, 29 Jun 2015 19:46:59 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id D647725072 for ; Mon, 29 Jun 2015 19:46:46 +0000 (UTC) Received: (qmail 20284 invoked by uid 99); 29 Jun 2015 19:46:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jun 2015 19:46:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A59D5E35D2; Mon, 29 Jun 2015 19:46:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 29 Jun 2015 19:47:02 -0000 Message-Id: <2d8f75e3cc3c4171ac7f72a68de0dbaf@git.apache.org> In-Reply-To: <283ea914128f4f1bacc3f813877bff50@git.apache.org> References: <283ea914128f4f1bacc3f813877bff50@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/50] incubator-ignite git commit: # minor (renaming) # minor (renaming) Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6c3d97f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6c3d97f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6c3d97f8 Branch: refs/heads/ignite-788-dev-review Commit: 6c3d97f8069c2aec66195e03b7f1062ceb91e172 Parents: fc17f07 Author: Yakov Zhdanov Authored: Thu Jun 25 17:18:33 2015 +0300 Committer: Yakov Zhdanov Committed: Thu Jun 25 17:18:33 2015 +0300 ---------------------------------------------------------------------- .../internal/ClusterGroupAbstractTest.java | 777 ++++++++++++++++++ .../ignite/internal/ClusterGroupSelfTest.java | 251 ++++++ .../internal/GridProjectionAbstractTest.java | 784 ------------------- .../ignite/internal/GridProjectionSelfTest.java | 251 ------ .../apache/ignite/internal/GridSelfTest.java | 2 +- .../ignite/testsuites/IgniteBasicTestSuite.java | 2 +- 6 files changed, 1030 insertions(+), 1037 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java new file mode 100644 index 0000000..0514ad9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupAbstractTest.java @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Abstract test for {@link org.apache.ignite.cluster.ClusterGroup} + */ +@SuppressWarnings("deprecation") +public abstract class ClusterGroupAbstractTest extends GridCommonAbstractTest implements Externalizable { + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Waiting timeout. */ + private static final int WAIT_TIMEOUT = 30000; + + /** Utility static variable. */ + private static final AtomicInteger cnt = new AtomicInteger(0); + + /** Mutex. */ + private static final Object mux = new Object(); + + /** Projection. */ + private ClusterGroup prj; + + /** Runnable job. */ + private IgniteRunnable runJob = new TestRunnable(); + + /** Callable job. */ + private IgniteCallable calJob = new TestCallable<>(); + + /** Closure job. */ + private IgniteClosure clrJob = new IgniteClosure() { + @Override public String apply(String s) { + return s; + } + + @Override public String toString() { + return "clrJob"; + } + }; + + /** Reducer. */ + private IgniteReducer rdc = new IgniteReducer() { + @Override public boolean collect(String e) { + return true; + } + + @Nullable @Override public Object reduce() { + return null; + } + + @Override public String toString() { + return "rdc"; + } + }; + + /** */ + protected ClusterGroupAbstractTest() { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setForceServerMode(true).setIpFinder(ipFinder)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + prj = projection(); + + cnt.set(0); + } + + /** + * @return Projection. + */ + protected abstract ClusterGroup projection(); + + /** + * @return Local node ID. + */ + @Nullable protected abstract UUID localNodeId(); + + /** + * @return Remote nodes IDs. + */ + protected Collection remoteNodeIds() { + return F.nodeIds(projection().forRemotes().nodes()); + } + + /** + * @return Projection size. + */ + private int projectionSize() { + int size = localNodeId() != null ? 1 : 0; + + size += remoteNodeIds().size(); + + assert size > 0; + + return size; + } + + /** + * @return Collection of projection node IDs. + */ + private Collection projectionNodeIds() { + Collection ids = new LinkedList<>(); + + UUID id = localNodeId(); + + if (id != null) + ids.add(id); + + ids.addAll(remoteNodeIds()); + + assert !ids.isEmpty(); + + return ids; + } + + /** + * Test for projection on not existing node IDs. + */ + public void testInvalidProjection() { + Collection ids = new HashSet<>(); + + ids.add(UUID.randomUUID()); + ids.add(UUID.randomUUID()); + + ClusterGroup invalidPrj = prj.forNodeIds(ids); + + assertEquals(0, invalidPrj.nodes().size()); + } + + /** + * @throws Exception If test failed. + */ + public void testProjection() throws Exception { + assert prj != null; + + assert prj.ignite() != null; + assert prj.predicate() != null; + + int size = projectionSize(); + + assert prj.nodes().size() == size; + + Collection nodeIds = projectionNodeIds(); + + for (ClusterNode node : prj.nodes()) + assert nodeIds.contains(node.id()); + } + + /** + * @throws Exception If test failed. + */ + public void testRemoteNodes() throws Exception { + Collection remoteNodeIds = remoteNodeIds(); + + UUID locNodeId = localNodeId(); + + int size = remoteNodeIds.size(); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID excludedId = g.cluster().localNode().id(); + + assertEquals(size, prj.forRemotes().nodes().size()); + + for (ClusterNode node : prj.forRemotes().nodes()) { + UUID id = node.id(); + + assert !id.equals(locNodeId) && remoteNodeIds.contains(id) && !excludedId.equals(id); + } + } + finally { + stopGrid(name); + } + } + + /** + * @throws Exception If test failed. + */ + public void testRemoteProjection() throws Exception { + Collection remoteNodeIds = remoteNodeIds(); + + ClusterGroup remotePrj = projection().forRemotes(); + + Collection prjNodeIds = F.nodeIds(remotePrj.nodes()); + + assert prjNodeIds.size() == remoteNodeIds.size(); + + assert prjNodeIds.containsAll(remoteNodeIds()); + + assert !prjNodeIds.contains(localNodeId()); + + String name = "oneMoreGrid"; + + try { + Ignite g = startGrid(name); + + UUID excludedId = g.cluster().localNode().id(); + + assert !F.nodeIds(remotePrj.nodes()).contains(excludedId); + } + finally { + stopGrid(name); + } + } + + /** + * @throws Exception If test failed. + */ + public void testExecution() throws Exception { + String name = "oneMoreGrid"; + + Collection>> lsnrs = new LinkedList<>(); + + try { + final AtomicInteger cnt = new AtomicInteger(); + + Ignite g = startGrid(name); + + IgnitePredicate lsnr; + + if (!IgniteCluster.class.isAssignableFrom(projection().getClass())) { + g.events().localListen(lsnr = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + assert evt.type() == EVT_JOB_STARTED; + + assert false; + + return true; + } + }, EVT_JOB_STARTED); + + lsnrs.add(F.t(g, lsnr)); + } + + for (ClusterNode node : prj.nodes()) { + g = G.ignite(node.id()); + + g.events().localListen(lsnr = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + assert evt.type() == EVT_JOB_STARTED; + + synchronized (mux) { + cnt.incrementAndGet(); + + mux.notifyAll(); + } + + return true; + } + }, EVT_JOB_STARTED); + + lsnrs.add(F.t(g, lsnr)); + } + + run1(cnt); + run2(cnt); + + call1(cnt); + call2(cnt); + call3(cnt); + call4(cnt); + call5(cnt); + + forkjoin1(cnt); + forkjoin2(cnt); + + exec1(cnt); + exec2(cnt); + + executorService(cnt); + + checkActiveFutures(); + } + finally { + for (IgniteBiTuple> t : lsnrs) + t.get1().events().stopLocalListen(t.get2(), EVT_JOB_STARTED); + + stopGrid(name); + } + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void run1(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + comp.broadcast(runJob); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).broadcast(runJob); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void run2(AtomicInteger cnt) throws Exception { + Collection jobs = F.asList(runJob); + + IgniteCompute comp = compute(prj).withAsync(); + + comp.run(jobs); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).run(jobs); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call1(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + comp.broadcast(calJob); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).broadcast(calJob); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call2(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + Collection> jobs = F.asList(calJob); + + comp.call(jobs); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).call(jobs); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call3(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + comp.apply(clrJob, (String) null); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, (String) null); + + waitForValue(cnt, 1); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call4(AtomicInteger cnt) throws Exception { + Collection args = F.asList("a", "b", "c"); + + IgniteCompute comp = compute(prj).withAsync(); + + comp.apply(clrJob, args); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, args); + + waitForValue(cnt, args.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void call5(AtomicInteger cnt) throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + comp.broadcast(new TestClosure(), "arg"); + + ComputeTaskFuture> fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + Collection res = compute(prj).broadcast(new TestClosure(), "arg"); + + assertEquals(projectionSize(), res.size()); + + waitForValue(cnt, projectionSize()); + + for (String resStr : res) + assertEquals("arg", resStr); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void forkjoin1(AtomicInteger cnt) throws Exception { + Collection args = F.asList("a", "b", "c"); + + IgniteCompute comp = compute(prj).withAsync(); + + comp.apply(clrJob, args, rdc); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).apply(clrJob, args, rdc); + + waitForValue(cnt, args.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void forkjoin2(AtomicInteger cnt) throws Exception { + Collection> jobs = F.asList(calJob); + + IgniteCompute comp = compute(prj).withAsync(); + + comp.call(jobs, rdc); + + ComputeTaskFuture fut = comp.future(); + + waitForExecution(fut); + + cnt.set(0); + + compute(prj).call(jobs, rdc); + + waitForValue(cnt, jobs.size()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void exec1(AtomicInteger cnt) throws Exception { + cnt.set(0); + + compute(prj).execute(TestTask.class.getName(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).execute(new TestTask(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).execute(TestTask.class, null); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void exec2(AtomicInteger cnt) throws Exception { + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class.getName(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(new TestTask(), null); + + waitForValue(cnt, projectionSize()); + + cnt.set(0); + + compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class, null); + + waitForValue(cnt, projectionSize()); + } + + /** + * @param cnt Counter. + * @throws Exception If failed. + */ + private void executorService(AtomicInteger cnt) throws Exception { + cnt.set(0); + + ExecutorService execSrvc = prj.ignite().executorService(prj); + + Future fut = execSrvc.submit(new TestCallable() { + @Override public String call() throws Exception { + return "submit1"; + } + }); + + waitForValue(cnt, 1); + + assertEquals("submit1", fut.get()); + + cnt.set(0); + + fut = execSrvc.submit(new TestRunnable(), "submit2"); + + waitForValue(cnt, 1); + + assertEquals("submit2", fut.get()); + + cnt.set(0); + + Future runFut = execSrvc.submit(new TestRunnable()); + + waitForValue(cnt, 1); + + runFut.get(); + } + + /** + * @param fut Execution future. + * @throws InterruptedException Thrown if wait was interrupted. + */ + @SuppressWarnings({"UnconditionalWait"}) + private void waitForExecution(IgniteFuture fut) throws InterruptedException { + long sleep = 250; + + long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; + + do synchronized (mux) { + mux.wait(sleep); + } + while (fut != null && !fut.isDone() && !fut.isCancelled() && threshold > System.currentTimeMillis()); + + assert fut == null || fut.isDone(); + } + + /** + * @param cnt Counter to check. + * @param val Value to check. + * @throws InterruptedException Thrown if wait was interrupted. + */ + private void waitForValue(AtomicInteger cnt, int val) throws InterruptedException { + assert cnt != null; + assert val > 0; + + long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; + + long time; + + while (threshold > (time = System.currentTimeMillis())) + synchronized (mux) { + if (cnt.get() == val) + break; + + mux.wait(threshold - time); + } + + assert cnt.get() == val; + } + + /** + * @throws Exception If test failed. + */ + private void checkActiveFutures() throws Exception { + IgniteCompute comp = compute(prj).withAsync(); + + assertEquals(0, comp.activeTaskFutures().size()); + + cnt.set(0); + + Collection> futsList = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + comp.call(new TestWaitCallable<>()); + + ComputeTaskFuture fut = comp.future(); + + assertFalse(fut.isDone()); + + Map> futs = comp.activeTaskFutures(); + + assertEquals(i + 1, futs.size()); + + assertTrue(futs.containsKey(fut.getTaskSession().getId())); + + futsList.add(fut); + } + + synchronized (mux) { + cnt.incrementAndGet(); + + mux.notifyAll(); + } + + for (ComputeTaskFuture fut : futsList) + fut.get(); + + assertEquals(0, comp.activeTaskFutures().size()); + } + + /** + * Test closure. + */ + private static class TestClosure implements IgniteClosure { + /** {@inheritDoc} */ + @Override public String apply(String s) { + return s; + } + } + + /** + * Test runnable. + */ + private static class TestRunnable implements IgniteRunnable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } + + /** + * Test callable. + */ + private static class TestCallable implements IgniteCallable { + /** {@inheritDoc} */ + @Nullable @Override public T call() throws Exception { + return null; + } + } + + /** + * Test callable. + */ + private static class TestWaitCallable implements IgniteCallable { + /** {@inheritDoc} */ + @Nullable @Override public T call() throws Exception { + synchronized (mux) { + while (cnt.get() == 0) + mux.wait(); + } + + return null; + } + } + + /** + * Test task. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestTask extends ComputeTaskSplitAdapter { + /** {@inheritDoc} */ + @Override protected Collection split(int gridSize, String arg) { + Collection jobs = new HashSet<>(); + + for (int i = 0; i < gridSize; i++) + jobs.add(new TestJob()); + + return jobs; + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List results) { + return null; + } + } + + /** + * Test job. + */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return null; + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java new file mode 100644 index 0000000..ceb9bef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Test for {@link ClusterGroup}. + */ +@GridCommonTest(group = "Kernal Self") +public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 4; + + /** Projection node IDs. */ + private static Collection ids; + + /** */ + private static Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + assert NODES_CNT > 2; + + ids = new LinkedList<>(); + + try { + for (int i = 0; i < NODES_CNT; i++) { + Ignition.setClientMode(i > 1); + + Ignite g = startGrid(i); + + ids.add(g.cluster().localNode().id()); + + if (i == 0) + ignite = g; + } + } + finally { + Ignition.setClientMode(false); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + for (int i = 0; i < NODES_CNT; i++) + stopGrid(i); + } + + /** {@inheritDoc} */ + @Override protected ClusterGroup projection() { + return grid(0).cluster().forPredicate(F.nodeForNodeIds(ids)); + } + + /** {@inheritDoc} */ + @Override protected UUID localNodeId() { + return grid(0).localNode().id(); + } + + /** + * @throws Exception If failed. + */ + public void testRandom() throws Exception { + assertTrue(ignite.cluster().nodes().contains(ignite.cluster().forRandom().node())); + } + + /** + * @throws Exception If failed. + */ + public void testOldest() throws Exception { + ClusterGroup oldest = ignite.cluster().forOldest(); + + ClusterNode node = null; + + long minOrder = Long.MAX_VALUE; + + for (ClusterNode n : ignite.cluster().nodes()) { + if (n.order() < minOrder) { + node = n; + + minOrder = n.order(); + } + } + + assertEquals(oldest.node(), ignite.cluster().forNode(node).node()); + } + + /** + * @throws Exception If failed. + */ + public void testYoungest() throws Exception { + ClusterGroup youngest = ignite.cluster().forYoungest(); + + ClusterNode node = null; + + long maxOrder = Long.MIN_VALUE; + + for (ClusterNode n : ignite.cluster().nodes()) { + if (n.order() > maxOrder) { + node = n; + + maxOrder = n.order(); + } + } + + assertEquals(youngest.node(), ignite.cluster().forNode(node).node()); + } + + /** + * @throws Exception If failed. + */ + public void testNewNodes() throws Exception { + ClusterGroup youngest = ignite.cluster().forYoungest(); + ClusterGroup oldest = ignite.cluster().forOldest(); + + ClusterNode old = oldest.node(); + ClusterNode last = youngest.node(); + + assertNotNull(last); + + try (Ignite g = startGrid(NODES_CNT)) { + ClusterNode n = g.cluster().localNode(); + + ClusterNode latest = youngest.node(); + + assertNotNull(latest); + assertEquals(latest.id(), n.id()); + assertEquals(oldest.node(), old); + } + } + + /** + * @throws Exception If failed. + */ + public void testForPredicate() throws Exception { + IgnitePredicate evenP = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.order() % 2 == 0; + } + }; + + IgnitePredicate oddP = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.order() % 2 == 1; + } + }; + + ClusterGroup remotes = ignite.cluster().forRemotes(); + + ClusterGroup evenYoungest = remotes.forPredicate(evenP).forYoungest(); + ClusterGroup evenOldest = remotes.forPredicate(evenP).forOldest(); + + ClusterGroup oddYoungest = remotes.forPredicate(oddP).forYoungest(); + ClusterGroup oddOldest = remotes.forPredicate(oddP).forOldest(); + + int clusterSize = ignite.cluster().nodes().size(); + + assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); + assertEquals(grid(1).localNode().id(), evenOldest.node().id()); + + assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); + assertEquals(grid(2).localNode().id(), oddOldest.node().id()); + + try (Ignite g4 = startGrid(NODES_CNT); + Ignite g5 = startGrid(NODES_CNT + 1)) + { + clusterSize = g4.cluster().nodes().size(); + + assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); + assertEquals(grid(1).localNode().id(), evenOldest.node().id()); + + assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); + assertEquals(grid(2).localNode().id(), oddOldest.node().id()); + } + } + + /** + * @throws Exception If failed. + */ + public void testAgeClusterGroupSerialization() throws Exception { + Marshaller marshaller = getConfiguration().getMarshaller(); + + ClusterGroup grp = ignite.cluster().forYoungest(); + ClusterNode node = grp.node(); + + byte[] arr = marshaller.marshal(grp); + + ClusterGroup obj = marshaller.unmarshal(arr, null); + + assertEquals(node.id(), obj.node().id()); + + try (Ignite ignore = startGrid()) { + obj = marshaller.unmarshal(arr, null); + + assertEquals(grp.node().id(), obj.node().id()); + assertFalse(node.id().equals(obj.node().id())); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientServer() throws Exception { + ClusterGroup srv = ignite.cluster().forServers(); + + assertEquals(2, srv.nodes().size()); + assertTrue(srv.nodes().contains(ignite(0).cluster().localNode())); + assertTrue(srv.nodes().contains(ignite(1).cluster().localNode())); + + ClusterGroup cli = ignite.cluster().forClients(); + + assertEquals(2, srv.nodes().size()); + assertTrue(cli.nodes().contains(ignite(2).cluster().localNode())); + assertTrue(cli.nodes().contains(ignite(3).cluster().localNode())); + } + + /** + * @param cnt Count. + * @param even Even. + */ + private static int gridMaxOrder(int cnt, boolean even) { + assert cnt > 2; + + cnt = cnt - (cnt % 2); + + return even ? cnt - 1 : cnt - 2; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java deleted file mode 100644 index cb1341c..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.EventType.*; - -/** - * Abstract test for {@link org.apache.ignite.cluster.ClusterGroup} - */ -@SuppressWarnings("deprecation") -public abstract class GridProjectionAbstractTest extends GridCommonAbstractTest implements Externalizable { - /** VM ip finder for TCP discovery. */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Waiting timeout. */ - private static final int WAIT_TIMEOUT = 30000; - - /** Utility static variable. */ - private static final AtomicInteger cnt = new AtomicInteger(0); - - /** Mutex. */ - private static final Object mux = new Object(); - - /** Projection. */ - private ClusterGroup prj; - - /** Runnable job. */ - private IgniteRunnable runJob = new TestRunnable(); - - /** Callable job. */ - private IgniteCallable calJob = new TestCallable<>(); - - /** Closure job. */ - private IgniteClosure clrJob = new IgniteClosure() { - @Override public String apply(String s) { - return s; - } - - @Override public String toString() { - return "clrJob"; - } - }; - - /** Reducer. */ - private IgniteReducer rdc = new IgniteReducer() { - @Override public boolean collect(String e) { - return true; - } - - @Nullable @Override public Object reduce() { - return null; - } - - @Override public String toString() { - return "rdc"; - } - }; - - /** */ - protected GridProjectionAbstractTest() { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setDiscoverySpi(new TcpDiscoverySpi().setForceServerMode(true).setIpFinder(ipFinder)); - - return cfg; - } - - /** - * @param startGrid Start grid flag. - */ - protected GridProjectionAbstractTest(boolean startGrid) { - super(startGrid); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - prj = projection(); - - cnt.set(0); - } - - /** - * @return Projection. - */ - protected abstract ClusterGroup projection(); - - /** - * @return Local node ID. - */ - @Nullable protected abstract UUID localNodeId(); - - /** - * @return Remote nodes IDs. - */ - protected Collection remoteNodeIds() { - return F.nodeIds(projection().forRemotes().nodes()); - } - - /** - * @return Projection size. - */ - private int projectionSize() { - int size = localNodeId() != null ? 1 : 0; - - size += remoteNodeIds().size(); - - assert size > 0; - - return size; - } - - /** - * @return Collection of projection node IDs. - */ - private Collection projectionNodeIds() { - Collection ids = new LinkedList<>(); - - UUID id = localNodeId(); - - if (id != null) - ids.add(id); - - ids.addAll(remoteNodeIds()); - - assert !ids.isEmpty(); - - return ids; - } - - /** - * Test for projection on not existing node IDs. - */ - public void testInvalidProjection() { - Collection ids = new HashSet<>(); - - ids.add(UUID.randomUUID()); - ids.add(UUID.randomUUID()); - - ClusterGroup invalidPrj = prj.forNodeIds(ids); - - assertEquals(0, invalidPrj.nodes().size()); - } - - /** - * @throws Exception If test failed. - */ - public void testProjection() throws Exception { - assert prj != null; - - assert prj.ignite() != null; - assert prj.predicate() != null; - - int size = projectionSize(); - - assert prj.nodes().size() == size; - - Collection nodeIds = projectionNodeIds(); - - for (ClusterNode node : prj.nodes()) - assert nodeIds.contains(node.id()); - } - - /** - * @throws Exception If test failed. - */ - public void testRemoteNodes() throws Exception { - Collection remoteNodeIds = remoteNodeIds(); - - UUID locNodeId = localNodeId(); - - int size = remoteNodeIds.size(); - - String name = "oneMoreGrid"; - - try { - Ignite g = startGrid(name); - - UUID excludedId = g.cluster().localNode().id(); - - assertEquals(size, prj.forRemotes().nodes().size()); - - for (ClusterNode node : prj.forRemotes().nodes()) { - UUID id = node.id(); - - assert !id.equals(locNodeId) && remoteNodeIds.contains(id) && !excludedId.equals(id); - } - } - finally { - stopGrid(name); - } - } - - /** - * @throws Exception If test failed. - */ - public void testRemoteProjection() throws Exception { - Collection remoteNodeIds = remoteNodeIds(); - - ClusterGroup remotePrj = projection().forRemotes(); - - Collection prjNodeIds = F.nodeIds(remotePrj.nodes()); - - assert prjNodeIds.size() == remoteNodeIds.size(); - - assert prjNodeIds.containsAll(remoteNodeIds()); - - assert !prjNodeIds.contains(localNodeId()); - - String name = "oneMoreGrid"; - - try { - Ignite g = startGrid(name); - - UUID excludedId = g.cluster().localNode().id(); - - assert !F.nodeIds(remotePrj.nodes()).contains(excludedId); - } - finally { - stopGrid(name); - } - } - - /** - * @throws Exception If test failed. - */ - public void testExecution() throws Exception { - String name = "oneMoreGrid"; - - Collection>> lsnrs = new LinkedList<>(); - - try { - final AtomicInteger cnt = new AtomicInteger(); - - Ignite g = startGrid(name); - - IgnitePredicate lsnr; - - if (!IgniteCluster.class.isAssignableFrom(projection().getClass())) { - g.events().localListen(lsnr = new IgnitePredicate() { - @Override public boolean apply(Event evt) { - assert evt.type() == EVT_JOB_STARTED; - - assert false; - - return true; - } - }, EVT_JOB_STARTED); - - lsnrs.add(F.t(g, lsnr)); - } - - for (ClusterNode node : prj.nodes()) { - g = G.ignite(node.id()); - - g.events().localListen(lsnr = new IgnitePredicate() { - @Override public boolean apply(Event evt) { - assert evt.type() == EVT_JOB_STARTED; - - synchronized (mux) { - cnt.incrementAndGet(); - - mux.notifyAll(); - } - - return true; - } - }, EVT_JOB_STARTED); - - lsnrs.add(F.t(g, lsnr)); - } - - run1(cnt); - run2(cnt); - - call1(cnt); - call2(cnt); - call3(cnt); - call4(cnt); - call5(cnt); - - forkjoin1(cnt); - forkjoin2(cnt); - - exec1(cnt); - exec2(cnt); - - executorService(cnt); - - checkActiveFutures(); - } - finally { - for (IgniteBiTuple> t : lsnrs) - t.get1().events().stopLocalListen(t.get2(), EVT_JOB_STARTED); - - stopGrid(name); - } - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void run1(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(runJob); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).broadcast(runJob); - - waitForValue(cnt, projectionSize()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void run2(AtomicInteger cnt) throws Exception { - Collection jobs = F.asList(runJob); - - IgniteCompute comp = compute(prj).withAsync(); - - comp.run(jobs); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).run(jobs); - - waitForValue(cnt, jobs.size()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void call1(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(calJob); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).broadcast(calJob); - - waitForValue(cnt, projectionSize()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void call2(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - Collection> jobs = F.asList(calJob); - - comp.call(jobs); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).call(jobs); - - waitForValue(cnt, jobs.size()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void call3(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, (String) null); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).apply(clrJob, (String) null); - - waitForValue(cnt, 1); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void call4(AtomicInteger cnt) throws Exception { - Collection args = F.asList("a", "b", "c"); - - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, args); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).apply(clrJob, args); - - waitForValue(cnt, args.size()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void call5(AtomicInteger cnt) throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - comp.broadcast(new TestClosure(), "arg"); - - ComputeTaskFuture> fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - Collection res = compute(prj).broadcast(new TestClosure(), "arg"); - - assertEquals(projectionSize(), res.size()); - - waitForValue(cnt, projectionSize()); - - for (String resStr : res) - assertEquals("arg", resStr); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void forkjoin1(AtomicInteger cnt) throws Exception { - Collection args = F.asList("a", "b", "c"); - - IgniteCompute comp = compute(prj).withAsync(); - - comp.apply(clrJob, args, rdc); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).apply(clrJob, args, rdc); - - waitForValue(cnt, args.size()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void forkjoin2(AtomicInteger cnt) throws Exception { - Collection> jobs = F.asList(calJob); - - IgniteCompute comp = compute(prj).withAsync(); - - comp.call(jobs, rdc); - - ComputeTaskFuture fut = comp.future(); - - waitForExecution(fut); - - cnt.set(0); - - compute(prj).call(jobs, rdc); - - waitForValue(cnt, jobs.size()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void exec1(AtomicInteger cnt) throws Exception { - cnt.set(0); - - compute(prj).execute(TestTask.class.getName(), null); - - waitForValue(cnt, projectionSize()); - - cnt.set(0); - - compute(prj).execute(new TestTask(), null); - - waitForValue(cnt, projectionSize()); - - cnt.set(0); - - compute(prj).execute(TestTask.class, null); - - waitForValue(cnt, projectionSize()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void exec2(AtomicInteger cnt) throws Exception { - cnt.set(0); - - compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class.getName(), null); - - waitForValue(cnt, projectionSize()); - - cnt.set(0); - - compute(prj).withTimeout(WAIT_TIMEOUT).execute(new TestTask(), null); - - waitForValue(cnt, projectionSize()); - - cnt.set(0); - - compute(prj).withTimeout(WAIT_TIMEOUT).execute(TestTask.class, null); - - waitForValue(cnt, projectionSize()); - } - - /** - * @param cnt Counter. - * @throws Exception If failed. - */ - private void executorService(AtomicInteger cnt) throws Exception { - cnt.set(0); - - ExecutorService execSrvc = prj.ignite().executorService(prj); - - Future fut = execSrvc.submit(new TestCallable() { - @Override public String call() throws Exception { - return "submit1"; - } - }); - - waitForValue(cnt, 1); - - assertEquals("submit1", fut.get()); - - cnt.set(0); - - fut = execSrvc.submit(new TestRunnable(), "submit2"); - - waitForValue(cnt, 1); - - assertEquals("submit2", fut.get()); - - cnt.set(0); - - Future runFut = execSrvc.submit(new TestRunnable()); - - waitForValue(cnt, 1); - - runFut.get(); - } - - /** - * @param fut Execution future. - * @throws InterruptedException Thrown if wait was interrupted. - */ - @SuppressWarnings({"UnconditionalWait"}) - private void waitForExecution(IgniteFuture fut) throws InterruptedException { - long sleep = 250; - - long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; - - do synchronized (mux) { - mux.wait(sleep); - } - while (fut != null && !fut.isDone() && !fut.isCancelled() && threshold > System.currentTimeMillis()); - - assert fut == null || fut.isDone(); - } - - /** - * @param cnt Counter to check. - * @param val Value to check. - * @throws InterruptedException Thrown if wait was interrupted. - */ - private void waitForValue(AtomicInteger cnt, int val) throws InterruptedException { - assert cnt != null; - assert val > 0; - - long threshold = System.currentTimeMillis() + WAIT_TIMEOUT; - - long time; - - while (threshold > (time = System.currentTimeMillis())) - synchronized (mux) { - if (cnt.get() == val) - break; - - mux.wait(threshold - time); - } - - assert cnt.get() == val; - } - - /** - * @throws Exception If test failed. - */ - private void checkActiveFutures() throws Exception { - IgniteCompute comp = compute(prj).withAsync(); - - assertEquals(0, comp.activeTaskFutures().size()); - - cnt.set(0); - - Collection> futsList = new ArrayList<>(); - - for (int i = 0; i < 10; i++) { - comp.call(new TestWaitCallable<>()); - - ComputeTaskFuture fut = comp.future(); - - assertFalse(fut.isDone()); - - Map> futs = comp.activeTaskFutures(); - - assertEquals(i + 1, futs.size()); - - assertTrue(futs.containsKey(fut.getTaskSession().getId())); - - futsList.add(fut); - } - - synchronized (mux) { - cnt.incrementAndGet(); - - mux.notifyAll(); - } - - for (ComputeTaskFuture fut : futsList) - fut.get(); - - assertEquals(0, comp.activeTaskFutures().size()); - } - - /** - * Test closure. - */ - private static class TestClosure implements IgniteClosure { - /** {@inheritDoc} */ - @Override public String apply(String s) { - return s; - } - } - - /** - * Test runnable. - */ - private static class TestRunnable implements IgniteRunnable { - /** {@inheritDoc} */ - @Override public void run() { - // No-op. - } - } - - /** - * Test callable. - */ - private static class TestCallable implements IgniteCallable { - /** {@inheritDoc} */ - @Nullable @Override public T call() throws Exception { - return null; - } - } - - /** - * Test callable. - */ - private static class TestWaitCallable implements IgniteCallable { - /** {@inheritDoc} */ - @Nullable @Override public T call() throws Exception { - synchronized (mux) { - while (cnt.get() == 0) - mux.wait(); - } - - return null; - } - } - - /** - * Test task. - */ - @SuppressWarnings({"PublicInnerClass"}) - public static class TestTask extends ComputeTaskSplitAdapter { - /** {@inheritDoc} */ - @Override protected Collection split(int gridSize, String arg) { - Collection jobs = new HashSet<>(); - - for (int i = 0; i < gridSize; i++) - jobs.add(new TestJob()); - - return jobs; - } - - /** {@inheritDoc} */ - @Nullable @Override public Void reduce(List results) { - return null; - } - } - - /** - * Test job. - */ - @SuppressWarnings({"PublicInnerClass"}) - public static class TestJob extends ComputeJobAdapter { - /** {@inheritDoc} */ - @Nullable @Override public Object execute() { - return null; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java deleted file mode 100644 index 9fbad80..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionSelfTest.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; - -/** - * Test for {@link ClusterGroup}. - */ -@GridCommonTest(group = "Kernal Self") -public class GridProjectionSelfTest extends GridProjectionAbstractTest { - /** Nodes count. */ - private static final int NODES_CNT = 4; - - /** Projection node IDs. */ - private static Collection ids; - - /** */ - private static Ignite ignite; - - /** {@inheritDoc} */ - @SuppressWarnings({"ConstantConditions"}) - @Override protected void beforeTestsStarted() throws Exception { - assert NODES_CNT > 2; - - ids = new LinkedList<>(); - - try { - for (int i = 0; i < NODES_CNT; i++) { - Ignition.setClientMode(i > 1); - - Ignite g = startGrid(i); - - ids.add(g.cluster().localNode().id()); - - if (i == 0) - ignite = g; - } - } - finally { - Ignition.setClientMode(false); - } - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - for (int i = 0; i < NODES_CNT; i++) - stopGrid(i); - } - - /** {@inheritDoc} */ - @Override protected ClusterGroup projection() { - return grid(0).cluster().forPredicate(F.nodeForNodeIds(ids)); - } - - /** {@inheritDoc} */ - @Override protected UUID localNodeId() { - return grid(0).localNode().id(); - } - - /** - * @throws Exception If failed. - */ - public void testRandom() throws Exception { - assertTrue(ignite.cluster().nodes().contains(ignite.cluster().forRandom().node())); - } - - /** - * @throws Exception If failed. - */ - public void testOldest() throws Exception { - ClusterGroup oldest = ignite.cluster().forOldest(); - - ClusterNode node = null; - - long minOrder = Long.MAX_VALUE; - - for (ClusterNode n : ignite.cluster().nodes()) { - if (n.order() < minOrder) { - node = n; - - minOrder = n.order(); - } - } - - assertEquals(oldest.node(), ignite.cluster().forNode(node).node()); - } - - /** - * @throws Exception If failed. - */ - public void testYoungest() throws Exception { - ClusterGroup youngest = ignite.cluster().forYoungest(); - - ClusterNode node = null; - - long maxOrder = Long.MIN_VALUE; - - for (ClusterNode n : ignite.cluster().nodes()) { - if (n.order() > maxOrder) { - node = n; - - maxOrder = n.order(); - } - } - - assertEquals(youngest.node(), ignite.cluster().forNode(node).node()); - } - - /** - * @throws Exception If failed. - */ - public void testNewNodes() throws Exception { - ClusterGroup youngest = ignite.cluster().forYoungest(); - ClusterGroup oldest = ignite.cluster().forOldest(); - - ClusterNode old = oldest.node(); - ClusterNode last = youngest.node(); - - assertNotNull(last); - - try (Ignite g = startGrid(NODES_CNT)) { - ClusterNode n = g.cluster().localNode(); - - ClusterNode latest = youngest.node(); - - assertNotNull(latest); - assertEquals(latest.id(), n.id()); - assertEquals(oldest.node(), old); - } - } - - /** - * @throws Exception If failed. - */ - public void testForPredicate() throws Exception { - IgnitePredicate evenP = new IgnitePredicate() { - @Override public boolean apply(ClusterNode node) { - return node.order() % 2 == 0; - } - }; - - IgnitePredicate oddP = new IgnitePredicate() { - @Override public boolean apply(ClusterNode node) { - return node.order() % 2 == 1; - } - }; - - ClusterGroup remotes = ignite.cluster().forRemotes(); - - ClusterGroup evenYoungest = remotes.forPredicate(evenP).forYoungest(); - ClusterGroup evenOldest = remotes.forPredicate(evenP).forOldest(); - - ClusterGroup oddYoungest = remotes.forPredicate(oddP).forYoungest(); - ClusterGroup oddOldest = remotes.forPredicate(oddP).forOldest(); - - int clusterSize = ignite.cluster().nodes().size(); - - assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); - assertEquals(grid(1).localNode().id(), evenOldest.node().id()); - - assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); - assertEquals(grid(2).localNode().id(), oddOldest.node().id()); - - try (Ignite g4 = startGrid(NODES_CNT); - Ignite g5 = startGrid(NODES_CNT + 1)) - { - clusterSize = g4.cluster().nodes().size(); - - assertEquals(grid(gridMaxOrder(clusterSize, true)).localNode().id(), evenYoungest.node().id()); - assertEquals(grid(1).localNode().id(), evenOldest.node().id()); - - assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id()); - assertEquals(grid(2).localNode().id(), oddOldest.node().id()); - } - } - - /** - * @throws Exception If failed. - */ - public void testAgeClusterGroupSerialization() throws Exception { - Marshaller marshaller = getConfiguration().getMarshaller(); - - ClusterGroup grp = ignite.cluster().forYoungest(); - ClusterNode node = grp.node(); - - byte[] arr = marshaller.marshal(grp); - - ClusterGroup obj = marshaller.unmarshal(arr, null); - - assertEquals(node.id(), obj.node().id()); - - try (Ignite ignore = startGrid()) { - obj = marshaller.unmarshal(arr, null); - - assertEquals(grp.node().id(), obj.node().id()); - assertFalse(node.id().equals(obj.node().id())); - } - } - - /** - * @throws Exception If failed. - */ - public void testClientServer() throws Exception { - ClusterGroup srv = ignite.cluster().forServers(); - - assertEquals(2, srv.nodes().size()); - assertTrue(srv.nodes().contains(ignite(0).cluster().localNode())); - assertTrue(srv.nodes().contains(ignite(1).cluster().localNode())); - - ClusterGroup cli = ignite.cluster().forClients(); - - assertEquals(2, srv.nodes().size()); - assertTrue(cli.nodes().contains(ignite(2).cluster().localNode())); - assertTrue(cli.nodes().contains(ignite(3).cluster().localNode())); - } - - /** - * @param cnt Count. - * @param even Even. - */ - private static int gridMaxOrder(int cnt, boolean even) { - assert cnt > 2; - - cnt = cnt - (cnt % 2); - - return even ? cnt - 1 : cnt - 2; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java index 2de04b0..3ec3278 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.*; * Test for {@link org.apache.ignite.IgniteCluster}. */ @GridCommonTest(group = "Kernal Self") -public class GridSelfTest extends GridProjectionAbstractTest { +public class GridSelfTest extends ClusterGroupAbstractTest { /** Nodes count. */ private static final int NODES_CNT = 4; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6c3d97f8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 2d14728..3da198c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -65,7 +65,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(IgniteStreamSelfTestSuite.suite()); suite.addTest(new TestSuite(GridSelfTest.class)); - GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests); suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests);