ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] incubator-ignite git commit: # ignite-981 fixed wait for cache initialization on clients
Date Thu, 04 Jun 2015 12:07:24 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-5 a03d111f2 -> 20e567773


# ignite-981 fixed wait for cache initialization on clients


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f

Branch: refs/heads/ignite-sprint-5
Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f
Parents: 1603fe5
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jun 4 09:25:42 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jun 4 11:50:36 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 117 +++++++------
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../IgniteMessagingWithClientTest.java          | 164 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6e8d457..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            Object msgBody = ioMsg.body();
-
-            assert msgBody != null || ioMsg.bodyBytes() != null;
+            busyLock.readLock();
 
             try {
-                byte[] msgTopicBytes = ioMsg.topicBytes();
-
-                Object msgTopic = ioMsg.topic();
-
-                GridDeployment dep = ioMsg.deployment();
-
-                if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
-                    ioMsg.deploymentClassName() != null) {
-                    dep = ctx.deploy().getGlobalDeployment(
-                        ioMsg.deploymentMode(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.deploymentClassName(),
-                        ioMsg.userVersion(),
-                        nodeId,
-                        ioMsg.classLoaderId(),
-                        ioMsg.loaderParticipants(),
-                        null);
-
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException(
-                            "Failed to obtain deployment information for user message. "
+
-                            "If you are using custom message or topic class, try implementing
" +
-                            "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
-                    ioMsg.deployment(dep); // Cache deployment.
+                if (stopping) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received user message while stopping (will ignore) [nodeId="
+
+                            nodeId + ", msg=" + msg + ']');
+
+                    return;
                 }
 
-                // Unmarshall message topic if needed.
-                if (msgTopic == null && msgTopicBytes != null) {
-                    msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader()
: null);
+                Object msgBody = ioMsg.body();
 
-                    ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
-                }
+                assert msgBody != null || ioMsg.bodyBytes() != null;
 
-                if (!F.eq(topic, msgTopic))
-                    return;
+                try {
+                    byte[] msgTopicBytes = ioMsg.topicBytes();
+
+                    Object msgTopic = ioMsg.topic();
+
+                    GridDeployment dep = ioMsg.deployment();
+
+                    if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+                        ioMsg.deploymentClassName() != null) {
+                        dep = ctx.deploy().getGlobalDeployment(
+                            ioMsg.deploymentMode(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.deploymentClassName(),
+                            ioMsg.userVersion(),
+                            nodeId,
+                            ioMsg.classLoaderId(),
+                            ioMsg.loaderParticipants(),
+                            null);
+
+                        if (dep == null)
+                            throw new IgniteDeploymentCheckedException(
+                                "Failed to obtain deployment information for user message.
" +
+                                    "If you are using custom message or topic class, try
implementing " +
+                                    "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+                        ioMsg.deployment(dep); // Cache deployment.
+                    }
 
-                if (msgBody == null) {
-                    msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader()
: null);
+                    // Unmarshall message topic if needed.
+                    if (msgTopic == null && msgTopicBytes != null) {
+                        msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader()
: null);
 
-                    ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
-                }
+                        ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+                    }
 
-                // Resource injection.
-                if (dep != null)
-                    ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()),
msgBody);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message="
+
-                    msg + ']', e);
-            }
+                    if (!F.eq(topic, msgTopic))
+                        return;
+
+                    if (msgBody == null) {
+                        msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader()
: null);
+
+                        ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+                    }
 
-            if (msgBody != null) {
-                if (predLsnr != null) {
-                    if (!predLsnr.apply(nodeId, msgBody))
-                        removeMessageListener(TOPIC_COMM_USER, this);
+                    // Resource injection.
+                    if (dep != null)
+                        ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()),
msgBody);
                 }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal user message [node=" + nodeId + ",
message=" +
+                        msg + ']', e);
+                }
+
+                if (msgBody != null) {
+                    if (predLsnr != null) {
+                        if (!predLsnr.apply(nodeId, msgBody))
+                            removeMessageListener(TOPIC_COMM_USER, this);
+                    }
+                }
+            }
+            finally {
+                busyLock.readUnlock();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1aef18c..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
new file mode 100644
index 0000000..855a4f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable
{
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Message topic. */
+    private enum TOPIC {
+        /** */
+        ORDERED
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new OptimizedMarshaller(false));
+
+        if (gridName.equals(getTestGridName(2))) {
+            cfg.setClientMode(true);
+
+            ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        }
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageSendWithClientJoin() throws Exception {
+        startGrid(0);
+
+        Ignite ignite1 = startGrid(1);
+
+        ClusterGroup rmts = ignite1.cluster().forRemotes();
+
+        IgniteMessaging msg = ignite1.message(rmts);
+
+        msg.localListen(TOPIC.ORDERED, new LocalListener());
+
+        msg.remoteListen(TOPIC.ORDERED, new RemoteListener());
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>()
{
+            @Override public Object call() throws Exception {
+                int iter = 0;
+
+                while (!stop.get()) {
+                    if (iter % 10 == 0)
+                        log.info("Client start/stop iteration: " + iter);
+
+                    iter++;
+
+                    try (Ignite ignite = startGrid(2)) {
+                        assertTrue(ignite.configuration().isClientMode());
+                    }
+                }
+
+                return null;
+            }
+        }, 1, "client-start-stop");
+
+        try {
+            long stopTime = U.currentTimeMillis() + 30_000;
+
+            int iter = 0;
+
+            while (System.currentTimeMillis() < stopTime) {
+                try {
+                    ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter),
0);
+                }
+                catch (IgniteException e) {
+                    log.info("Message send failed: " + e);
+                }
+
+                iter++;
+
+                if (iter % 100 == 0)
+                    Thread.sleep(5);
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        fut.get();
+    }
+
+    /**
+     *
+     */
+    private static class LocalListener implements IgniteBiPredicate<UUID, String> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, String s) {
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class RemoteListener implements IgniteBiPredicate<UUID, String>
{
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID nodeId, String msg) {
+            ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9eb31f1..e0a1e6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridSelfTest.class));
         suite.addTest(new TestSuite(GridProjectionSelfTest.class));
         suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+        suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
         suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
 
         if (U.isLinux() || U.isMacOs())


Mime
View raw message