Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9E47218B4F for ; Wed, 10 Jun 2015 09:23:16 +0000 (UTC) Received: (qmail 90322 invoked by uid 500); 10 Jun 2015 09:23:16 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 90291 invoked by uid 500); 10 Jun 2015 09:23:16 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 90282 invoked by uid 99); 10 Jun 2015 09:23:16 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 09:23:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id E94D4CCF85 for ; Wed, 10 Jun 2015 09:23:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id UYaQJx4rsan2 for ; Wed, 10 Jun 2015 09:23:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 9154125405 for ; Wed, 10 Jun 2015 09:23:11 +0000 (UTC) Received: (qmail 89782 invoked by uid 99); 10 Jun 2015 09:23:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 09:23:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 97B45E0385; Wed, 10 Jun 2015 09:23:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 10 Jun 2015 09:23:23 -0000 Message-Id: In-Reply-To: <82b053bdb44f452fa487372a6ee7469b@git.apache.org> References: <82b053bdb44f452fa487372a6ee7469b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/50] incubator-ignite git commit: # ignite-883 # ignite-883 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb827a77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb827a77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb827a77 Branch: refs/heads/ignite-929 Commit: fb827a7784614343ae639ea8b856d2f9f88d46db Parents: db57652 Author: sboikov Authored: Fri Jun 5 11:41:46 2015 +0300 Committer: sboikov Committed: Fri Jun 5 15:10:00 2015 +0300 ---------------------------------------------------------------------- .../datastructures/DataStructuresProcessor.java | 31 +- .../timeout/GridSpiTimeoutObject.java | 14 + .../util/nio/GridCommunicationClient.java | 6 - .../util/nio/GridTcpCommunicationClient.java | 554 ------------------- .../util/nio/GridTcpNioCommunicationClient.java | 8 - .../org/apache/ignite/spi/IgniteSpiAdapter.java | 4 + .../communication/tcp/TcpCommunicationSpi.java | 97 +--- .../tcp/TcpCommunicationSpiMBean.java | 2 - .../internal/util/nio/GridNioSelfTest.java | 2 +- .../GridTcpCommunicationSpiAbstractTest.java | 4 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 2 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 2 - ...cpCommunicationSpiMultithreadedSelfTest.java | 2 +- .../discovery/AbstractDiscoverySelfTest.java | 13 +- 14 files changed, 61 insertions(+), 680 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 2138639..aa3bfe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -101,7 +101,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { private IgniteInternalCache> utilityDataCache; /** */ - private UUID qryId; + private volatile UUID qryId; /** * @param ctx Context. @@ -144,11 +144,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { seqView = atomicsCache; dsCacheCtx = atomicsCache.context(); + } + } - qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), - new DataStructuresEntryFilter(), - dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), - false); + /** + * @throws IgniteCheckedException If failed. + */ + private void startQuery() throws IgniteCheckedException { + if (qryId == null) { + synchronized (this) { + if (qryId == null) { + qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), + new DataStructuresEntryFilter(), + dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), + false); + } + } } } @@ -178,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX() { @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -304,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX() { @Override public IgniteAtomicLong applyx() throws IgniteCheckedException { final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -507,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX() { @Override public IgniteAtomicReference applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); @@ -608,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX() { @Override public IgniteAtomicStamped applyx() throws IgniteCheckedException { GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); @@ -916,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); + startQuery(); + return getAtomic(new IgniteOutClosureX() { @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java index 82267a2..a0fd9b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java @@ -53,6 +53,20 @@ public class GridSpiTimeoutObject implements GridTimeoutObject { } /** {@inheritDoc} */ + @Override public int hashCode() { + assert false; + + return super.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + assert false; + + return super.equals(obj); + } + + /** {@inheritDoc} */ @Override public final String toString() { return S.toString(GridSpiTimeoutObject.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 2f7fd88..693a5a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -100,12 +100,6 @@ public interface GridCommunicationClient { public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - public void flushIfNeeded(long timeout) throws IOException; - - /** * @return {@code True} if send is asynchronous. */ public boolean async(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java deleted file mode 100644 index 72c20f8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.nio; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.nio.*; -import java.util.*; -import java.util.concurrent.locks.*; - -/** - * Grid client for NIO server. - */ -public class GridTcpCommunicationClient extends GridAbstractCommunicationClient { - /** Socket. */ - private final Socket sock; - - /** Output stream. */ - private final UnsafeBufferedOutputStream out; - - /** Minimum buffered message count. */ - private final int minBufferedMsgCnt; - - /** Communication buffer size ratio. */ - private final double bufSizeRatio; - - /** */ - private final ByteBuffer writeBuf; - - /** */ - private final MessageFormatter formatter; - - /** - * @param metricsLsnr Metrics listener. - * @param addr Address. - * @param locHost Local address. - * @param connTimeout Connect timeout. - * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option. - * @param sockRcvBuf Socket receive buffer. - * @param sockSndBuf Socket send buffer. - * @param bufSize Buffer size (or {@code 0} to disable buffer). - * @param minBufferedMsgCnt Minimum buffered message count. - * @param bufSizeRatio Communication buffer size ratio. - * @param formatter Message formatter. - * @throws IgniteCheckedException If failed. - */ - public GridTcpCommunicationClient( - GridNioMetricsListener metricsLsnr, - InetSocketAddress addr, - InetAddress locHost, - long connTimeout, - boolean tcpNoDelay, - int sockRcvBuf, - int sockSndBuf, - int bufSize, - int minBufferedMsgCnt, - double bufSizeRatio, - MessageFormatter formatter - ) throws IgniteCheckedException { - super(metricsLsnr); - - assert metricsLsnr != null; - assert addr != null; - assert locHost != null; - assert connTimeout >= 0; - assert bufSize >= 0; - - A.ensure(minBufferedMsgCnt >= 0, - "Value of minBufferedMessageCount property cannot be less than zero."); - A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1, - "Value of bufSizeRatio property must be between 0 and 1 (exclusive)."); - - this.minBufferedMsgCnt = minBufferedMsgCnt; - this.bufSizeRatio = bufSizeRatio; - this.formatter = formatter; - - writeBuf = ByteBuffer.allocate(8 << 10); - - writeBuf.order(ByteOrder.nativeOrder()); - - sock = new Socket(); - - boolean success = false; - - try { - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(tcpNoDelay); - - if (sockRcvBuf > 0) - sock.setReceiveBufferSize(sockRcvBuf); - - if (sockSndBuf > 0) - sock.setSendBufferSize(sockSndBuf); - - sock.connect(addr, (int)connTimeout); - - out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize); - - success = true; - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to connect to remote host " + - "[addr=" + addr + ", localHost=" + locHost + ']', e); - } - finally { - if (!success) - U.closeQuiet(sock); - } - } - - /** {@inheritDoc} */ - @Override public void doHandshake(IgniteInClosure2X handshakeC) throws IgniteCheckedException { - try { - handshakeC.applyx(sock.getInputStream(), sock.getOutputStream()); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " + - sock.getRemoteSocketAddress(), e); - } - } - - /** {@inheritDoc} */ - @Override public boolean close() { - boolean res = super.close(); - - if (res) { - U.closeQuiet(out); - U.closeQuiet(sock); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - super.forceClose(); - - try { - out.flush(); - } - catch (IOException ignored) { - // No-op. - } - - // Do not call (directly or indirectly) out.close() here - // since it may cause a deadlock. - out.forceClose(); - - U.closeQuiet(sock); - } - - /** {@inheritDoc} */ - @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Client was closed: " + this); - - try { - out.write(data, 0, len); - - metricsLsnr.onBytesSent(len); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e); - } - - markUsed(); - } - - /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg) - throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Client was closed: " + this); - - assert writeBuf.hasArray(); - - try { - int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer()); - - metricsLsnr.onBytesSent(cnt); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e); - } - - markUsed(); - - return false; - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - @Override public void flushIfNeeded(long timeout) throws IOException { - assert timeout > 0; - - out.flushOnTimeout(timeout); - } - - /** {@inheritDoc} */ - @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTcpCommunicationClient.class, this, super.toString()); - } - - /** - * - */ - private class UnsafeBufferedOutputStream extends FilterOutputStream { - /** The internal buffer where data is stored. */ - private final byte buf[]; - - /** Current size. */ - private int size; - - /** Count. */ - private int cnt; - - /** Message count. */ - private int msgCnt; - - /** Total messages size. */ - private int totalCnt; - - /** Lock. */ - private final ReentrantLock lock = new ReentrantLock(); - - /** Last flushed timestamp. */ - private volatile long lastFlushed = U.currentTimeMillis(); - - /** Cached flush timeout. */ - private volatile long flushTimeout; - - /** Buffer adjusted timestamp. */ - private long lastAdjusted = U.currentTimeMillis(); - - /** - * Creates a new buffered output stream to write data to the - * specified underlying output stream. - * - * @param out The underlying output stream. - */ - UnsafeBufferedOutputStream(OutputStream out) { - this(out, 8192); - } - - /** - * Creates a new buffered output stream to write data to the - * specified underlying output stream with the specified buffer - * size. - * - * @param out The underlying output stream. - * @param size The buffer size. - */ - UnsafeBufferedOutputStream(OutputStream out, int size) { - super(out); - - assert size >= 0; - - this.size = size; - buf = size > 0 ? new byte[size] : null; - } - - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) throws IOException { - assert b != null; - assert off == 0; - - // No buffering. - if (buf == null) { - lock.lock(); - - try { - out.write(b, 0, len); - } - finally { - lock.unlock(); - } - - return; - } - - // Buffering is enabled. - lock.lock(); - - try { - msgCnt++; - totalCnt += len; - - if (len >= size) { - flushLocked(); - - out.write(b, 0, len); - - lastFlushed = U.currentTimeMillis(); - - adjustBufferIfNeeded(); - - return; - } - - if (cnt + len > size) { - flushLocked(); - - messageToBuffer0(b, off, len, buf, 0); - - cnt = len; - - assert cnt < size; - - adjustBufferIfNeeded(); - - return; - } - - messageToBuffer0(b, 0, len, buf, cnt); - - cnt += len; - - if (cnt == size) - flushLocked(); - else - flushIfNeeded(); - } - finally { - lock.unlock(); - } - } - - /** - * @throws IOException If failed. - */ - private void flushIfNeeded() throws IOException { - assert lock.isHeldByCurrentThread(); - assert buf != null; - - long flushTimeout0 = flushTimeout; - - if (flushTimeout0 > 0) - flushOnTimeoutLocked(flushTimeout0); - } - - /** - * - */ - private void adjustBufferIfNeeded() { - assert lock.isHeldByCurrentThread(); - assert buf != null; - - long flushTimeout0 = flushTimeout; - - if (flushTimeout0 > 0) - adjustBufferLocked(flushTimeout0); - } - - /** {@inheritDoc} */ - @Override public void flush() throws IOException { - lock.lock(); - - try { - flushLocked(); - } - finally { - lock.unlock(); - } - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - public void flushOnTimeout(long timeout) throws IOException { - assert buf != null; - assert timeout > 0; - - // Overwrite cached value. - flushTimeout = timeout; - - if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock()) - return; - - try { - flushOnTimeoutLocked(timeout); - } - finally { - lock.unlock(); - } - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - private void flushOnTimeoutLocked(long timeout) throws IOException { - assert lock.isHeldByCurrentThread(); - assert timeout > 0; - - // Double check. - if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis()) - return; - - flushLocked(); - - adjustBufferLocked(timeout); - } - - /** - * @param timeout Timeout. - */ - private void adjustBufferLocked(long timeout) { - assert lock.isHeldByCurrentThread(); - assert timeout > 0; - - long time = U.currentTimeMillis(); - - if (lastAdjusted + timeout < time) { - if (msgCnt <= minBufferedMsgCnt) - size = 0; - else { - size = (int)(totalCnt * bufSizeRatio); - - if (size > buf.length) - size = buf.length; - } - - msgCnt = 0; - totalCnt = 0; - - lastAdjusted = time; - } - } - - /** - * @throws IOException If failed. - */ - private void flushLocked() throws IOException { - assert lock.isHeldByCurrentThread(); - - if (buf != null && cnt > 0) { - out.write(buf, 0, cnt); - - cnt = 0; - } - - out.flush(); - - lastFlushed = U.currentTimeMillis(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - lock.lock(); - - try { - flushLocked(); - } - finally { - try { - out.close(); - } - finally { - lock.unlock(); - } - } - } - - /** - * Forcibly closes underlying stream ignoring any possible exception. - */ - public void forceClose() { - try { - out.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * @param b Buffer to copy from. - * @param off Offset in source buffer. - * @param len Length. - * @param resBuf Result buffer. - * @param resOff Result offset. - */ - private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) { - assert b.length == len; - assert off == 0; - assert resBuf.length >= resOff + len + 4; - - U.intToBytes(len, resBuf, resOff); - - U.arrayCopy(b, off, resBuf, resOff + 4, len); - } - - /** - * @param b Buffer to copy from (length included). - * @param off Offset in source buffer. - * @param len Length. - * @param resBuf Result buffer. - * @param resOff Result offset. - */ - private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) { - assert off == 0; - assert resBuf.length >= resOff + len; - - U.arrayCopy(b, off, resBuf, resOff, len); - } - - /** {@inheritDoc} */ - @Override public String toString() { - lock.lock(); - - try { - return S.toString(UnsafeBufferedOutputStream.class, this); - } - finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 788a8e6..abad875 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie return false; } - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - @Override public void flushIfNeeded(long timeout) throws IOException { - // No-op. - } - /** {@inheritDoc} */ @Override public boolean async() { return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index c9c633f..d095491 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -730,11 +730,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** {@inheritDoc} */ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + assert ignite instanceof IgniteKernal : ignite; + ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); } /** {@inheritDoc} */ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + assert ignite instanceof IgniteKernal : ignite; + ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b324ab2..359de1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default idle connection timeout (value is 30000ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; - /** Default value for connection buffer flush frequency (value is 100 ms). */ - public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100; - - /** Default value for connection buffer size (value is 0). */ - public static final int DFLT_CONN_BUF_SIZE = 0; - /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; @@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Idle connection timeout. */ private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; - /** Connection buffer flush frequency. */ - private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ; - - /** Connection buffer size. */ - @SuppressWarnings("RedundantFieldInitialization") - private int connBufSize = DFLT_CONN_BUF_SIZE; - /** Connect timeout. */ private long connTimeout = DFLT_CONN_TIMEOUT; @@ -647,9 +634,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; - /** Flush client worker. */ - private ClientFlushWorker clientFlushWorker; - /** Recovery and idle clients handler. */ private CommunicationWorker commWorker; @@ -876,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. - *

- * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}. * * @param connBufSize Connection buffer size. * @see #setConnectionBufferFlushFrequency(long) */ @IgniteSpiConfiguration(optional = true) public void setConnectionBufferSize(int connBufSize) { - this.connBufSize = connBufSize; + // No-op. } /** {@inheritDoc} */ @Override public int getConnectionBufferSize() { - return connBufSize; + return 0; } /** {@inheritDoc} */ @IgniteSpiConfiguration(optional = true) @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { - this.connBufFlushFreq = connBufFlushFreq; + // No-op. } /** {@inheritDoc} */ @Override public long getConnectionBufferFlushFrequency() { - return connBufFlushFreq; + return 0; } /** @@ -1168,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0"); - assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0"); - assertParameter(connBufSize >= 0, "connBufSize >= 0"); assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); @@ -1239,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("idleConnTimeout", idleConnTimeout)); log.debug(configInfo("directBuf", directBuf)); log.debug(configInfo("directSendBuf", directSndBuf)); - log.debug(configInfo("connBufSize", connBufSize)); - log.debug(configInfo("connBufFlushFreq", connBufFlushFreq)); log.debug(configInfo("selectorsCnt", selectorsCnt)); log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); @@ -1255,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); } - if (connBufSize > 8192) - U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " + - "intentionally) [specified=" + connBufSize + ", recommended=8192]", - "Specified communication IO buffer size is larger than recommended (ignore if done intentionally)."); - if (!tcpNoDelay) U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " + "since may produce significant delays with some scenarios."); @@ -1272,12 +1245,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter commWorker.start(); - if (connBufSize > 0) { - clientFlushWorker = new ClientFlushWorker(); - - clientFlushWorker.start(); - } - // Ack start. if (log.isDebugEnabled()) log.debug(startInfo()); @@ -1431,10 +1398,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(clientFlushWorker); U.interrupt(commWorker); - U.join(clientFlushWorker, log); U.join(commWorker, log); // Force closing on stop (safety). @@ -2023,10 +1988,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(clientFlushWorker); U.interrupt(commWorker); - U.join(clientFlushWorker, log); U.join(commWorker, log); for (GridCommunicationClient client : clients.values()) @@ -2134,58 +2097,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private class ClientFlushWorker extends IgniteSpiThread { - /** - * - */ - ClientFlushWorker() { - super(gridName, "nio-client-flusher", log); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - long connBufFlushFreq0 = connBufFlushFreq; - - for (Map.Entry entry : clients.entrySet()) { - GridCommunicationClient client = entry.getValue(); - - if (client.reserve()) { - boolean err = true; - - try { - client.flushIfNeeded(connBufFlushFreq0); - - err = false; - } - catch (IOException e) { - if (getSpiContext().pingNode(entry.getKey())) - U.error(log, "Failed to flush client: " + client, e); - else { - if (log.isDebugEnabled()) - log.debug("Failed to flush client (node left): " + client); - - onException("Failed to flush client (node left): " + client, e); - } - } - finally { - if (err) - client.forceClose(); - else - client.release(); - } - } - } - - Thread.sleep(connBufFlushFreq0); - } - } - } - - /** - * - */ private class CommunicationWorker extends IgniteSpiThread { /** */ private final BlockingQueue q = new LinkedBlockingQueue<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index 5c80e6e..6f5a738 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { * This frequency defines how often system will advice to flush * connection buffer. *

- * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}. - *

* This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}. * * @param connBufFlushFreq Flush frequency. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index e3baeb0..bdf9929 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -1286,7 +1286,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } /** - * Test client to use instead of {@link GridTcpCommunicationClient} + * Test client to use instead of {@link GridTcpNioCommunicationClient} */ private static class TestClient implements AutoCloseable { /** Socket implementation to use. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index ea51aff..8d27485 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -64,7 +64,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica // Test idle clients remove. for (CommunicationSpi spi : spis.values()) { - ConcurrentMap clients = U.field(spi, "clients"); + ConcurrentMap clients = U.field(spi, "clients"); assertEquals(2, clients.size()); @@ -77,7 +77,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.afterTest(); for (CommunicationSpi spi : spis.values()) { - ConcurrentMap clients = U.field(spi, "clients"); + ConcurrentMap clients = U.field(spi, "clients"); for (int i = 0; i < 20 && !clients.isEmpty(); i++) { info("Check failed for SPI [grid=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index c038ee7..2d175f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -256,7 +256,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest clients = U.field(spi, "clients"); + ConcurrentMap clients = U.field(spi, "clients"); assertEquals(1, clients.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index 34fa610..c4a0916 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -32,8 +32,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPort", 65636); checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPortRange", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "idleConnectionTimeout", 0); - checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferSize", -1); - checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferFlushFrequency", 0); checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketReceiveBuffer", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketSendBuffer", -1); checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index e7ae957..3916f02 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -501,7 +501,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac } for (CommunicationSpi spi : spis.values()) { - final ConcurrentMap clients = U.field(spi, "clients"); + final ConcurrentMap clients = U.field(spi, "clients"); assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 9c6fbb4..61bb944 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.spi.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.config.*; import org.apache.ignite.testframework.junits.*; import org.apache.ignite.testframework.junits.spi.*; @@ -267,9 +268,8 @@ public abstract class AbstractDiscoverySelfTest extends Gri Collection nodeIds = new HashSet<>(); - for (IgniteTestResources rsrc : spiRsrcs) { + for (IgniteTestResources rsrc : spiRsrcs) nodeIds.add(rsrc.getNodeId()); - } for (ClusterNode node : spi.getRemoteNodes()) { if (nodeIds.contains(node.id())) { @@ -390,6 +390,10 @@ public abstract class AbstractDiscoverySelfTest extends Gri } }); + GridSpiTestContext ctx = initSpiContext(); + + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "spiCtx", ctx); + spi.spiStart(getTestGridName() + i); spis.add(spi); @@ -397,7 +401,7 @@ public abstract class AbstractDiscoverySelfTest extends Gri spiRsrcs.add(rsrcMgr); // Force to use test context instead of default dummy context. - spi.onContextInitialized(initSpiContext()); + spi.onContextInitialized(ctx); } } catch (Throwable e) { @@ -438,9 +442,8 @@ public abstract class AbstractDiscoverySelfTest extends Gri spi.spiStop(); } - for (IgniteTestResources rscrs : spiRsrcs) { + for (IgniteTestResources rscrs : spiRsrcs) rscrs.stopThreads(); - } // Clear. spis.clear();