Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1CA931746C for ; Thu, 11 Jun 2015 09:52:50 +0000 (UTC) Received: (qmail 33704 invoked by uid 500); 11 Jun 2015 09:52:50 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 33670 invoked by uid 500); 11 Jun 2015 09:52:50 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 33659 invoked by uid 99); 11 Jun 2015 09:52:49 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 09:52:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 5FB52181BE0 for ; Thu, 11 Jun 2015 09:52:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id HW32mO1V6ckS for ; Thu, 11 Jun 2015 09:52:35 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 5C9EA2662E for ; Thu, 11 Jun 2015 09:52:35 +0000 (UTC) Received: (qmail 32765 invoked by uid 99); 11 Jun 2015 09:52:35 -0000 Received: from git1-us-west.apache.org (HELO 
git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 09:52:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F2DF2E04B0; Thu, 11 Jun 2015 09:52:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 11 Jun 2015 09:52:36 -0000 Message-Id: In-Reply-To: <9b7f24f1005b41d39c1cd65767395513@git.apache.org> References: <9b7f24f1005b41d39c1cd65767395513@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] incubator-ignite git commit: ignite-sprint-6: merge from ignite-sprint-5 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java index dc00ca6..d4ae147 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.ipc.shmem; import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; @@ -25,6 +26,8 @@ import java.net.*; import java.nio.channels.*; import java.security.*; import java.util.*; +import java.util.jar.*; +import java.util.zip.*; import static org.apache.ignite.internal.IgniteVersionUtils.*; @@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader { /** Library name base. 
*/ private static final String LIB_NAME_BASE = "igniteshmem"; + /** Library jar name base. */ + private static final String JAR_NAME_BASE = "shmem"; + /** Library name. */ static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR; @@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader { } /** + * @param log Logger, if available. If null, warnings will be printed out to console. * @throws IgniteCheckedException If failed. */ - public static void load() throws IgniteCheckedException { + public static void load(IgniteLogger log) throws IgniteCheckedException { if (loaded) return; @@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader { if (loaded) return; - doLoad(); + doLoad(log); loaded = true; } @@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader { /** * @throws IgniteCheckedException If failed. */ - private static void doLoad() throws IgniteCheckedException { + private static void doLoad(IgniteLogger log) throws IgniteCheckedException { assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class); Collection errs = new ArrayList<>(); @@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader { // Obtain lock on file to prevent concurrent extracts. try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws"); - FileLock ignored = randomAccessFile.getChannel().lock()) { + FileLock ignored = randomAccessFile.getChannel().lock()) { if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath())) return; @@ -134,6 +141,31 @@ public class IpcSharedMemoryNativeLoader { if (extractAndLoad(errs, tmpDir, resourcePath())) return; + try { + if (log != null) + LT.warn(log, null, "Failed to load 'igniteshmem' library from classpath. 
Will try to load it from IGNITE_HOME."); + + String igniteHome = X.resolveIgniteHome(); + + File shmemJar = findShmemJar(errs, igniteHome); + + if (shmemJar != null) { + try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) { + if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath())) + return; + + if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath())) + return; + + if (extractAndLoad(errs, jar, tmpDir, resourcePath())) + return; + } + } + } + catch (IgniteCheckedException ignore) { + // No-op. + } + // Failed to find the library. assert !errs.isEmpty(); @@ -145,6 +177,32 @@ public class IpcSharedMemoryNativeLoader { } /** + * Tries to find shmem jar in IGNITE_HOME/libs folder. + * + * @param errs Collection of errors to add readable exception to. + * @param igniteHome Resolver IGNITE_HOME variable. + * @return File, if found. + */ + private static File findShmemJar(Collection errs, String igniteHome) { + File libs = new File(igniteHome, "libs"); + + if (!libs.exists() || libs.isFile()) { + errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME: " + igniteHome)); + + return null; + } + + for (File lib : libs.listFiles()) { + if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE)) + return lib; + } + + errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME: " + igniteHome)); + + return null; + } + + /** * Gets temporary directory unique for each OS user. * The directory guaranteed to exist, though may not be empty. */ @@ -220,6 +278,24 @@ public class IpcSharedMemoryNativeLoader { /** * @param errs Errors collection. + * @param rsrcPath Path. + * @return {@code True} if library was found and loaded. 
+ */ + private static boolean extractAndLoad(Collection errs, JarFile jar, File tmpDir, String rsrcPath) { + ZipEntry rsrc = jar.getEntry(rsrcPath); + + if (rsrc != null) + return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME))); + else { + errs.add(new IllegalStateException("Failed to find resource within specified jar file " + + "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']')); + + return false; + } + } + + /** + * @param errs Errors collection. * @param src Source. * @param target Target. * @return {@code True} if resource was found and loaded. @@ -230,7 +306,7 @@ public class IpcSharedMemoryNativeLoader { InputStream is = null; try { - if (!target.exists() || !haveEqualMD5(target, src)) { + if (!target.exists() || !haveEqualMD5(target, src.openStream())) { is = src.openStream(); if (is != null) { @@ -265,20 +341,69 @@ public class IpcSharedMemoryNativeLoader { } /** - * @param target Target. + * @param errs Errors collection. * @param src Source. + * @param target Target. + * @return {@code True} if resource was found and loaded. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static boolean extract(Collection errs, ZipEntry src, JarFile jar, File target) { + FileOutputStream os = null; + InputStream is = null; + + try { + if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) { + is = jar.getInputStream(src); + + if (is != null) { + os = new FileOutputStream(target); + + int read; + + byte[] buf = new byte[4096]; + + while ((read = is.read(buf)) != -1) + os.write(buf, 0, read); + } + } + + // chmod 775. + if (!U.isWindows()) + Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor(); + + System.load(target.getPath()); + + return true; + } + catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException e) { + errs.add(e); + } + finally { + U.closeQuiet(os); + U.closeQuiet(is); + } + + return false; + } + + /** + * @param target Target. 
+ * @param srcIS Source input stream. * @return {@code True} if target md5-sum equal to source md5-sum. * @throws NoSuchAlgorithmException If md5 algorithm was not found. * @throws IOException If an I/O exception occurs. */ - private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException, IOException { - try (InputStream targetIS = new FileInputStream(target); - InputStream srcIS = src.openStream()) { - - String targetMD5 = U.calculateMD5(targetIS); - String srcMD5 = U.calculateMD5(srcIS); + private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException, IOException { + try { + try (InputStream targetIS = new FileInputStream(target)) { + String targetMD5 = U.calculateMD5(targetIS); + String srcMD5 = U.calculateMD5(srcIS); - return targetMD5.equals(srcMD5); + return targetMD5.equals(srcMD5); + } + } + finally { + srcIS.close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index 5185856..102c5b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log); pid = IpcSharedMemoryUtils.pid(); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java new file mode 100644 index 0000000..e05c37a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * + */ +public class GridShmemCommunicationClient extends GridAbstractCommunicationClient { + /** */ + private final IpcSharedMemoryClientEndpoint shmem; + + /** */ + private final ByteBuffer writeBuf; + + /** */ + private final MessageFormatter formatter; + + /** + * @param metricsLsnr Metrics listener. + * @param port Shared memory IPC server port. + * @param connTimeout Connection timeout. + * @param log Logger. + * @param formatter Message formatter. + * @throws IgniteCheckedException If failed. + */ + public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, + int port, + long connTimeout, + IgniteLogger log, + MessageFormatter formatter) + throws IgniteCheckedException + { + super(metricsLsnr); + + assert metricsLsnr != null; + assert port > 0 && port < 0xffff; + assert connTimeout >= 0; + + shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); + + writeBuf = ByteBuffer.allocate(8 << 10); + + writeBuf.order(ByteOrder.nativeOrder()); + + this.formatter = formatter; + } + + /** {@inheritDoc} */ + @Override public synchronized void doHandshake(IgniteInClosure2X handshakeC) + throws IgniteCheckedException { + handshakeC.applyx(shmem.inputStream(), shmem.outputStream()); + } + + /** {@inheritDoc} */ + @Override public boolean close() { + boolean res = super.close(); + + if (res) + shmem.close(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void forceClose() { + super.forceClose(); + + // Do not call forceClose() here. 
+ shmem.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Communication client was closed: " + this); + + try { + shmem.outputStream().write(data, 0, len); + + metricsLsnr.onBytesSent(len); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); + } + + markUsed(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) + throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Communication client was closed: " + this); + + assert writeBuf.hasArray(); + + try { + int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer()); + + metricsLsnr.onBytesSent(cnt); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); + } + + markUsed(); + + return false; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridShmemCommunicationClient.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 359de1c..a661965 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -25,15 +25,19 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -139,6 +143,10 @@ import static org.apache.ignite.events.EventType.*; @IgniteSpiConsistencyChecked(optional = false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi, TcpCommunicationSpiMBean { + /** IPC error message. */ + public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + + "(switching to TCP, may be slower)."; + /** Node attribute that is mapped to node IP addresses (value is comm.tcp.addrs). */ public static final String ATTR_ADDRS = "comm.tcp.addrs"; @@ -148,12 +156,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Node attribute that is mapped to node port number (value is comm.tcp.port). */ public static final String ATTR_PORT = "comm.tcp.port"; + /** Node attribute that is mapped to node port number (value is comm.shmem.tcp.port). */ + public static final String ATTR_SHMEM_PORT = "comm.shmem.tcp.port"; + /** Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs). 
*/ public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs"; /** Default port which node sets listener to (value is 47100). */ public static final int DFLT_PORT = 47100; + /** Default port which node sets listener for shared memory connections (value is 48100). */ + public static final int DFLT_SHMEM_PORT = 48100; + /** Default idle connection timeout (value is 30000ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; @@ -287,7 +301,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert ses.accepted(); if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); else { assert msg instanceof HandshakeMessage : msg; @@ -316,6 +330,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient oldClient = clients.get(sndId); + boolean hasShmemClient = false; + if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { if (log.isDebugEnabled()) @@ -327,6 +343,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } } GridFutureAdapter fut = new GridFutureAdapter<>(); @@ -353,10 +374,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } } boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + @@ -365,7 +391,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserved) { try { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + connected(recoveryDesc, ses, 
rmtNode, msg0.received(), true, !hasShmemClient); fut.onDone(client); } @@ -387,11 +413,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } else { boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); if (reserved) { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); fut.onDone(client); } @@ -459,6 +485,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node. * @param rcvCnt Number of received messages.. * @param sndRes If {@code true} sends response for recovery handshake. + * @param createClient If {@code true} creates NIO communication client. * @return Client. */ private GridTcpNioCommunicationClient connected( @@ -466,7 +493,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioSession ses, ClusterNode node, long rcvCnt, - boolean sndRes) { + boolean sndRes, + boolean createClient) { recovery.onHandshake(rcvCnt); ses.recoveryDescriptor(recovery); @@ -478,12 +506,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter recovery.connected(); - GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); + GridTcpNioCommunicationClient client = null; - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + if (createClient) { + client = new GridTcpNioCommunicationClient(ses, log); - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + + assert oldClient == null : "Client already created [node=" + node + ", client=" + client + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + } return client; } @@ -511,22 +543,28 @@ public class 
TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final GridFutureAdapter fut; + /** */ + private final boolean createClient; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. * @param rmtNode Remote node. * @param msg Handshake message. + * @param createClient If {@code true} creates NIO communication client.. * @param fut Connect future. */ ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, HandshakeMessage msg, + boolean createClient, GridFutureAdapter fut) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; this.msg = msg; + this.createClient = createClient; this.fut = fut; } @@ -539,7 +577,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false); + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); fut.onDone(client); } @@ -588,6 +626,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Local port range. */ private int locPortRange = DFLT_PORT_RANGE; + /** Local port which node uses to accept shared memory connections. */ + private int shmemPort = DFLT_SHMEM_PORT; + /** Allocate direct buffer or heap buffer. */ private boolean directBuf = true; @@ -622,6 +663,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** NIO server. */ private GridNioServer nioSrvr; + /** Shared memory server. */ + private IpcSharedMemoryServerEndpoint shmemSrv; + /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -636,6 +680,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Recovery and idle clients handler. */ private CommunicationWorker commWorker; + + /** Shared memory accept worker. */ + private ShmemAcceptWorker shmemAcceptWorker; + + /** Shared memory workers. 
*/ + private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ private final ConcurrentMap clients = GridConcurrentFactory.newMap(); @@ -646,6 +696,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Bound port. */ private int boundTcpPort = -1; + /** Bound port for shared memory server. */ + private int boundTcpShmemPort = -1; + /** Count of selectors to use in TCP server. */ private int selectorsCnt = DFLT_SELECTORS_CNT; @@ -789,6 +842,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * Sets local port to accept shared memory connections. + *

