Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 09FEF200B85 for ; Thu, 15 Sep 2016 17:58:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0884D160ABA; Thu, 15 Sep 2016 15:58:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A24F1160AD7 for ; Thu, 15 Sep 2016 17:58:51 +0200 (CEST) Received: (qmail 85827 invoked by uid 500); 15 Sep 2016 15:58:50 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 85814 invoked by uid 99); 15 Sep 2016 15:58:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Sep 2016 15:58:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A4A74E07FE; Thu, 15 Sep 2016 15:58:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 15 Sep 2016 15:58:52 -0000 Message-Id: <56c9b86a9f7b4797bcf57396976d0228@git.apache.org> In-Reply-To: <93260f9b37b149c29bc5a76a710c4724@git.apache.org> References: <93260f9b37b149c29bc5a76a710c4724@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] ignite git commit: nio balance archived-at: Thu, 15 Sep 2016 15:58:53 -0000 nio balance Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc85af16 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc85af16 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc85af16 Branch: refs/heads/ignite-comm-balance Commit: bc85af1680acca18f6d764e09ca2ecacb48c2319 Parents: 0b7ff82 Author: sboikov Authored: Thu Sep 15 15:12:50 2016 +0300 Committer: sboikov Committed: Thu Sep 15 15:12:50 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/IgniteUtils.java | 12 + .../ignite/internal/util/nio/GridNioServer.java | 361 ++++++++++++++++++- .../internal/util/nio/GridNioSessionImpl.java | 50 +++ .../util/nio/GridSelectorNioSessionImpl.java | 9 +- .../IgniteCommunicationBalanceTest.java | 118 ++++++ 5 files changed, 536 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 93acc75..c7529bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -8307,6 +8307,18 @@ public abstract class IgniteUtils { } /** + * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned. + * + * @param i Argument. + * @return Absolute value. + */ + public static long safeAbs(long i) { + i = Math.abs(i); + + return i < 0 ? 0 : i; + } + + /** * Gets wrapper class for a primitive type. * * @param cls Class. If {@code null}, method is no-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index f18615d..60db1fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -107,9 +107,6 @@ public class GridNioServer { private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); /** */ - private static final boolean NIO_SES_BALANCE_ENABLED = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED", true); - - /** */ private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); @@ -213,6 +210,15 @@ public class GridNioServer { /** Optional listener to monitor outbound message queue size. */ private IgniteBiInClosure msgQueueLsnr; + /** */ + private volatile long writerMoveCnt; + + /** */ + private volatile long readerMoveCnt; + + /** */ + private final Balancer balancer; + /** * @param addr Address. * @param port Port. @@ -324,6 +330,22 @@ public class GridNioServer { this.writerFactory = writerFactory; this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.alwaysFalse(); + + boolean balanceEnabled = IgniteSystemProperties.getBoolean("IGNITE_NIO_SES_BALANCE_ENABLED", true); + + Balancer balancer0 = null; + + if (balanceEnabled) { + String balancerCls = IgniteSystemProperties.getString("IGNITE_NIO_SES_BALANCER_CLASS_NAME"); + + if (balancerCls != null) { + + } + else + balancer0 = new SizeBasedBalancer(this); + } + + this.balancer = balancer0; } /** @@ -399,7 +421,10 @@ public class GridNioServer { NioOperationFuture fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); - clientWorkers.get(impl.selectorIndex()).offer(fut); + int idx = impl.selectorIndex(); // TODO + + if (idx != -1) + clientWorkers.get(idx).offer(fut); return fut; } @@ -459,9 +484,13 @@ public class GridNioServer { if (ses.removeFuture(fut)) fut.connectionClosed(); } - else if (msgCnt == 1) + else if (msgCnt == 1) { // Change from 0 to 1 means that worker thread should be waken up. - clientWorkers.get(ses.selectorIndex()).offer(fut); + int idx = ses.selectorIndex(); + + if (idx != -1) // TODO revisit + clientWorkers.get(idx).offer(fut); + } if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -925,6 +954,8 @@ public class GridNioServer { metricsLsnr.onBytesReceived(cnt); ses.bytesReceived(cnt); + ses.onBytesRead(cnt, readBuf.capacity()); + onRead(cnt); readBuf.flip(); @@ -1239,6 +1270,8 @@ public class GridNioServer { metricsLsnr.onBytesSent(cnt); ses.bytesSent(cnt); + ses.onBytesWritten(cnt, buf.capacity()); + onWrite(cnt); } else { // For test purposes only (skipWrite is set to true in tests only). @@ -1276,6 +1309,13 @@ public class GridNioServer { /** Worker index. */ private final int idx; + private volatile long bytesRcvd; + private volatile long bytesSent; + private volatile long bytesRcvd0; + private volatile long bytesSent0; + + private final GridConcurrentHashSet sessions0 = new GridConcurrentHashSet<>(); + /** * @param idx Index of this worker in server's array. * @param gridName Grid name. @@ -1400,6 +1440,40 @@ public class GridNioServer { break; } + case MOVE: { + SessionMoveFuture f = (SessionMoveFuture)req; + + GridSelectorNioSessionImpl ses = f.session(); + + if (idx == f.toIdx) { + ses.selectorIndex(idx); + + sessions0.add(ses); + + SelectionKey key = f.socketChannel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); // TODO what if reads were paused? + + ses.key(key); + } + else { + assert ses.selectorIndex() == idx; // TODO replace with IF and ignore? + + // Cleanup. + ses.selectorIndex(-1); + sessions0.remove(ses); + + SelectionKey key = ses.key(); + + f.socketChannel((SocketChannel)key.channel()); + + key.cancel(); + + clientWorkers.get(f.toIndex()).offer(f); + } + + break; + } + case REQUIRE_WRITE: { //Just register write key. SelectionKey key = req.session().key(); @@ -1467,6 +1541,10 @@ public class GridNioServer { sb.append(U.nl()) .append(">> Selector info [idx=").append(idx) .append(", keysCnt=").append(keys.size()) + .append(", bytesRcvd=").append(bytesRcvd) + .append(", bytesRcvd0=").append(bytesRcvd0) + .append(", bytesSent=").append(bytesSent) + .append(", bytesSent0=").append(bytesSent0) .append("]").append(U.nl()); for (SelectionKey key : keys) { @@ -1500,8 +1578,12 @@ public class GridNioServer { sb.append(", inRecoveryDesc=null"); sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesRcvd0=").append(ses.bytesReceived0()) .append(", bytesSent=").append(ses.bytesSent()) + .append(", bytesSent0=").append(ses.bytesSent0()) .append(", opQueueSize=").append(ses.writeQueueSize()) + .append(", writeStats=").append(Arrays.toString(ses.writeStats())) + .append(", readStats=").append(Arrays.toString(ses.readStats())) .append(", msgWriter=").append(writer != null ? writer.toString() : "null") .append(", msgReader=").append(reader != null ? reader.toString() : "null"); @@ -1764,6 +1846,7 @@ public class GridNioServer { resend(ses); sessions.add(ses); + sessions0.add(ses); try { filterChain.onSessionOpened(ses); @@ -1789,7 +1872,7 @@ public class GridNioServer { } /** - * Closes the ses and all associated resources, then notifies the listener. + * Closes the session and all associated resources, then notifies the listener. * * @param ses Session to be closed. * @param e Exception to be passed to the listener, if any. @@ -1806,12 +1889,10 @@ public class GridNioServer { } sessions.remove(ses); + sessions0.remove(ses); SelectionKey key = ses.key(); - // Shutdown input and output so that remote client will see correct socket close. - Socket sock = ((SocketChannel)key.channel()).socket(); - if (ses.setClosed()) { ses.onClosed(); @@ -1823,6 +1904,9 @@ public class GridNioServer { ((DirectBuffer)ses.readBuffer()).cleaner().clean(); } + // Shutdown input and output so that remote client will see correct socket close. + Socket sock = ((SocketChannel)key.channel()).socket(); + try { try { sock.shutdownInput(); @@ -1906,6 +1990,24 @@ public class GridNioServer { * @throws IOException If write failed. */ protected abstract void processWrite(SelectionKey key) throws IOException; + + protected void onRead(int cnt) { // TODO + bytesRcvd += cnt; + bytesRcvd0 += cnt; + } + + protected void onWrite(int cnt) { + bytesSent += cnt; + bytesSent0 += cnt; + } + + protected void reset0() { + bytesSent0 = 0; + bytesRcvd0 = 0; + + for (GridSelectorNioSessionImpl ses : sessions0) + ses.reset0(); + } } /** @@ -1976,12 +2078,17 @@ public class GridNioServer { * @throws IgniteCheckedException If failed. */ private void accept() throws IgniteCheckedException { + long lastBalance = U.currentTimeMillis(); + try { while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) { // Wake up every 2 seconds to check if closed. if (selector.select(2000) > 0) // Walk through the ready keys collection and process date requests. processSelectedKeys(selector.selectedKeys()); + + if (balancer != null) + balancer.balance(); } } // Ignore this exception as thread interruption is equal to 'close' call. @@ -2082,6 +2189,9 @@ public class GridNioServer { /** Register read key selection. */ REGISTER, + /** */ + MOVE, + /** Register write key selection. */ REQUIRE_WRITE, @@ -2107,7 +2217,7 @@ public class GridNioServer { /** Socket channel in register request. */ @GridToStringExclude - private SocketChannel sockCh; + protected SocketChannel sockCh; // TODO to be fixed with proper hierarchy /** Session to perform operation on. */ @GridToStringExclude @@ -2249,14 +2359,14 @@ public class GridNioServer { /** * @return Socket channel for register request. */ - private SocketChannel socketChannel() { + SocketChannel socketChannel() { return sockCh; } /** * @return Session for this change request. */ - private GridSelectorNioSessionImpl session() { + GridSelectorNioSessionImpl session() { return ses; } @@ -2303,6 +2413,41 @@ public class GridNioServer { } /** + * + */ + private static class SessionMoveFuture extends NioOperationFuture { + /** */ + private final int toIdx; + + /** + * @param ses + * @param toIdx + */ + public SessionMoveFuture( + GridSelectorNioSessionImpl ses, + int toIdx + ) { + super(ses, NioOperation.MOVE); + + this.sockCh = sockCh; + this.toIdx = toIdx; + } + + int toIndex() { + return toIdx; + } + + void socketChannel(SocketChannel sockCh) { + this.sockCh = sockCh; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionMoveFuture.class, this, super.toString()); + } + } + + /** * Filter forwarding messages from chain's head to this server. */ private class HeadFilter extends GridNioFilterAdapter { @@ -2708,4 +2853,194 @@ public class GridNioServer { return this; } } + + /** + * + */ + public interface Balancer { + /** + * + */ + void balance(); + } + + /** + * + */ + private static class SizeBasedBalancer implements Balancer { + /** */ + private final GridNioServer srv; + + /** */ + private final IgniteLogger log; + + /** */ + private long lastBalance; + + /** + * @param srv Server. + */ + public SizeBasedBalancer(GridNioServer srv) { + this.srv = srv; + + log = srv.log; + } + + /** {@inheritDoc} */ + @Override public void balance() { + long now = U.currentTimeMillis(); + + if (lastBalance + 5000 < now) { + lastBalance = now; + + long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1; + int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1; + + boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); + + List clientWorkers = (List)srv.clientWorkers; + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + if ((i & 1) == 0) { + // Reader. + long bytesRcvd0 = worker.bytesRcvd0; + + if ((maxRcvd0 == -1 || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0 && + worker.sessions0.size() > 1) { + maxRcvd0 = bytesRcvd0; + maxRcvdIdx = i; + + continue; + } + + if (minRcvd0 == -1 || bytesRcvd0 < minRcvd0) { + minRcvd0 = bytesRcvd0; + minRcvdIdx = i; + } + } + else { + // Writer. + long bytesSent0 = worker.bytesSent0; + + if ((maxSent0 == -1 || bytesSent0 > maxSent0) && bytesSent0 > 0 && + worker.sessions0.size() > 1) { + maxSent0 = bytesSent0; + maxSentIdx = i; + + continue; + } + + if (minSent0 == -1 || bytesSent0 < minSent0) { + minSent0 = bytesSent0; + minSentIdx = i; + } + } + } + + if (log.isDebugEnabled()) + log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx + + ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx + + ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx + + ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']'); + + if (print) + log.info("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx + + ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx + + ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx + + ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']'); + + if (maxSent0 != -1 && minSent0 != -1) { + GridSelectorNioSessionImpl ses = null; + + long sentDiff = maxSent0 - minSent0; + long delta = sentDiff; + double threshold = sentDiff * 0.9; + + GridConcurrentHashSet sessions = + clientWorkers.get(maxSentIdx).sessions0; + + for (GridSelectorNioSessionImpl ses0 : sessions) { + long bytesSent0 = ses0.bytesSent0(); + + if (bytesSent0 < threshold && + (ses == null || delta > U.safeAbs(bytesSent0 - sentDiff / 2))) { + ses = ses0; + delta = U.safeAbs(bytesSent0 - sentDiff / 2); + } + } + + if (ses != null) { + if (log.isDebugEnabled()) + log.debug("Will move session to less loaded writer [ses=" + ses + + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); + + if (print) + log.info("Will move session to less loaded writer [diff=" + sentDiff + ", ses=" + ses + + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); + + srv.writerMoveCnt++; + + clientWorkers.get(maxSentIdx).offer(new SessionMoveFuture(ses, minSentIdx)); + } + else { + if (log.isDebugEnabled()) + log.debug("Unable to find session to move for writers."); + + if (print) + log.info("Unable to find session to move for writers."); + } + } + + if (maxRcvd0 != -1 && minRcvd0 != -1) { + GridSelectorNioSessionImpl ses = null; + + long rcvdDiff = maxRcvd0 - minRcvd0; + long delta = rcvdDiff; + double threshold = rcvdDiff * 0.9; + + GridConcurrentHashSet sessions = + clientWorkers.get(maxRcvdIdx).sessions0; + + for (GridSelectorNioSessionImpl ses0 : sessions) { + long bytesRcvd0 = ses0.bytesReceived0(); + + if (bytesRcvd0 < threshold && + (ses == null || delta > U.safeAbs(bytesRcvd0 - rcvdDiff / 2))) { + ses = ses0; + delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2); + } + } + + if (ses != null) { + if (log.isDebugEnabled()) + log.debug("Will move session to less loaded reader [ses=" + ses + + ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']'); + + if (print) + log.info("Will move session to less loaded reader [diff=" + rcvdDiff + ", ses=" + ses + + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']'); + + srv.readerMoveCnt++; + + clientWorkers.get(maxRcvdIdx).offer(new SessionMoveFuture(ses, minRcvdIdx)); + } + else { + if (log.isDebugEnabled()) + log.debug("Unable to find session to move for readers."); + + if (print) + log.info("Unable to find session to move for readers."); + } + } + + for (int i = 0; i < clientWorkers.size(); i++) { + GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i); + + worker.reset0(); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java index 53a624d..3f5d367 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java @@ -45,12 +45,21 @@ public class GridNioSessionImpl implements GridNioSession { /** Session close timestamp. */ private final AtomicLong closeTime = new AtomicLong(); + private final long[] writesStat = new long[25]; + private final long[] readsStat = new long[25]; + /** Sent bytes counter. */ private volatile long bytesSent; /** Received bytes counter. */ private volatile long bytesRcvd; + /** Sent bytes counter since last re-balancing. */ + private volatile long bytesSent0; + + /** Received bytes counter since last re-balancing. */ + private volatile long bytesRcvd0; + /** Last send schedule timestamp. */ private volatile long sndSchedTime; @@ -163,6 +172,19 @@ public class GridNioSessionImpl implements GridNioSession { return bytesRcvd; } + public long bytesSent0() { + return bytesSent0; + } + + public long bytesReceived0() { + return bytesRcvd0; + } + + public void reset0() { + bytesSent0 = 0; + bytesRcvd0 = 0; + } + /** {@inheritDoc} */ @Override public long createTime() { return createTime; @@ -240,10 +262,37 @@ public class GridNioSessionImpl implements GridNioSession { */ public void bytesSent(int cnt) { bytesSent += cnt; + bytesSent0 += cnt; lastSndTime = U.currentTimeMillis(); } + public void onBytesWritten(int cnt, int bufCap) { + int idx = (int)Math.floor(((cnt * 1.0) / bufCap) * writesStat.length); + + if (idx >= writesStat.length) + idx = writesStat.length - 1; + + writesStat[idx]++; + } + + public void onBytesRead(int cnt, int bufCap) { + int idx = (int)Math.floor(((cnt * 1.0) / bufCap) * readsStat.length); + + if (idx >= readsStat.length) + idx = readsStat.length - 1; + + readsStat[idx]++; + } + + public long[] readStats() { + return readsStat; + } + + public long[] writeStats() { + return writesStat; + } + /** * Adds given amount ob bytes to the received bytes counter. *

@@ -253,6 +302,7 @@ public class GridNioSessionImpl implements GridNioSession { */ public void bytesReceived(int cnt) { bytesRcvd += cnt; + bytesRcvd0 += cnt; lastRcvTime = U.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index a680a33..8e5b93d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -44,7 +44,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { private SelectionKey key; /** Worker index for server */ - private final int selectorIdx; + private volatile int selectorIdx; /** Size counter. */ private final AtomicInteger queueSize = new AtomicInteger(); @@ -161,6 +161,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } /** + * @param selectorIdx Selector index. + */ + void selectorIndex(int selectorIdx) { + this.selectorIdx = selectorIdx; + } + + /** * Adds write future at the front of the queue without acquiring back pressure semaphore. * * @param writeFut Write request. http://git-wip-us.apache.org/repos/asf/ignite/blob/bc85af16/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java new file mode 100644 index 0000000..30a6254 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.log4j.helpers.ThreadLocalMap; + +/** + * + */ +public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi()); + + commSpi.setSharedMemoryPort(-1); + commSpi.setSelectorsCount(4); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testBalance() throws Exception { + startGrid(0); + + client = true; + + Ignite client = startGrid(4); + + startGridsMultiThreaded(1, 3); + + for (int i = 0; i < 4; i++) { + ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id()); + + client.compute(client.cluster().forNode(node)).run(new DummyRunnable()); + } + +// ThreadLocalRandom rnd = ThreadLocalRandom.current(); +// +// for (int iter = 0; iter < 10; iter++) { +// log.info("Iteration: " + iter); +// +// int nodeIdx = rnd.nextInt(4); +// +// ClusterNode node = client.cluster().node(ignite(nodeIdx).cluster().localNode().id()); +// +// for (int i = 0; i < 10_000; i++) +// client.compute(client.cluster().forNode(node)).run(new DummyRunnable()); +// +// U.sleep(5000); +// } + + while (true) { + ((IgniteKernal) client).dumpDebugInfo(); + + Thread.sleep(5000); + } + + //Thread.sleep(Long.MAX_VALUE); + } + + /** + * + */ + static class DummyRunnable implements IgniteRunnable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } +}