ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [38/50] [abbrv] ignite git commit: io optimizations
Date Fri, 21 Oct 2016 15:55:05 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index fc4f5b1..1dd270b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -266,7 +266,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_PORT = 47100;
 
     /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>).
*/
-    public static final int DFLT_SHMEM_PORT = 48100;
+    public static final int DFLT_SHMEM_PORT = -1;
 
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
@@ -288,9 +288,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Default count of selectors for TCP server equals to
-     * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}.
+     * {@code "Runtime.getRuntime().availableProcessors()"}.
      */
-    public static final int DFLT_SELECTORS_CNT = Math.min(8, Runtime.getRuntime().availableProcessors());
+    public static final int DFLT_SELECTORS_CNT = Runtime.getRuntime().availableProcessors();
 
     /** Connection index meta for session. */
     private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -358,7 +358,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Sending local node ID to newly accepted session: " + ses);
 
-                    ses.send(nodeIdMessage());
+                    try {
+                        ses.sendNoFuture(nodeIdMessage());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
+                    }
                 }
             }
 
@@ -636,7 +641,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                         ", rcvCnt=" + rcvCnt + ']');
                                 }
 
-                                nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt));
+                                ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
 
                                 recovery.lastAcknowledged(rcvCnt);
                             }
@@ -688,8 +693,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 nioSrvr.resend(ses);
 
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                try {
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
 
                 recovery.onConnected();
 
@@ -713,12 +723,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridNioRecoveryDescriptor recovery,
                 GridNioSession ses,
                 boolean sndRes) {
-                ses.inRecoveryDescriptor(recovery);
+                try {
+                    ses.inRecoveryDescriptor(recovery);
 
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
 
-                recovery.onConnected();
+                    recovery.onConnected();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
             }
 
             /**
@@ -755,30 +770,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
-                    failed = !success;
+                    try {
+                        failed = !success;
 
-                    if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>()
{
-                            @Override public void apply(IgniteInternalFuture<?> msgFut)
{
-                                try {
-                                    msgFut.get();
+                        if (success) {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?>
msgFut) {
+                                    try {
+                                        msgFut.get();
 
-                                    connectedNew(recoveryDesc, ses, false);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+                                        connectedNew(recoveryDesc, ses, false);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake
" +
+                                                    "[rmtNode=" + rmtNode.id() + ", err="
+ e + ']');
 
-                                    recoveryDesc.release();
+                                        recoveryDesc.release();
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                        }
+                        else
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
                     }
-                    else
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
                 }
             }
 
@@ -839,32 +859,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
                     if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>()
{
-                            @Override public void apply(IgniteInternalFuture<?> msgFut)
{
-                                try {
-                                    msgFut.get();
+                        try {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?>
msgFut) {
+                                    try {
+                                        msgFut.get();
 
-                                    GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, msg.received(),
false, createClient);
+                                        GridTcpNioCommunicationClient client =
+                                                connected(recoveryDesc, ses, rmtNode, msg.received(),
false, createClient);
 
-                                    fut.onDone(client);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+                                        fut.onDone(client);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake
" +
+                                                    "[rmtNode=" + rmtNode.id() + ", err="
+ e + ']');
 
-                                    recoveryDesc.release();
+                                        recoveryDesc.release();
 
-                                    fut.onDone();
-                                }
-                                finally {
-                                    clientFuts.remove(connKey, fut);
+                                        fut.onDone();
+                                    }
+                                    finally {
+                                        clientFuts.remove(connKey, fut);
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
+                        }
                     }
                     else {
                         try {
@@ -3508,9 +3533,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 log.debug("Send recovery acknowledgement on timeout [rmtNode="
+ nodeId +
                                     ", rcvCnt=" + msg.received() + ']');
 
-                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(),
msg);
+                            try {
+                                nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(),
msg);
 
-                            recovery.lastAcknowledged(msg.received());
+                                recovery.lastAcknowledged(msg.received());
+                            }
+                            catch (IgniteCheckedException err) {
+                                U.error(log, "Failed to send message: " + err, err);
+                            }
 
                             continue;
                         }
@@ -3566,9 +3596,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ", lastAcked=" + recovery.lastAcknowledged() + ']');
                 }
 
-                nioSrvr.sendSystem(ses, msg);
+                try {
+                    nioSrvr.sendSystem(ses, msg);
 
-                recovery.lastAcknowledged(msg.received());
+                    recovery.lastAcknowledged(msg.received());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
             }
         }
 
@@ -3944,7 +3979,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
-     *
+     * Updated handshake message.
      */
     @SuppressWarnings("PublicInnerClass")
     public static class HandshakeMessage2 extends HandshakeMessage {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 58b91e4..d403784 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
                 proceedExceptionCaught(ses, ex);
             }
 
-            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg) {
+            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg, boolean fut) {
                 sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME));
 
                 sndMsgObj.compareAndSet(null, msg);
@@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
         chain.onSessionIdleTimeout(ses);
         chain.onSessionWriteTimeout(ses);
         assertNull(chain.onSessionClose(ses));
-        assertNull(chain.onSessionWrite(ses, snd));
+        assertNull(chain.onSessionWrite(ses, snd, true));
 
         assertEquals("DCBA", connectedEvt.get());
         assertEquals("DCBA", disconnectedEvt.get());
@@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg) throws IgniteCheckedException {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg, boolean fut) throws IgniteCheckedException {
             chainMeta(ses, MESSAGE_WRITE_META_NAME);
 
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
+        @Override public void sendNoFuture(Object msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public GridNioFuture<Object> resumeReads() {
             return null;
         }
@@ -387,5 +392,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest
{
         @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public void systemMessage(Object msg) {
+            // No-op.
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java
new file mode 100644
index 0000000..8791c83
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ *
+ */
+public abstract class IgniteIoTestAbstractBenchmark extends IgniteAbstractBenchmark {
+    /** */
+    protected final List<ClusterNode> targetNodes = new ArrayList<>();
+
+    /** */
+    protected IgniteKernal ignite;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        ignite = (IgniteKernal)ignite();
+
+        ClusterNode loc = ignite().cluster().localNode();
+
+        Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
+
+        for (ClusterNode node : nodes) {
+            if (!loc.equals(node))
+                targetNodes.add(node);
+        }
+
+        if (targetNodes.isEmpty())
+            throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes
+ ']');
+
+        BenchmarkUtils.println(cfg, "Initialized target nodes: " + F.nodeIds(targetNodes)
+ ']');
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java
new file mode 100644
index 0000000..9011910
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.io;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class IgniteIoTestSendAllBenchmark extends IgniteIoTestAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        ignite.sendIoTest(targetNodes, null, false).get();
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java
new file mode 100644
index 0000000..88368e0
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.io;
+
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ *
+ */
+public class IgniteIoTestSendRandomBenchmark extends IgniteIoTestAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        ClusterNode node = targetNodes.get(nextRandom(targetNodes.size()));
+
+        ignite.sendIoTest(node, null, false).get();
+
+        return true;
+    }
+}


Mime
View raw message