ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [14/50] incubator-ignite git commit: # ignite-883
Date Wed, 10 Jun 2015 09:23:23 GMT
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb827a77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb827a77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb827a77

Branch: refs/heads/ignite-929
Commit: fb827a7784614343ae639ea8b856d2f9f88d46db
Parents: db57652
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jun 5 11:41:46 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jun 5 15:10:00 2015 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java |  31 +-
 .../timeout/GridSpiTimeoutObject.java           |  14 +
 .../util/nio/GridCommunicationClient.java       |   6 -
 .../util/nio/GridTcpCommunicationClient.java    | 554 -------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   8 -
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   4 +
 .../communication/tcp/TcpCommunicationSpi.java  |  97 +---
 .../tcp/TcpCommunicationSpiMBean.java           |   2 -
 .../internal/util/nio/GridNioSelfTest.java      |   2 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   4 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   2 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   2 -
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  13 +-
 14 files changed, 61 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 2138639..aa3bfe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -101,7 +101,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
     private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>>
utilityDataCache;
 
     /** */
-    private UUID qryId;
+    private volatile UUID qryId;
 
     /**
      * @param ctx Context.
@@ -144,11 +144,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
             seqView = atomicsCache;
 
             dsCacheCtx = atomicsCache.context();
+        }
+    }
 
-            qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
-                new DataStructuresEntryFilter(),
-                dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
-                false);
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void startQuery() throws IgniteCheckedException {
+        if (qryId == null) {
+            synchronized (this) {
+                if (qryId == null) {
+                    qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+                        new DataStructuresEntryFilter(),
+                        dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                        false);
+                }
+            }
         }
     }
 
@@ -178,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
             @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException
{
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -304,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
             @Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
                 final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -507,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
             @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException
{
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -608,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
             @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException
{
                 GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
@@ -916,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter
{
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
             @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException
{
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
index 82267a2..a0fd9b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -53,6 +53,20 @@ public class GridSpiTimeoutObject implements GridTimeoutObject {
     }
 
     /** {@inheritDoc} */