<p> + * If set to {@code -1} shared memory communication will be disabled. + * <p>
+ * If not provided, default value is {@link #DFLT_SHMEM_PORT}. + * + * @param shmemPort Port number. + */ + @IgniteSpiConfiguration(optional = true) + public void setSharedMemoryPort(int shmemPort) { + this.shmemPort = shmemPort; + } + + /** {@inheritDoc} */ + @Override public int getSharedMemoryPort() { + return shmemPort; + } + + /** + * Sets maximum idle connection timeout upon which a connection * to client will be closed. * <p>
@@ -1153,6 +1225,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); + assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); assertParameter(reconCnt > 0, "reconnectCnt > 0"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); assertParameter(minBufferedMsgCnt >= 0, "minBufferedMsgCnt >= 0"); @@ -1178,6 +1251,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { + shmemSrv = resetShmemServer(); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to start shared memory communication server.", e); + } + + try { // This method potentially resets local port to the value // local node was bound to. nioSrvr = resetNioServer(); @@ -1197,6 +1277,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter createSpiAttributeName(ATTR_ADDRS), addrs.get1(), createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(), createSpiAttributeName(ATTR_PORT), boundTcpPort, + createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? 
boundTcpShmemPort : null, createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); } catch (IOException | IgniteCheckedException e) { @@ -1223,6 +1304,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); log.debug(configInfo("sockRcvBuf", sockRcvBuf)); + log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); log.debug(configInfo("minBufferedMsgCnt", minBufferedMsgCnt)); log.debug(configInfo("connTimeout", connTimeout)); @@ -1239,6 +1321,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter registerMBean(gridName, this, TcpCommunicationSpiMBean.class); + if (shmemSrv != null) { + shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv); + + new IgniteThread(shmemAcceptWorker).start(); + } + nioSrvr.start(); commWorker = new CommunicationWorker(); @@ -1254,6 +1342,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); + // SPI can start without shmem port. + if (boundTcpShmemPort > 0) + spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP); + spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); ctxInitLatch.countDown(); @@ -1294,7 +1386,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // If configured TCP port is busy, find first available in range. 
for (int port = locPort; port < locPort + locPortRange; port++) { try { - MessageFactory messageFactory = new MessageFactory() { + MessageFactory msgFactory = new MessageFactory() { private MessageFactory impl; @Nullable @Override public Message create(byte type) { @@ -1307,7 +1399,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - MessageFormatter messageFormatter = new MessageFormatter() { + MessageFormatter msgFormatter = new MessageFormatter() { private MessageFormatter impl; @Override public MessageWriter writer() { @@ -1329,7 +1421,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter); IgnitePredicate skipRecoveryPred = new IgnitePredicate() { @Override public boolean apply(Message msg) { @@ -1356,7 +1448,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) - .messageFormatter(messageFormatter) + .messageFormatter(msgFormatter) .skipRecoveryPredicate(skipRecoveryPred) .build(); @@ -1388,6 +1480,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); } + /** + * Creates new shared memory communication server. + * @return Server. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException { + if (boundTcpShmemPort >= 0) + throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort); + + if (shmemPort == -1 || U.isWindows()) + return null; + + IgniteCheckedException lastEx = null; + + // If configured TCP port is busy, find first available in range. 
+ for (int port = shmemPort; port < shmemPort + locPortRange; port++) { + try { + IpcSharedMemoryServerEndpoint srv = + new IpcSharedMemoryServerEndpoint(log, ignite.configuration().getNodeId(), gridName); + + srv.setPort(port); + + srv.omitOutOfResourcesWarning(true); + + srv.start(); + + boundTcpShmemPort = port; + + // Ack Port the TCP server was bound to. + if (log.isInfoEnabled()) + log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort + + ", locHost=" + locHost + ']'); + + return srv; + } + catch (IgniteCheckedException e) { + lastEx = e; + + if (log.isDebugEnabled()) + log.debug("Failed to bind to local port (will try next port within range) [port=" + port + + ", locHost=" + locHost + ']'); + } + } + + // If free port wasn't found. + throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" + + locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx); + } + /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { assert isNodeStopping(); @@ -1399,9 +1540,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.stop(); U.interrupt(commWorker); - U.join(commWorker, log); + U.cancel(shmemAcceptWorker); + U.join(shmemAcceptWorker, log); + + U.cancel(shmemWorkers); + U.join(shmemWorkers, log); + + shmemWorkers.clear(); + // Force closing on stop (safety). 
for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -1612,13 +1760,110 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { assert node != null; - if (getSpiContext().localNode() == null) + Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); + + ClusterNode locNode = getSpiContext().localNode(); + + if (locNode == null) throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + // If remote node has shared memory server enabled and has the same set of MACs + // then we are likely to run on the same host and shared memory communication could be tried. + if (shmemPort != null && U.sameMacs(locNode, node)) { + try { + return createShmemClient(node, shmemPort); + } + catch (IgniteCheckedException e) { + if (e.hasCause(IpcOutOfSystemResourcesException.class)) + // Has cause or is itself the IpcOutOfSystemResourcesException. + LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG); + else if (getSpiContext().node(node.id()) != null) + LT.warn(log, null, e.getMessage()); + else if (log.isDebugEnabled()) + log.debug("Failed to establish shared memory connection with local node (node has left): " + + node.id()); + } + } + return createTcpClient(node); } /** + * @param node Node. + * @param port Port. + * @return Client. + * @throws IgniteCheckedException If failed. + */ + @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { + int attempt = 1; + + int connectAttempts = 1; + + long connTimeout0 = connTimeout; + + while (true) { + GridCommunicationClient client; + + try { + client = new GridShmemCommunicationClient(metricsLsnr, + port, + connTimeout, + log, + getSpiContext().messageFormatter()); + } + catch (IgniteCheckedException e) { + // Reconnect for the second time, if connection is not established. 
+ if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) { + connectAttempts++; + + continue; + } + + throw e; + } + + try { + safeHandshake(client, null, node.id(), connTimeout0); + } + catch (HandshakeTimeoutException e) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 + + ", err=" + e.getMessage() + ", client=" + client + ']'); + + client.forceClose(); + + if (attempt == reconCnt || connTimeout0 > maxConnTimeout) { + if (log.isDebugEnabled()) + log.debug("Handshake timedout (will stop attempts to perform the handshake) " + + "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout + + ", attempt=" + attempt + ", reconCnt=" + reconCnt + + ", err=" + e.getMessage() + ", client=" + client + ']'); + + throw e; + } + else { + attempt++; + + connTimeout0 *= 2; + + continue; + } + } + catch (IgniteCheckedException | RuntimeException | Error e) { + if (log.isDebugEnabled()) + log.debug( + "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']'); + + client.forceClose(); + + throw e; + } + + return client; + } + } + + /** * Establish TCP connection to remote node and returns client. * * @param node Remote node. @@ -2095,6 +2340,144 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * This worker takes responsibility to shut the server down when stopping, + * No other thread shall stop passed server. + */ + private class ShmemAcceptWorker extends GridWorker { + /** */ + private final IpcSharedMemoryServerEndpoint srv; + + /** + * @param srv Server. 
+ */ + ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) { + super(gridName, "shmem-communication-acceptor", TcpCommunicationSpi.this.log); + + this.srv = srv; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + while (!Thread.interrupted()) { + ShmemWorker e = new ShmemWorker(srv.accept()); + + shmemWorkers.add(e); + + new IgniteThread(e).start(); + } + } + catch (IgniteCheckedException e) { + if (!isCancelled()) + U.error(log, "Shmem server failed.", e); + } + finally { + srv.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + srv.close(); + } + } + + /** + * + */ + private class ShmemWorker extends GridWorker { + /** */ + private final IpcEndpoint endpoint; + + /** + * @param endpoint Endpoint. + */ + private ShmemWorker(IpcEndpoint endpoint) { + super(gridName, "shmem-worker", TcpCommunicationSpi.this.log); + + this.endpoint = endpoint; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + MessageFactory msgFactory = new MessageFactory() { + private MessageFactory impl; + + @Nullable @Override public Message create(byte type) { + if (impl == null) + impl = getSpiContext().messageFactory(); + + assert impl != null; + + return impl.create(type); + } + }; + + MessageFormatter msgFormatter = new MessageFormatter() { + private MessageFormatter impl; + + @Override public MessageWriter writer() { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.writer(); + } + + @Override public MessageReader reader(MessageFactory factory) { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.reader(factory); + } + }; + + IpcToNioAdapter adapter = new IpcToNioAdapter<>( + metricsLsnr, + log, + endpoint, + srvLsnr, + msgFormatter, + new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), + new 
GridConnectionBytesVerifyFilter(log) + ); + + adapter.serve(); + } + finally { + shmemWorkers.remove(this); + + endpoint.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override protected void cleanup() { + super.cleanup(); + + endpoint.close(); + } + + /** @{@inheritDoc} */ + @Override public String toString() { + return S.toString(ShmemWorker.class, this); + } + } + + /** * */ private class CommunicationWorker extends IgniteSpiThread { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index 6f5a738..fe4f581 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -44,6 +44,14 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getLocalPort(); /** + * Gets local port for shared memory communication. + * + * @return Port number. + */ + @MXBeanDescription("Shared memory endpoint port number.") + public int getSharedMemoryPort(); + + /** * Gets maximum number of local ports tried if all previously * tried ports are occupied. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 9bfbd15..128d452 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -356,6 +356,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } /** + * @throws Exception If failed. + */ + public void testRemoveAllSkipStore() throws Exception { + IgniteCache jcache = jcache(); + + jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3)); + + jcache.withSkipStore().removeAll(); + + assertEquals((Integer)1, jcache.get("1")); + assertEquals((Integer)2, jcache.get("2")); + assertEquals((Integer)3, jcache.get("3")); + } + + /** * @throws IgniteCheckedException If failed. 
*/ public void testAtomicOps() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index db9e6a8..7905565 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -807,6 +807,25 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testGetOrCreateMultiNodeTemplate() throws Exception { + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + int idx0 = idx.getAndIncrement(); + + ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME); + + return null; + } + }, nodeCount() * 4, "runner"); + + ignite(0).destroyCache(DYNAMIC_CACHE_NAME); + } + + /** + * @throws Exception If failed. 
+ */ public void testGetOrCreateNearOnlyMultiNode() throws Exception { checkGetOrCreateNear(true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java new file mode 100644 index 0000000..24ebb7c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private boolean client; + + /** */ + private boolean clientDiscovery; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (!clientDiscovery) + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(SRVS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testManyClients() throws Exception { + manyClientsPutGet(); + } + + /** + * @throws Exception If failed. + */ + public void testManyClientsClientDiscovery() throws Exception { + clientDiscovery = true; + + manyClientsPutGet(); + } + + /** + * @throws Exception If failed. + */ + private void manyClientsPutGet() throws Exception { + client = true; + + final AtomicInteger idx = new AtomicInteger(SRVS); + + final AtomicBoolean stop = new AtomicBoolean(); + + final int THREADS = 30; + + final CountDownLatch latch = new CountDownLatch(THREADS); + + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + try (Ignite ignite = startGrid(idx.getAndIncrement())) { + log.info("Started node: " + ignite.name()); + + assertTrue(ignite.configuration().isClientMode()); + + IgniteCache cache = ignite.cache(null); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int iter = 0; + + Integer key = rnd.nextInt(0, 1000); + + cache.put(key, iter++); + + assertNotNull(cache.get(key)); + + latch.countDown(); + + while (!stop.get()) { + key = rnd.nextInt(0, 1000); + + cache.put(key, iter++); + + assertNotNull(cache.get(key)); + } + + log.info("Stopping node: " + ignite.name()); + } + + return null; + } + }, THREADS, "client-thread"); + + latch.await(); + + Thread.sleep(10_000); + + log.info("Stop clients."); + + stop.set(true); + + fut.get(); + } + finally { + stop.set(true); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index 96abe5f..8031315 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -50,6 +50,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); commSpi.setSocketWriteTimeout(1000); + commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java index 9d41074..4601586 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java @@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo for (int j = 0; j < G.allGrids().size(); j++) { GridCacheAdapter c2 = ((IgniteKernal)grid(j)).internalCache("two"); - CacheQuery> qry = c2.context().queries().createScanQuery(null, false); + CacheQuery> qry = c2.context().queries().createScanQuery(null, null, false); int totalCnt = F.sumInt(qry.execute(new IgniteReducer, 
Integer>() { @IgniteInstanceResource http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java index 62bf3f7..cc8217d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java @@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa for (int j = 0; j < G.allGrids().size(); j++) { GridCacheAdapter c2 = ((IgniteKernal)grid(j)).internalCache("two"); - CacheQuery> qry = c2.context().queries().createScanQuery(null, false); + CacheQuery> qry = c2.context().queries().createScanQuery(null, null, false); final int i0 = j; final int j0 = i; @@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa Object v1 = e.getValue(); Object v2 = ((IgniteKernal)grid).getCache("one").get(key); - assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + - ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); + assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" + + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); assertEquals(v1, v2); } catch (IgniteCheckedException e1) { 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java index 068a46c..6ccfbc2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java @@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA * @throws Exception If failed. */ public void testQuery() throws Exception { - checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME)); + checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false); - checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME)); + checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false); + + checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true); + + checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true); } /** * @param cache Cache. + * @param scanPartitions Scan partitions. * @throws Exception If failed. 
*/ @SuppressWarnings("unchecked") - private void checkQuery(GridCacheAdapter cache) throws Exception { + private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception { final int ENTRY_CNT = 500; - for (int i = 0; i < ENTRY_CNT; i++) - cache.getAndPut(new Key(i), new Person("p-" + i, i)); + Map> entries = new HashMap<>(); + + for (int i = 0; i < ENTRY_CNT; i++) { + Key key = new Key(i); + Person val = new Person("p-" + i, i); + + int part = cache.context().affinity().partition(key); + + cache.getAndPut(key, val); + + Map partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(key, val); + } try { - CacheQuery> qry = cache.context().queries().createScanQuery( - new IgniteBiPredicate() { - @Override public boolean apply(Key key, Person p) { - assertEquals(key.id, (Integer)p.salary); + int partitions = scanPartitions ? cache.context().affinity().partitions() : 1; - return key.id % 2 == 0; - } - }, false); + for (int i = 0; i < partitions; i++) { + CacheQuery> qry = cache.context().queries().createScanQuery( + new IgniteBiPredicate() { + @Override public boolean apply(Key key, Person p) { + assertEquals(key.id, (Integer)p.salary); - Collection> res = qry.execute().get(); + return key.id % 2 == 0; + } + }, (scanPartitions ? 
i : null), false); - assertEquals(ENTRY_CNT / 2, res.size()); + Collection> res = qry.execute().get(); - for (Map.Entry e : res) { - Key k = e.getKey(); - Person p = e.getValue(); + if (!scanPartitions) + assertEquals(ENTRY_CNT / 2, res.size()); - assertEquals(k.id, (Integer)p.salary); - assertEquals(0, k.id % 2); - } + for (Map.Entry e : res) { + Key k = e.getKey(); + Person p = e.getValue(); - qry = cache.context().queries().createScanQuery(null, false); + assertEquals(k.id, (Integer)p.salary); + assertEquals(0, k.id % 2); - res = qry.execute().get(); + if (scanPartitions) { + Map partEntries = entries.get(i); - assertEquals(ENTRY_CNT, res.size()); + assertEquals(p, partEntries.get(k)); + } + } + + qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false); + + res = qry.execute().get(); + + if (!scanPartitions) + assertEquals(ENTRY_CNT, res.size()); + } testMultithreaded(cache, ENTRY_CNT / 2); } @@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return key.id % 2 == 0; } - }, false); + }, null, false); for (int i = 0; i < 250; i++) { Collection> res = qry.execute().get(); @@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return val % 2 == 0; } - }, false); + }, null, false); Collection> res = qry.execute().get(); @@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA assertEquals(0, val % 2); } - qry = cache.context().queries().createScanQuery(null, false); + qry = cache.context().queries().createScanQuery(null, null, false); res = qry.execute().get(); @@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return key % 2 == 0; } - }, false); + }, null, false); Collection> res = qry.execute().get(); @@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA assertEquals(0, key % 2); } - qry = 
cache.context().queries().createScanQuery(null, false); + qry = cache.context().queries().createScanQuery(null, null, false); res = qry.execute().get(); @@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA this.name = name; this.salary = salary; } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + if (salary != person.salary) + return false; + + return !(name != null ? !name.equals(person.name) : person.name != null); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = name != null ? name.hashCode() : 0; + + return 31 * result + salary; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java index 1a8fd10..e220031 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IgfsSharedMemoryTestServer.java @@ -49,6 +49,8 @@ public class IgfsSharedMemoryTestServer { srv.start(); + System.out.println("IPC shared memory server endpoint started"); + IpcEndpoint clientEndpoint = srv.accept(); is = clientEndpoint.inputStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java index 2ddf6f3..c6f590e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java @@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java index 7dc0870..4afb64b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java @@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java index 4c5413c..176429e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java @@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java index 8ff827b..8fee239 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java @@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner { createCorruptedLibFile(); - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java index 28495af..89eeda1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java @@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP * @throws IgniteCheckedException If failed. */ public static void main(String[] args) throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java index 2ade145..e8a8402 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java @@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP * @throws IgniteCheckedException If failed. */ public static void main(String[] args) throws IgniteCheckedException { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); int nThreads = args.length > 0 ? 
Integer.parseInt(args[0]) : 1; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java index 422d608..ea5b716 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java @@ -455,6 +455,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { spi.setTcpNoDelay(true); spi.setConnectionBufferSize(0); + spi.setSharedMemoryPort(-1); info("Comm SPI: " + spi); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index ed9e0cf..744635d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -115,6 +115,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { commSpi.setLocalAddress("127.0.0.1"); commSpi.setLocalPort(commLocPort); commSpi.setLocalPortRange(1); + commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 8d27485..eee38a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -37,10 +37,23 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica /** */ public static final int IDLE_CONN_TIMEOUT = 2000; + /** */ + private final boolean useShmem; + + /** + * @param useShmem Use shared mem flag. + */ + protected GridTcpCommunicationSpiAbstractTest(boolean useShmem) { + this.useShmem = useShmem; + } + /** {@inheritDoc} */ @Override protected CommunicationSpi getSpi(int idx) { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + if (!useShmem) + spi.setSharedMemoryPort(-1); + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT); spi.setTcpNoDelay(tcpNoDelay()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 2d175f5..a5cd7ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -181,8 +181,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest() { - @Override - public Long call() throws Exception { + @Override public Long call() throws Exception { long dummyRes = 0; List list = new ArrayList<>(); @@ -300,6 +299,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest spiRsrcs = new ArrayList<>(); /** SPIs */ - private static final Map> spis = - new ConcurrentHashMap<>(); + private static final Map> spis = new ConcurrentHashMap<>(); /** Listeners. */ private static final Map lsnrs = new HashMap<>(); @@ -80,9 +82,19 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac } /** + * @param useShmem Use shared mem. */ - public GridTcpCommunicationSpiMultithreadedSelfTest() { + protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) { super(false); + + this.useShmem = useShmem; + } + + /** + * + */ + public GridTcpCommunicationSpiMultithreadedSelfTest() { + this(false); } /** @@ -413,6 +425,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac private CommunicationSpi newCommunicationSpi() { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + if (!useShmem) + spi.setSharedMemoryPort(-1); + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); spi.setIdleConnectionTimeout(IDLE_CONN_TIMEOUT); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java new file mode 100644 
index 0000000..590b426 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedShmemTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +/** + * + */ +public class GridTcpCommunicationSpiMultithreadedShmemTest extends GridTcpCommunicationSpiMultithreadedSelfTest { + /** */ + public GridTcpCommunicationSpiMultithreadedShmemTest() { + super(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index c0f0b11..1a4ba22 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -324,6 +324,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest protected TcpCommunicationSpi getSpi(int idx) { TcpCommunicationSpi spi = new TcpCommunicationSpi(); + spi.setSharedMemoryPort(-1); spi.setLocalPort(port++); spi.setIdleConnectionTimeout(10_000); spi.setConnectTimeout(10_000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java new file mode 100644 index 0000000..5746a3c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiShmemSelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.testframework.junits.spi.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class GridTcpCommunicationSpiShmemSelfTest extends GridTcpCommunicationSpiAbstractTest { + /** + * + */ + public GridTcpCommunicationSpiShmemSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected boolean tcpNoDelay() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java index 32bced2..c27a86f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpSelfTest.java @@ -24,6 +24,13 @@ import org.apache.ignite.testframework.junits.spi.*; */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") public class GridTcpCommunicationSpiTcpSelfTest extends GridTcpCommunicationSpiAbstractTest { + /** + * + */ + public GridTcpCommunicationSpiTcpSelfTest() { + super(false); + } + /** {@inheritDoc} */ @Override protected boolean tcpNoDelay() { return true;