Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9B7B3189D3 for ; Mon, 15 Jun 2015 07:32:49 +0000 (UTC) Received: (qmail 51602 invoked by uid 500); 15 Jun 2015 07:32:49 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 51570 invoked by uid 500); 15 Jun 2015 07:32:49 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 51561 invoked by uid 99); 15 Jun 2015 07:32:49 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2015 07:32:49 +0000 X-ASF-Spam-Status: No, hits=-2000.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 15 Jun 2015 07:30:08 +0000 Received: (qmail 47764 invoked by uid 99); 15 Jun 2015 07:31:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2015 07:31:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5F664E02A2; Mon, 15 Jun 2015 07:31:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 15 Jun 2015 07:31:57 -0000 Message-Id: <5438ef5f562f48a3a91d6f6870e6c0d2@git.apache.org> In-Reply-To: <15d51c252320406e8a75c3939d387087@git.apache.org> References: <15d51c252320406e8a75c3939d387087@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] incubator-ignite git commit: # ignite-970 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-970 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/104a13fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/104a13fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/104a13fd Branch: refs/heads/ignite-1009-v4 Commit: 104a13fd2118804e42b5035df4340d7374c36e82 Parents: 1dbdd42 Author: sboikov Authored: Tue Jun 2 14:25:08 2015 +0300 Committer: sboikov Committed: Tue Jun 2 14:25:08 2015 +0300 ---------------------------------------------------------------------- .../util/nio/GridShmemCommunicationClient.java | 151 +++++++ .../communication/tcp/TcpCommunicationSpi.java | 414 ++++++++++++++++++- .../tcp/TcpCommunicationSpiMBean.java | 8 + .../IgniteCacheMessageRecoveryAbstractTest.java | 1 + .../spi/GridTcpSpiForwardingSelfTest.java | 1 + .../GridTcpCommunicationSpiAbstractTest.java | 13 + ...mmunicationSpiConcurrentConnectSelfTest.java | 4 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 11 +- ...pCommunicationSpiMultithreadedShmemTest.java | 28 ++ ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + .../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++ .../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 + .../IgniteSpiCommunicationSelfTestSuite.java | 2 + .../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 + ...oopSecondaryFileSystemConfigurationTest.java | 14 + .../hadoop/HadoopAbstractSelfTest.java | 6 + 17 files changed, 695 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java new file mode 100644 index 0000000..f3dc46f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * + */ +public class GridShmemCommunicationClient extends GridAbstractCommunicationClient { + /** */ + private final IpcSharedMemoryClientEndpoint shmem; + + /** */ + private final ByteBuffer writeBuf; + + /** */ + private final MessageFormatter formatter; + + /** + * @param metricsLsnr Metrics listener. + * @param port Shared memory IPC server port. + * @param connTimeout Connection timeout. + * @param log Logger. + * @param formatter Message formatter. + * @throws IgniteCheckedException If failed. + */ + public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, + int port, + long connTimeout, + IgniteLogger log, + MessageFormatter formatter) + throws IgniteCheckedException + { + super(metricsLsnr); + + assert metricsLsnr != null; + assert port > 0 && port < 0xffff; + assert connTimeout >= 0; + + shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); + + writeBuf = ByteBuffer.allocate(8 << 10); + + writeBuf.order(ByteOrder.nativeOrder()); + + this.formatter = formatter; + } + + /** {@inheritDoc} */ + @Override public synchronized void doHandshake(IgniteInClosure2X handshakeC) + throws IgniteCheckedException { + handshakeC.applyx(shmem.inputStream(), shmem.outputStream()); + } + + /** {@inheritDoc} */ + @Override public boolean close() { + boolean res = super.close(); + + if (res) + shmem.close(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void forceClose() { + super.forceClose(); + + // Do not call forceClose() here. + shmem.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Communication client was closed: " + this); + + try { + shmem.outputStream().write(data, 0, len); + + metricsLsnr.onBytesSent(len); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); + } + + markUsed(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) + throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Communication client was closed: " + this); + + assert writeBuf.hasArray(); + + try { + int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer()); + + metricsLsnr.onBytesSent(cnt); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); + } + + markUsed(); + + return false; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void flushIfNeeded(long timeout) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridShmemCommunicationClient.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 19e54c8..3768db5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -25,15 +25,19 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -139,6 +143,10 @@ import static org.apache.ignite.events.EventType.*; @IgniteSpiConsistencyChecked(optional = false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi, TcpCommunicationSpiMBean { + /** IPC error message. */ + public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + + "(switching to TCP, may be slower)."; + /** Node attribute that is mapped to node IP addresses (value is comm.tcp.addrs). */ public static final String ATTR_ADDRS = "comm.tcp.addrs"; @@ -148,12 +156,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Node attribute that is mapped to node port number (value is comm.tcp.port). */ public static final String ATTR_PORT = "comm.tcp.port"; + /** Node attribute that is mapped to node port number (value is comm.shmem.tcp.port). */ + public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port"; + /** Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs). */ public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs"; /** Default port which node sets listener to (value is 47100). */ public static final int DFLT_PORT = 47100; + /** Default port which node sets listener for shared memory connections (value is 48100). */ + public static final int DFLT_SHMEM_PORT = 48100; + /** Default idle connection timeout (value is 30000ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; @@ -293,7 +307,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert ses.accepted(); if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); else { assert msg instanceof HandshakeMessage : msg; @@ -322,6 +336,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient oldClient = clients.get(sndId); + boolean hasShmemClient = false; + if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { if (log.isDebugEnabled()) @@ -333,6 +349,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } } GridFutureAdapter fut = new GridFutureAdapter<>(); @@ -359,10 +380,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } } boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + @@ -371,7 +397,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserved) { try { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); fut.onDone(client); } @@ -393,11 +419,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } else { boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); if (reserved) { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); fut.onDone(client); } @@ -465,6 +491,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node. * @param rcvCnt Number of received messages.. * @param sndRes If {@code true} sends response for recovery handshake. + * @param createClient If {@code true} creates NIO communication client. * @return Client. */ private GridTcpNioCommunicationClient connected( @@ -472,7 +499,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioSession ses, ClusterNode node, long rcvCnt, - boolean sndRes) { + boolean sndRes, + boolean createClient) { recovery.onHandshake(rcvCnt); ses.recoveryDescriptor(recovery); @@ -484,12 +512,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter recovery.connected(); - GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); + GridTcpNioCommunicationClient client = null; + + if (createClient) { + client = new GridTcpNioCommunicationClient(ses, log); - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + + assert oldClient == null : "Client already created [node=" + node + ", client=" + client + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + } return client; } @@ -517,22 +549,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final GridFutureAdapter fut; + /** */ + private final boolean createClient; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. * @param rmtNode Remote node. * @param msg Handshake message. + * @param createClient If {@code true} creates NIO communication client.. * @param fut Connect future. */ ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, HandshakeMessage msg, + boolean createClient, GridFutureAdapter fut) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; this.msg = msg; + this.createClient = createClient; this.fut = fut; } @@ -545,7 +583,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false); + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); fut.onDone(client); } @@ -594,6 +632,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Local port range. */ private int locPortRange = DFLT_PORT_RANGE; + /** Local port which node uses to accept shared memory connections. */ + private int shmemPort = DFLT_SHMEM_PORT; + /** Allocate direct buffer or heap buffer. */ private boolean directBuf = true; @@ -635,6 +676,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** NIO server. */ private GridNioServer nioSrvr; + /** Shared memory server. */ + private IpcSharedMemoryServerEndpoint shmemSrv; + /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -647,6 +691,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; + /** Shared memory accept worker. */ + private ShmemAcceptWorker shmemAcceptWorker; + /** Idle client worker. */ private IdleClientWorker idleClientWorker; @@ -659,6 +706,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Recovery worker. */ private RecoveryWorker recoveryWorker; + /** Shared memory workers. */ + private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); + /** Clients. */ private final ConcurrentMap clients = GridConcurrentFactory.newMap(); @@ -668,6 +718,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Bound port. */ private int boundTcpPort = -1; + /** Bound port for shared memory server. */ + private int boundTcpShmemPort = -1; + /** Count of selectors to use in TCP server. */ private int selectorsCnt = DFLT_SELECTORS_CNT; @@ -811,6 +864,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * Sets local port to accept shared memory connections. + *