+    @Override public int hashCode() {
+        assert false;
+
+        return super.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        assert false;
+
+        return super.equals(obj);
+    }
+
+    /** {@inheritDoc} */
     @Override public final String toString() {
         return S.toString(GridSpiTimeoutObject.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 2f7fd88..693a5a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -100,12 +100,6 @@ public interface GridCommunicationClient {
     public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    public void flushIfNeeded(long timeout) throws IOException;
-
-    /**
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
deleted file mode 100644
index 72c20f8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Grid client for NIO server.
- */
-public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
-    /** Socket. */
-    private final Socket sock;
-
-    /** Output stream. */
-    private final UnsafeBufferedOutputStream out;
-
-    /** Minimum buffered message count. */
-    private final int minBufferedMsgCnt;
-
-    /** Communication buffer size ratio. */
-    private final double bufSizeRatio;
-
-    /** */
-    private final ByteBuffer writeBuf;
-
-    /** */
-    private final MessageFormatter formatter;
-
-    /**
-     * @param metricsLsnr Metrics listener.
-     * @param addr Address.
-     * @param locHost Local address.
-     * @param connTimeout Connect timeout.
-     * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
-     * @param sockRcvBuf Socket receive buffer.
-     * @param sockSndBuf Socket send buffer.
-     * @param bufSize Buffer size (or {@code 0} to disable buffer).
-     * @param minBufferedMsgCnt Minimum buffered message count.
-     * @param bufSizeRatio Communication buffer size ratio.
-     * @param formatter Message formatter.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridTcpCommunicationClient(
-        GridNioMetricsListener metricsLsnr,
-        InetSocketAddress addr,
-        InetAddress locHost,
-        long connTimeout,
-        boolean tcpNoDelay,
-        int sockRcvBuf,
-        int sockSndBuf,
-        int bufSize,
-        int minBufferedMsgCnt,
-        double bufSizeRatio,
-        MessageFormatter formatter
-    ) throws IgniteCheckedException {
-        super(metricsLsnr);
-
-        assert metricsLsnr != null;
-        assert addr != null;
-        assert locHost != null;
-        assert connTimeout >= 0;
-        assert bufSize >= 0;
-
-        A.ensure(minBufferedMsgCnt >= 0,
-            "Value of minBufferedMessageCount property cannot be less than zero.");
-        A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
-            "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
-
-        this.minBufferedMsgCnt = minBufferedMsgCnt;
-        this.bufSizeRatio = bufSizeRatio;
-        this.formatter = formatter;
-
-        writeBuf = ByteBuffer.allocate(8 << 10);
-
-        writeBuf.order(ByteOrder.nativeOrder());
-
-        sock = new Socket();
-
-        boolean success = false;
-
-        try {
-            sock.bind(new InetSocketAddress(locHost, 0));
-
-            sock.setTcpNoDelay(tcpNoDelay);
-
-            if (sockRcvBuf > 0)
-                sock.setReceiveBufferSize(sockRcvBuf);
-
-            if (sockSndBuf > 0)
-                sock.setSendBufferSize(sockSndBuf);
-
-            sock.connect(addr, (int)connTimeout);
-
-            out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
-
-            success = true;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to connect to remote host " +
-                "[addr=" + addr + ", localHost=" + locHost + ']', e);
-        }
-        finally {
-            if (!success)
-                U.closeQuiet(sock);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream>
handshakeC) throws IgniteCheckedException {
-        try {
-            handshakeC.applyx(sock.getInputStream(), sock.getOutputStream());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to access IO streams when executing
handshake with remote node: " +
-                sock.getRemoteSocketAddress(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        boolean res = super.close();
-
-        if (res) {
-            U.closeQuiet(out);
-            U.closeQuiet(sock);
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        super.forceClose();
-
-        try {
-            out.flush();
-        }
-        catch (IOException ignored) {
-            // No-op.
-        }
-
-        // Do not call (directly or indirectly) out.close() here
-        // since it may cause a deadlock.
-        out.forceClose();
-
-        U.closeQuiet(sock);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException
{
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        try {
-            out.write(data, 0, len);
-
-            metricsLsnr.onBytesSent(len);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " +
sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
-        throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Client was closed: " + this);
-
-        assert writeBuf.hasArray();
-
-        try {
-            int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer());
-
-            metricsLsnr.onBytesSent(cnt);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote node: " +
sock.getRemoteSocketAddress(), e);
-        }
-
-        markUsed();
-
-        return false;
-    }
-
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        assert timeout > 0;
-
-        out.flushOnTimeout(timeout);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridTcpCommunicationClient.class, this, super.toString());
-    }
-
-    /**
-     *
-     */
-    private class UnsafeBufferedOutputStream extends FilterOutputStream {
-        /** The internal buffer where data is stored. */
-        private final byte buf[];
-
-        /** Current size. */
-        private int size;
-
-        /** Count. */
-        private int cnt;
-
-        /** Message count. */
-        private int msgCnt;
-
-        /** Total messages size. */
-        private int totalCnt;
-
-        /** Lock. */
-        private final ReentrantLock lock = new ReentrantLock();
-
-        /** Last flushed timestamp. */
-        private volatile long lastFlushed = U.currentTimeMillis();
-
-        /** Cached flush timeout. */
-        private volatile long flushTimeout;
-
-        /** Buffer adjusted timestamp. */
-        private long lastAdjusted = U.currentTimeMillis();
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream.
-         *
-         * @param out The underlying output stream.
-         */
-        UnsafeBufferedOutputStream(OutputStream out) {
-            this(out, 8192);
-        }
-
-        /**
-         * Creates a new buffered output stream to write data to the
-         * specified underlying output stream with the specified buffer
-         * size.
-         *
-         * @param out The underlying output stream.
-         * @param size The buffer size.
-         */
-        UnsafeBufferedOutputStream(OutputStream out, int size) {
-            super(out);
-
-            assert size >= 0;
-
-            this.size = size;
-            buf = size > 0 ? new byte[size] : null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(int b) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            assert b != null;
-            assert off == 0;
-
-            // No buffering.
-            if (buf == null) {
-                lock.lock();
-
-                try {
-                    out.write(b, 0, len);
-                }
-                finally {
-                    lock.unlock();
-                }
-
-                return;
-            }
-
-            // Buffering is enabled.
-            lock.lock();
-
-            try {
-                msgCnt++;
-                totalCnt += len;
-
-                if (len >= size) {
-                    flushLocked();
-
-                    out.write(b, 0, len);
-
-                    lastFlushed = U.currentTimeMillis();
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                if (cnt + len > size) {
-                    flushLocked();
-
-                    messageToBuffer0(b, off, len, buf, 0);
-
-                    cnt = len;
-
-                    assert cnt < size;
-
-                    adjustBufferIfNeeded();
-
-                    return;
-                }
-
-                messageToBuffer0(b, 0, len, buf, cnt);
-
-                cnt += len;
-
-                if (cnt == size)
-                    flushLocked();
-                else
-                    flushIfNeeded();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushIfNeeded() throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                flushOnTimeoutLocked(flushTimeout0);
-        }
-
-        /**
-         *
-         */
-        private void adjustBufferIfNeeded() {
-            assert lock.isHeldByCurrentThread();
-            assert buf != null;
-
-            long flushTimeout0 = flushTimeout;
-
-            if (flushTimeout0 > 0)
-                adjustBufferLocked(flushTimeout0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void flush() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        public void flushOnTimeout(long timeout) throws IOException {
-            assert buf != null;
-            assert timeout > 0;
-
-            // Overwrite cached value.
-            flushTimeout = timeout;
-
-            if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock())
-                return;
-
-            try {
-                flushOnTimeoutLocked(timeout);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param timeout Timeout.
-         * @throws IOException If failed.
-         */
-        private void flushOnTimeoutLocked(long timeout) throws IOException {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            // Double check.
-            if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
-                return;
-
-            flushLocked();
-
-            adjustBufferLocked(timeout);
-        }
-
-        /**
-         * @param timeout Timeout.
-         */
-        private void adjustBufferLocked(long timeout) {
-            assert lock.isHeldByCurrentThread();
-            assert timeout > 0;
-
-            long time = U.currentTimeMillis();
-
-            if (lastAdjusted + timeout < time) {
-                if (msgCnt <= minBufferedMsgCnt)
-                    size = 0;
-                else {
-                    size = (int)(totalCnt * bufSizeRatio);
-
-                    if (size > buf.length)
-                        size = buf.length;
-                }
-
-                msgCnt = 0;
-                totalCnt = 0;
-
-                lastAdjusted = time;
-            }
-        }
-
-        /**
-         * @throws IOException If failed.
-         */
-        private void flushLocked() throws IOException {
-            assert lock.isHeldByCurrentThread();
-
-            if (buf != null && cnt > 0) {
-                out.write(buf, 0, cnt);
-
-                cnt = 0;
-            }
-
-            out.flush();
-
-            lastFlushed = U.currentTimeMillis();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IOException {
-            lock.lock();
-
-            try {
-                flushLocked();
-            }
-            finally {
-                try {
-                    out.close();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-
-        /**
-         * Forcibly closes underlying stream ignoring any possible exception.
-         */
-        public void forceClose() {
-            try {
-                out.close();
-            }
-            catch (IOException ignored) {
-                // No-op.
-            }
-        }
-
-        /**
-         * @param b Buffer to copy from.
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff)
{
-            assert b.length == len;
-            assert off == 0;
-            assert resBuf.length >= resOff + len + 4;
-
-            U.intToBytes(len, resBuf, resOff);
-
-            U.arrayCopy(b, off, resBuf, resOff + 4, len);
-        }
-
-        /**
-         * @param b Buffer to copy from (length included).
-         * @param off Offset in source buffer.
-         * @param len Length.
-         * @param resBuf Result buffer.
-         * @param resOff Result offset.
-         */
-        private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff)
{
-            assert off == 0;
-            assert resBuf.length >= resOff + len;
-
-            U.arrayCopy(b, off, resBuf, resOff, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            lock.lock();
-
-            try {
-                return S.toString(UnsafeBufferedOutputStream.class, this);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 788a8e6..abad875 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
         return false;
     }
 
-    /**
-     * @param timeout Timeout.
-     * @throws IOException If failed.
-     */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public boolean async() {
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c9c633f..d095491 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -730,11 +730,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
         /** {@inheritDoc} */
         @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
             ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
         @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+            assert ignite instanceof IgniteKernal : ignite;
+
             ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b324ab2..359de1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
 
-    /** Default value for connection buffer flush frequency (value is <tt>100</tt>
ms). */
-    public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100;
-
-    /** Default value for connection buffer size (value is <tt>0</tt>). */
-    public static final int DFLT_CONN_BUF_SIZE = 0;
-
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
@@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Idle connection timeout. */
     private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
 
-    /** Connection buffer flush frequency. */
-    private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ;
-
-    /** Connection buffer size. */
-    @SuppressWarnings("RedundantFieldInitialization")
-    private int connBufSize = DFLT_CONN_BUF_SIZE;
-
     /** Connect timeout. */
     private long connTimeout = DFLT_CONN_TIMEOUT;
 
@@ -647,9 +634,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
-    /** Flush client worker. */
-    private ClientFlushWorker clientFlushWorker;
-
     /** Recovery and idle clients handler. */
     private CommunicationWorker commWorker;
 
@@ -876,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}.
      *
      * @param connBufSize Connection buffer size.
      * @see #setConnectionBufferFlushFrequency(long)
      */
     @IgniteSpiConfiguration(optional = true)
     public void setConnectionBufferSize(int connBufSize) {
-        this.connBufSize = connBufSize;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public int getConnectionBufferSize() {
-        return connBufSize;
+        return 0;
     }
 
     /** {@inheritDoc} */
     @IgniteSpiConfiguration(optional = true)
     @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
-        this.connBufFlushFreq = connBufFlushFreq;
+        // No-op.
     }
 
     /** {@inheritDoc} */
     @Override public long getConnectionBufferFlushFrequency() {
-        return connBufFlushFreq;
+        return 0;
     }
 
     /**
@@ -1168,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(locPort <= 0xffff, "locPort < 0xffff");
         assertParameter(locPortRange >= 0, "locPortRange >= 0");
         assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
-        assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0");
-        assertParameter(connBufSize >= 0, "connBufSize >= 0");
         assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
         assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
@@ -1239,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("idleConnTimeout", idleConnTimeout));
             log.debug(configInfo("directBuf", directBuf));
             log.debug(configInfo("directSendBuf", directSndBuf));
-            log.debug(configInfo("connBufSize", connBufSize));
-            log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
             log.debug(configInfo("selectorsCnt", selectorsCnt));
             log.debug(configInfo("tcpNoDelay", tcpNoDelay));
             log.debug(configInfo("sockSndBuf", sockSndBuf));
@@ -1255,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
         }
 
-        if (connBufSize > 8192)
-            U.warn(log, "Specified communication IO buffer size is larger than recommended
(ignore if done " +
-                "intentionally) [specified=" + connBufSize + ", recommended=8192]",
-                "Specified communication IO buffer size is larger than recommended (ignore
if done intentionally).");
-
         if (!tcpNoDelay)
             U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be
used with caution " +
                 "since may produce significant delays with some scenarios.");
@@ -1272,12 +1245,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         commWorker.start();
 
-        if (connBufSize > 0) {
-            clientFlushWorker = new ClientFlushWorker();
-
-            clientFlushWorker.start();
-        }
-
         // Ack start.
         if (log.isDebugEnabled())
             log.debug(startInfo());
@@ -1431,10 +1398,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(clientFlushWorker);
         U.interrupt(commWorker);
 
-        U.join(clientFlushWorker, log);
         U.join(commWorker, log);
 
         // Force closing on stop (safety).
@@ -2023,10 +1988,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(clientFlushWorker);
         U.interrupt(commWorker);
 
-        U.join(clientFlushWorker, log);
         U.join(commWorker, log);
 
         for (GridCommunicationClient client : clients.values())
@@ -2134,58 +2097,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private class ClientFlushWorker extends IgniteSpiThread {
-        /**
-         *
-         */
-        ClientFlushWorker() {
-            super(gridName, "nio-client-flusher", log);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                long connBufFlushFreq0 = connBufFlushFreq;
-
-                for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet())
{
-                    GridCommunicationClient client = entry.getValue();
-
-                    if (client.reserve()) {
-                        boolean err = true;
-
-                        try {
-                            client.flushIfNeeded(connBufFlushFreq0);
-
-                            err = false;
-                        }
-                        catch (IOException e) {
-                            if (getSpiContext().pingNode(entry.getKey()))
-                                U.error(log, "Failed to flush client: " + client, e);
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to flush client (node left): " + client);
-
-                                onException("Failed to flush client (node left): " + client,
e);
-                            }
-                        }
-                        finally {
-                            if (err)
-                                client.forceClose();
-                            else
-                                client.release();
-                        }
-                    }
-                }
-
-                Thread.sleep(connBufFlushFreq0);
-            }
-        }
-    }
-
-    /**
-     *
-     */
     private class CommunicationWorker extends IgniteSpiThread {
         /** */
         private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..6f5a738 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean
{
      * This frequency defines how often system will advice to flush
      * connection buffer.
      * <p>
-     * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}.
-     * <p>
      * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code
0}.
      *
      * @param connBufFlushFreq Flush frequency.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index e3baeb0..bdf9929 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -1286,7 +1286,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test client to use instead of {@link GridTcpCommunicationClient}
+     * Test client to use instead of {@link GridTcpNioCommunicationClient}
      */
     private static class TestClient implements AutoCloseable {
         /** Socket implementation to use. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index ea51aff..8d27485 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -64,7 +64,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
 
         // Test idle clients remove.
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi,
"clients");
+            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             assertEquals(2, clients.size());
 
@@ -77,7 +77,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.afterTest();
 
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi,
"clients");
+            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
                 info("Check failed for SPI [grid=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c038ee7..2d175f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -256,7 +256,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends
Communic
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
                     for (CommunicationSpi spi : spis) {
-                        ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi,
"clients");
+                        ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi,
"clients");
 
                         assertEquals(1, clients.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 34fa610..c4a0916 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -32,8 +32,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPort", 65636);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPortRange", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "idleConnectionTimeout", 0);
-        checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferSize", -1);
-        checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferFlushFrequency",
0);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketReceiveBuffer", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketSendBuffer", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index e7ae957..3916f02 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -501,7 +501,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
         }
 
         for (CommunicationSpi spi : spis.values()) {
-            final ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi,
"clients");
+            final ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi,
"clients");
 
             assert GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 9c6fbb4..61bb944 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.spi.*;
+import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.config.*;
 import org.apache.ignite.testframework.junits.*;
 import org.apache.ignite.testframework.junits.spi.*;
@@ -267,9 +268,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
 
             Collection<UUID> nodeIds = new HashSet<>();
 
-            for (IgniteTestResources rsrc : spiRsrcs) {
+            for (IgniteTestResources rsrc : spiRsrcs)
                 nodeIds.add(rsrc.getNodeId());
-            }
 
             for (ClusterNode node : spi.getRemoteNodes()) {
                 if (nodeIds.contains(node.id())) {
@@ -390,6 +390,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
                     }
                 });
 
+                GridSpiTestContext ctx = initSpiContext();
+
+                GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "spiCtx", ctx);
+
                 spi.spiStart(getTestGridName() + i);
 
                 spis.add(spi);
@@ -397,7 +401,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
                 spiRsrcs.add(rsrcMgr);
 
                 // Force to use test context instead of default dummy context.
-                spi.onContextInitialized(initSpiContext());
+                spi.onContextInitialized(ctx);
             }
         }
         catch (Throwable e) {
@@ -438,9 +442,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi>
extends Gri
             spi.spiStop();
         }
 
-        for (IgniteTestResources rscrs : spiRsrcs) {
+        for (IgniteTestResources rscrs : spiRsrcs)
             rscrs.stopThreads();
-        }
 
         // Clear.
         spis.clear();



Mime
View raw message