ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject incubator-ignite git commit: # IGNITE-499 Implement Client communication.
Date Mon, 20 Apr 2015 20:05:29 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-499_1 4ad742b8f -> b161f1c45


# IGNITE-499 Implement Client communication.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b161f1c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b161f1c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b161f1c4

Branch: refs/heads/ignite-499_1
Commit: b161f1c45e7be24d8e0d3e8c036b5c8391c23ccd
Parents: 4ad742b
Author: sevdokimov <sergey.evdokimov@jetbrains.com>
Authored: Mon Apr 20 23:05:20 2015 +0300
Committer: sevdokimov <sergey.evdokimov@jetbrains.com>
Committed: Mon Apr 20 23:05:20 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   5 +
 .../tcp/TcpClientMessageWrapper.java            | 176 +++++++++++++++++++
 .../communication/tcp/TcpCommunicationSpi.java  |  83 ++++++++-
 .../tcp/ClientTcpCommunicationSelfTest.java     | 113 ++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   2 +
 5 files changed, 377 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b82147b..d64ff07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -595,6 +595,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 112:
+                msg = new TcpClientMessageWrapper();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java
new file mode 100644
index 0000000..66d23dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class TcpClientMessageWrapper implements Message {
+    /** */
+    private Message msg;
+
+    /** */
+    private UUID dest;
+
+    /** */
+    private UUID snd;
+
+    /**
+     * Empty constructor required by {@link Message}.
+     */
+    public TcpClientMessageWrapper() {
+        // No-op.
+    }
+
+    /**
+     * @param msg Message.
+     * @param dest Destination.
+     */
+    public TcpClientMessageWrapper(Message msg, UUID snd, UUID dest) {
+        this.msg = msg;
+        this.snd = snd;
+        this.dest = dest;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("msg", msg))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeUuid("sender", snd))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeUuid("dest", dest))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // Field order and names mirror writeTo(); cases fall through once a field is fully read.
+        switch (reader.state()) {
+            case 0: // Wrapped message.
+                msg = reader.readMessage("msg");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1: // Sender node ID (written as "sender" by writeTo()).
+                snd = reader.readUuid("sender");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2: // Destination node ID.
+                dest = reader.readUuid("dest");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 112;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /**
+     * @return Message.
+     */
+    public Message message() {
+        return msg;
+    }
+
+    /**
+     * @param msg New message.
+     */
+    public void message(Message msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * @return Destination.
+     */
+    public UUID destination() {
+        return dest;
+    }
+
+    /**
+     * @param dest New destination.
+     */
+    public void destination(UUID dest) {
+        this.dest = dest;
+    }
+
+    /**
+     * @return Source.
+     */
+    public UUID sender() {
+        return snd;
+    }
+
+    /**
+     * @param snd New sender.
+     */
+    public void sender(UUID snd) {
+        this.snd = snd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 36bd03e..44c6b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -34,6 +34,7 @@ import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -150,6 +151,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>).
*/
     public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs";
 
+    /** Node attribute indicating that all messages should be sent through the router node
+     * (value is <tt>comm.tcp.connect.through.router</tt>).
+     * @see org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpi
+     */
+    public static final String ATTR_CONNECT_TO_ROUTER_ONLY = "comm.tcp.connect.through.router";
+
     /** Default port which node sets listener to (value is <tt>47100</tt>). */
     public static final int DFLT_PORT = 47100;
 
@@ -322,6 +329,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Local node ID message. */
     private NodeIdMessage nodeIdMsg;
 
+    /** All messages should be sent through the router node, not directly. */
+    private boolean connectToRouterOnly;
+
     /** Received messages count. */
     private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
 
@@ -864,11 +874,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
                 U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
boundTcpPort);
 
-            return F.asMap(
+            Map<String, Object> res = F.asMap(
                 createSpiAttributeName(ATTR_ADDRS), addrs.get1(),
                 createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
                 createSpiAttributeName(ATTR_PORT), boundTcpPort,
                 createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+
+            if (connectToRouterOnly)
+                res.put(createSpiAttributeName(ATTR_CONNECT_TO_ROUTER_ONLY), connectToRouterOnly);
+
+            return res;
         }
         catch (IOException | IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to resolve local host to addresses: " +
locHost, e);
@@ -1180,6 +1195,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (node.id().equals(locNodeId))
             notifyListener(locNodeId, msg, NOOP);
         else {
+            ClusterNode locNode = getSpiContext().localNode();
+
+            if (connectToRouterOnly) {
+                if (locNode instanceof TcpDiscoveryNode) {
+                    UUID routerNodeId = ((TcpDiscoveryNode)locNode).clientRouterNodeId();
+
+                    if (routerNodeId != null && !routerNodeId.equals(node.id()))
{
+                        ClusterNode routerNode = getSpiContext().node(routerNodeId);
+
+                        if (routerNode != null) {
+                            sendMessage(routerNode, new TcpClientMessageWrapper(msg, locNodeId,
node.id()));
+
+                            return;
+                        }
+                    }
+                }
+            }
+
+            if (Boolean.TRUE.equals(node.<Boolean>attribute(ATTR_CONNECT_TO_ROUTER_ONLY)))
{
+                UUID routerNodeId = ((TcpDiscoveryNode)node).clientRouterNodeId();
+
+                if (routerNodeId != null) {
+                    ClusterNode routerNode = getSpiContext().node(routerNodeId);
+
+                    if (routerNode != null) {
+                        if (msg instanceof TcpClientMessageWrapper)
+                            sendMessage(routerNode, msg);
+                        else
+                            sendMessage(routerNode, new TcpClientMessageWrapper(msg, locNodeId,
node.id()));
+
+                        return;
+                    }
+                }
+            }
+
             GridCommunicationClient client = null;
 
             try {
@@ -1190,7 +1240,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     UUID nodeId = null;
 
-                    if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+                    if (!client.async() && !locNode.version().equals(node.version()))
                         nodeId = node.id();
 
                     retry = client.sendMessage(nodeId, msg);
@@ -1728,6 +1778,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         getExceptionRegistry().onException(msg, e);
     }
 
+    /**
+     * @return {@code true} if all messages are sent through the router node.
+     */
+    public boolean useRouter() {
+        return connectToRouterOnly;
+    }
+
+    /**
+     * @param useRouter {@code true} to send all messages through the router node.
+     */
+    public void useRouter(boolean useRouter) {
+        this.connectToRouterOnly = useRouter;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpCommunicationSpi.class, this);
@@ -2771,6 +2835,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else
                     c = NOOP;
 
+                if (msg instanceof TcpClientMessageWrapper) {
+                    TcpClientMessageWrapper clientMsg = (TcpClientMessageWrapper)msg;
+
+                    if (getLocalNodeId().equals(clientMsg.destination()))
+                        notifyListener(clientMsg.sender(), clientMsg.message(), c);
+                    else {
+                        ClusterNode destNode = getSpiContext().node(clientMsg.destination());
+
+                        if (destNode != null)
+                            sendMessage(destNode, clientMsg);
+                    }
+
+                    return;
+                }
+
                 notifyListener(sndId, msg, c);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java
new file mode 100644
index 0000000..794d846
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.testframework.junits.spi.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class ClientTcpCommunicationSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (client) {
+            TcpCommunicationSpi spi = new TcpCommunicationSpi();
+            spi.useRouter(true);
+
+            cfg.setCommunicationSpi(spi);
+
+            TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+            String addr = new ArrayList<>(ipFinder.getRegisteredAddresses()).iterator().next().toString();
+
+            if (addr.startsWith("/"))
+                addr = addr.substring(1);
+
+            clientIpFinder.setAddresses(Arrays.asList(addr));
+
+            TcpClientDiscoverySpi discoSpi = new TcpClientDiscoverySpi();
+
+            discoSpi.setIpFinder(clientIpFinder);
+
+            cfg.setDiscoverySpi(discoSpi);
+
+            cfg.setClientMode(true);
+        }
+        else {
+            TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+
+            discoverySpi.setIpFinder(ipFinder);
+
+            cfg.setDiscoverySpi(discoverySpi);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientCommunication() throws Exception {
+        startGrid(0);
+        startGrid(1);
+
+        client = true;
+
+        startGrid(2);
+        startGrid(3);
+
+        Collection<UUID> uuids = ignite(1).compute().broadcast(new IdCollector());
+
+        for (int i = 0; i < 4; i++)
+            assert uuids.contains(ignite(i).cluster().localNode().id());
+    }
+
+    /**
+     *
+     */
+    private static class IdCollector implements IgniteCallable<UUID> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public UUID call() throws Exception {
+            return ignite.cluster().localNode().id();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1d3bfcd..d509ffe 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -45,6 +45,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
 
+        suite.addTest(new TestSuite(ClientTcpCommunicationSelfTest.class));
+
         return suite;
     }
 }


Mime
View raw message