+ * If set to {@code -1} shared memory communication will be disabled. + *

+ * If not provided, default value is {@link #DFLT_SHMEM_PORT}. + * + * @param shmemPort Port number. + */ + @IgniteSpiConfiguration(optional = true) + public void setSharedMemoryPort(int shmemPort) { + this.shmemPort = shmemPort; + } + + /** {@inheritDoc} */ + @Override public int getSharedMemoryPort() { + return shmemPort; + } + + /** * Sets maximum idle connection timeout upon which a connection * to client will be closed. *

@@ -1179,6 +1251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); + assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); assertParameter(minBufferedMsgCnt >= 0, "minBufferedMsgCnt >= 0"); @@ -1204,6 +1277,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { + shmemSrv = resetShmemServer(); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to start shared memory communication server.", e); + } + + try { // This method potentially resets local port to the value // local node was bound to. nioSrvr = resetNioServer(); @@ -1223,6 +1303,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter createSpiAttributeName(ATTR_ADDRS), addrs.get1(), createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(), createSpiAttributeName(ATTR_PORT), boundTcpPort, + createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null, createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); } catch (IOException | IgniteCheckedException e) { @@ -1251,6 +1332,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); log.debug(configInfo("sockRcvBuf", sockRcvBuf)); + log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt)); log.debug(configInfo("connTimeout", connTimeout)); @@ -1272,6 +1354,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter registerMBean(gridName, this, TcpCommunicationSpiMBean.class); + if (shmemSrv != null) { + shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv); + + new IgniteThread(shmemAcceptWorker).start(); + } + nioSrvr.start(); idleClientWorker = new IdleClientWorker(); @@ -1301,6 +1389,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); + // SPI can start without shmem port. + if (boundTcpShmemPort > 0) + spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP); + spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); ctxInitLatch.countDown(); @@ -1341,7 +1433,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // If configured TCP port is busy, find first available in range. for (int port = locPort; port < locPort + locPortRange; port++) { try { - MessageFactory messageFactory = new MessageFactory() { + MessageFactory msgFactory = new MessageFactory() { private MessageFactory impl; @Nullable @Override public Message create(byte type) { @@ -1354,7 +1446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - MessageFormatter messageFormatter = new MessageFormatter() { + MessageFormatter msgFormatter = new MessageFormatter() { private MessageFormatter impl; @Override public MessageWriter writer() { @@ -1376,7 +1468,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter); IgnitePredicate skipRecoveryPred = new IgnitePredicate() { @Override public boolean apply(Message msg) { @@ -1403,7 +1495,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) - .messageFormatter(messageFormatter) + .messageFormatter(msgFormatter) .skipRecoveryPredicate(skipRecoveryPred) .build(); @@ -1435,6 +1527,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); } + /** + * Creates new shared memory communication server. + * @return Server. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { + if (boundTcpShmemPort >= 0) + throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); + + if (shmemPort == -1 || U.isWindows()) + return null; + + IgniteCheckedException lastEx = null; + + // If configured TCP port is busy, find first available in range. + for (int port = shmemPort; port < shmemPort + locPortRange; port++) { + try { + IpcSharedMemoryServerEndpoint srv = + new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName); + + srv.setPort(port); + + srv.omitOutOfResourcesWarning(true); + + srv.start(); + + boundTcpShmemPort = port; + + // Ack Port the TCP server was bound to. + if (log.isInfoEnabled()) + log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort + + ", locHost=" + locHost + ']'); + + return srv; + } + catch (IgniteCheckedException e) { + lastEx = e; + + if (log.isDebugEnabled()) + log.debug("Failed to bind to local port (will try next port within range) [port=" + port + + ", locHost=" + locHost + ']'); + } + } + + // If free port wasn't found. + throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + + locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); + } + /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { assert isNodeStopping(); @@ -1445,6 +1586,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); + U.cancel(shmemAcceptWorker); + U.join(shmemAcceptWorker, log); + U.interrupt(idleClientWorker); U.interrupt(clientFlushWorker); U.interrupt(sockTimeoutWorker); @@ -1455,6 +1599,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.join(sockTimeoutWorker, log); U.join(recoveryWorker, log); + U.cancel(shmemWorkers); + U.join(shmemWorkers, log); + + shmemWorkers.clear(); + // Force closing on stop (safety). for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -1665,13 +1814,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { assert node != null; - if (getSpiContext().localNode() == null) + Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); + + ClusterNode locNode = getSpiContext().localNode(); + + if (locNode == null) throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + // If remote node has shared memory server enabled and has the same set of MACs + // then we are likely to run on the same host and shared memory communication could be tried. + if (shmemPort != null && U.sameMacs(locNode, node)) { + try { + return createShmemClient(node, shmemPort); + } + catch (IgniteCheckedException e) { + if (e.hasCause(IpcOutOfSystemResourcesException.class)) + // Has cause or is itself the IpcOutOfSystemResourcesException. + LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); + else if (getSpiContext().node(node.id()) != null) + LT.warn(log, null, e.getMessage()); + else if (log.isDebugEnabled()) + log.debug("Failed to establish shared memory connection with local node (node has left): " + + node.id()); + } + } + return createTcpClient(node); } /** + * @param node Node. + * @param port Port. + * @return Client. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { + int attempt = 1; + + int connectAttempts = 1; + + long connTimeout0 = connTimeout; + + while (true) { + GridCommunicationClient client; + + try { + client = new GridShmemCommunicationClient(metricsLsnr, + port, + connTimeout, + log, + getSpiContext().messageFormatter()); + } + catch (IgniteCheckedException e) { + // Reconnect for the second time, if connection is not established. + if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { + connectAttempts++; + + continue; + } + + throw e; + } + + try { + safeHandshake(client, null, node.id(), connTimeout0); + } + catch (HandshakeTimeoutException e) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + ", err=" + e.getMessage() + ", client=" + client + ']'); + + client.forceClose(); + + if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will stop attempts to perform the handshake) " + + "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + + ", attempt=" + attempt + ", reconCnt=" + reconCnt + + ", err=" + e.getMessage() + ", client=" + client + ']'); + + throw e; + } + else { + attempt++; + + connTimeout0 *= 2; + + continue; + } + } + catch (IgniteCheckedException | RuntimeException | Error e) { + if (log.isDebugEnabled()) + log.debug( + "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']'); + + client.forceClose(); + + throw e; + } + + return client; + } + } + + /** * Establish TCP connection to remote node and returns client. * * @param node Remote node. @@ -2154,6 +2400,144 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * This worker takes responsibility to shut the server down when stopping, + * No other thread shall stop passed server. + */ + private class ShmemAcceptWorker extends GridWorker { + /** */ + private final IpcSharedMemoryServerEndpoint srv; + + /** + * @param srv Server. + */ + ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { + super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); + + this.srv = srv; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + while (!Thread.interrupted()) { + ShmemWorker e = new ShmemWorker(srv.accept()); + + shmemWorkers.add(e); + + new IgniteThread(e).start(); + } + } + catch (IgniteCheckedException e) { + if (!isCancelled()) + U.error(log, "Shmem server failed.", e); + } + finally { + srv.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + srv.close(); + } + } + + /** + * + */ + private class ShmemWorker extends GridWorker { + /** */ + private final IpcEndpoint endpoint; + + /** + * @param endpoint Endpoint. + */ + private ShmemWorker(IpcEndpoint endpoint) { + super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); + + this.endpoint = endpoint; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + MessageFactory msgFactory = new MessageFactory() { + private MessageFactory impl; + + @Nullable @Override public Message create(byte type) { + if (impl == null) + impl = getSpiContext().messageFactory(); + + assert impl != null; + + return impl.create(type); + } + }; + + MessageFormatter msgFormatter = new MessageFormatter() { + private MessageFormatter impl; + + @Override public MessageWriter writer() { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.writer(); + } + + @Override public MessageReader reader(MessageFactory factory) { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.reader(factory); + } + }; + + IpcToNioAdapter adapter = new IpcToNioAdapter<>( + metricsLsnr, + log, + endpoint, + srvLsnr, + msgFormatter, + new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), + new GridConnectionBytesVerifyFilter(log) + ); + + adapter.serve(); + } + finally { + shmemWorkers.remove(this); + + endpoint.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override protected void cleanup() { + super.cleanup(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override public String toString() { + return S.toString(ShmemWorker.class, this); + } + } + + /** * */ private class IdleClientWorker extends IgniteSpiThread { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index 5c80e6e..3c6b64e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -44,6 +44,14 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getLocalPort(); /** + * Gets local port for shared memory communication. + * + * @return Port number. + */ + @MXBeanDescription("Shared memory endpoint port number.") + public int getSharedMemoryPort(); + + /** * Gets maximum number of local ports tried if all previously * tried ports are occupied. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index 96abe5f..8031315 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -50,6 +50,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); commSpi.setSocketWriteTimeout(1000); + commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index ed9e0cf..744635d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -115,6 +115,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { commSpi.setLocalAddress("127.0.0.1"); commSpi.setLocalPort(commLocPort); commSpi.setLocalPortRange(1); + commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index ea51aff..2d3f506 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -37,10 +37,23 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica /** */ public static final int IDLE_CONN_TIMEOUT = 2000; + /** */ + private final boolean useShmem; + + /** + * @param useShmem Use shared mem flag. + */ + protected GridTcpCommunicationSpiAbstractTest(boolean useShmem) { + this.useShmem = useShmem; + } + /** {@inheritDoc} */ @Override protected CommunicationSpi getSpi(int idx) { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + if (!useShmem) + spi.setSharedMemoryPort(-1); + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT); spi.setTcpNoDelay(tcpNoDelay()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index c038ee7..26e1120 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -181,8 +181,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest() { - @Override - public Long call() throws Exception { + @Override public Long call() throws Exception { long dummyRes = 0; List list = new ArrayList<>(); @@ -300,6 +299,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest spiRsrcs = new ArrayList<>(); @@ -80,9 +83,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac } /** + * @param useShmem Use shared mem. */ - public GridTcpCommunicationSpiMultithreadedSelfTest() { + protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) { super(false); + + this.useShmem = useShmem; } /** @@ -413,6 +419,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac private CommunicationSpi newCommunicationSpi() { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + if (!useShmem) + spi.setSharedMemoryPort(-1); + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java new file mode 100644 index 0000000..590b426 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +/** + * + */ +public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest { + /** */ + public GridTcpCommunicationSpiMultithreadedShmemTest() { + super(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index c0f0b11..1a4ba22 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -324,6 +324,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest protected TcpCommunicationSpi getSpi(int idx) { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + spi.setSharedMemoryPort(-1); spi.setLocalPort(port++); spi.setIdleConnectionTimeout(10_000); spi.setConnectTimeout(10_000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java new file mode 100644 index 0000000..5746a3c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.testframework.junits.spi.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiShmemSelfTest extends GridTcpCommunicationSpiAbstractTest { + /** + * + */ + public GridTcpCommunicationSpiShmemSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected boolean tcpNoDelay() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java index 32bced2..c27a86f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java @@ -24,6 +24,13 @@ import org.apache.ignite.testframework.junits.spi.*; */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") public class GridTcpCommunicationSpiTcpSelfTest extends GridTcpCommunicationSpiAbstractTest { + /** + * + */ + public GridTcpCommunicationSpiTcpSelfTest() { + super(false); + } + /** {@inheritDoc} */ @Override protected boolean tcpNoDelay() { return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 1d3bfcd..ff86bda 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -38,10 +38,12 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpNoDelayOffSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiShmemSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiStartStopSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class)); + suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index 9bcd5de..a1535ed 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -31,6 +31,8 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -186,6 +188,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA cfg.setFileSystemConfiguration(igfsCfg); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setLocalHost(U.getLocalHost().getHostAddress()); + cfg.setCommunicationSpi(communicationSpi()); G.start(cfg); } @@ -211,6 +214,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA cfg.setFileSystemConfiguration(igfsConfiguration(gridName)); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setLocalHost("127.0.0.1"); + cfg.setCommunicationSpi(communicationSpi()); return cfg; } @@ -270,6 +274,15 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA return cfg; } + /** @return Communication SPI. */ + private CommunicationSpi communicationSpi() { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return commSpi; + } + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { G.stopAll(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index b089995..8c33679 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -31,6 +31,8 @@ import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -279,6 +281,8 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra cfg.setFileSystemConfiguration(igfsCfg); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + cfg.setCommunicationSpi(communicationSpi()); + G.start(cfg); } @@ -314,6 +318,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra cfg.setCacheConfiguration(cacheConfiguration()); cfg.setFileSystemConfiguration(fsConfiguration(gridName)); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + cfg.setCommunicationSpi(communicationSpi()); return cfg; } @@ -371,6 +376,15 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra return cfg; } + /** @return Communication SPI. */ + private CommunicationSpi communicationSpi() { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return commSpi; + } + /** * Case #SecondaryFileSystemProvider(null, path) * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/104a13fd/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index af1a1e1..7fda532 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -94,6 +94,12 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); discoSpi.setIpFinder(IP_FINDER);