ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [1/8] incubator-ignite git commit: # ignite-nio - Removing message clone
Date Sat, 14 Feb 2015 03:34:49 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-nio [created] 947723c92


# ignite-nio - Removing message clone


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/77f1695e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/77f1695e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/77f1695e

Branch: refs/heads/ignite-nio
Commit: 77f1695e843d715b6864a3bacb04b445f6e30476
Parents: 322408f
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Thu Feb 12 21:45:59 2015 -0800
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Thu Feb 12 21:45:59 2015 -0800

----------------------------------------------------------------------
 .../CommunicationMessageCodeGenerator.java      |   5 +
 .../ignite/internal/util/nio/GridNioServer.java | 137 ++++++++++---------
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../communication/MessageAdapter.java           |  25 ++--
 .../extensions/communication/MessageWriter.java |   2 +
 5 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/77f1695e/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
index f4c8d94..54d4a30 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
@@ -357,6 +357,11 @@ public class CommunicationMessageCodeGenerator {
     private void start(Collection<String> code, @Nullable String superMtd, boolean
write) {
         assert code != null;
 
+        if (write) {
+            code.add(builder().a("MessageWriter writer = WRITER.get();").toString());
+            code.add(EMPTY);
+        }
+
         code.add(builder().a(write ? "writer" : "reader").a(".setBuffer(").a(BUF_VAR).a(");").toString());
         code.add(EMPTY);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/77f1695e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e22a723..725f5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -993,100 +993,115 @@ public class GridNioServer<T> {
          */
         @SuppressWarnings("ForLoopReplaceableByForEach")
         private void processWrite0(SelectionKey key) throws IOException {
-            WritableByteChannel sockCh = (WritableByteChannel)key.channel();
+            try {
+                WritableByteChannel sockCh = (WritableByteChannel)key.channel();
 
-            GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
-            ByteBuffer buf = ses.writeBuffer();
-            NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+                ByteBuffer buf = ses.writeBuffer();
+                NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal());
 
-            List<NioOperationFuture<?>> doneFuts = null;
+                MessageWriter writer = ses.meta(WRITER.ordinal());
 
-            while (true) {
-                if (req == null) {
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                if (writer == null) {
+                    ses.addMeta(WRITER.ordinal(), writer = formatter.writer());
 
-                    if (req == null && buf.position() == 0) {
-                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                    MessageAdapter.WRITER.set(writer);
+                }
 
-                        break;
+                List<NioOperationFuture<?>> doneFuts = null;
+
+                while (true) {
+                    if (req == null) {
+                        req = (NioOperationFuture<?>)ses.pollFuture();
+
+                        if (req == null && buf.position() == 0) {
+                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+                            break;
+                        }
                     }
-                }
 
-                MessageAdapter msg;
-                boolean finished = false;
+                    MessageAdapter msg;
+                    boolean finished = false;
 
-                if (req != null) {
-                    msg = req.directMessage();
+                    if (req != null) {
+                        msg = req.directMessage();
 
-                    assert msg != null;
+                        assert msg != null;
 
-                    msg.setWriter(formatter.writer());
+                        finished = msg.writeTo(buf);
 
-                    finished = msg.writeTo(buf);
-                }
+                        if (finished)
+                            writer.reset();
+                    }
 
-                // Fill up as many messages as possible to write buffer.
-                while (finished) {
-                    if (doneFuts == null)
-                        doneFuts = new ArrayList<>();
+                    // Fill up as many messages as possible to write buffer.
+                    while (finished) {
+                        if (doneFuts == null)
+                            doneFuts = new ArrayList<>();
 
-                    doneFuts.add(req);
+                        doneFuts.add(req);
 
-                    req = (NioOperationFuture<?>)ses.pollFuture();
+                        req = (NioOperationFuture<?>)ses.pollFuture();
 
-                    if (req == null)
-                        break;
+                        if (req == null)
+                            break;
 
-                    msg = req.directMessage();
+                        msg = req.directMessage();
 
-                    assert msg != null;
+                        assert msg != null;
 
-                    msg.setWriter(formatter.writer());
+                        finished = msg.writeTo(buf);
 
-                    finished = msg.writeTo(buf);
-                }
+                        if (finished)
+                            writer.reset();
+                    }
 
-                buf.flip();
+                    buf.flip();
 
-                assert buf.hasRemaining();
+                    assert buf.hasRemaining();
 
-                if (!skipWrite) {
-                    int cnt = sockCh.write(buf);
+                    if (!skipWrite) {
+                        int cnt = sockCh.write(buf);
 
-                    if (!F.isEmpty(doneFuts)) {
-                        for (int i = 0; i < doneFuts.size(); i++)
-                            doneFuts.get(i).onDone();
+                        if (!F.isEmpty(doneFuts)) {
+                            for (int i = 0; i < doneFuts.size(); i++)
+                                doneFuts.get(i).onDone();
 
-                        doneFuts.clear();
-                    }
+                            doneFuts.clear();
+                        }
 
-                    if (log.isTraceEnabled())
-                        log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
+                        if (log.isTraceEnabled())
+                            log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
 
-                    if (metricsLsnr != null)
-                        metricsLsnr.onBytesSent(cnt);
+                        if (metricsLsnr != null)
+                            metricsLsnr.onBytesSent(cnt);
 
-                    ses.bytesSent(cnt);
-                }
-                else {
-                    // For test purposes only (skipWrite is set to true in tests only).
-                    try {
-                        U.sleep(50);
+                        ses.bytesSent(cnt);
                     }
-                    catch (IgniteInterruptedCheckedException e) {
-                        throw new IOException("Thread has been interrupted.", e);
+                    else {
+                        // For test purposes only (skipWrite is set to true in tests only).
+                        try {
+                            U.sleep(50);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            throw new IOException("Thread has been interrupted.", e);
+                        }
                     }
-                }
 
-                if (buf.hasRemaining()) {
-                    buf.compact();
+                    if (buf.hasRemaining()) {
+                        buf.compact();
 
-                    ses.addMeta(NIO_OPERATION.ordinal(), req);
+                        ses.addMeta(NIO_OPERATION.ordinal(), req);
 
-                    break;
+                        break;
+                    }
+                    else
+                        buf.clear();
                 }
-                else
-                    buf.clear();
+            }
+            finally {
+                MessageAdapter.WRITER.remove();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/77f1695e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index 88ac9ba..ba568ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -39,7 +39,10 @@ public enum GridNioSessionMetaKey {
     MARSHALLER,
 
     /** Client marshaller ID. */
-    MARSHALLER_ID;
+    MARSHALLER_ID,
+
+    /** Message writer. */
+    WRITER;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/77f1695e/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
index 0603d67..466dfde 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
@@ -25,6 +25,9 @@ import java.nio.*;
  */
 public abstract class MessageAdapter implements Serializable, Cloneable {
     /** Message writer. */
+    public static final ThreadLocal<MessageWriter> WRITER = new ThreadLocal<>();
+
+    // TODO: remove
     protected MessageWriter writer;
 
     /** Message reader. */
@@ -36,20 +39,24 @@ public abstract class MessageAdapter implements Serializable, Cloneable
{
     /** Current write/read state. */
     protected int state;
 
-    /**
-     * @param writer Message writer.
-     */
-    public final void setWriter(MessageWriter writer) {
-        if (this.writer == null)
-            this.writer = writer;
-    }
+//    /**
+//     * @param writer Message writer.
+//     */
+//    public final void setWriter(MessageWriter writer) {
+//        assert writer != null;
+//
+//        WRITER.set(writer);
+//    }
 
     /**
      * @param reader Message reader.
      */
     public final void setReader(MessageReader reader) {
-        if (this.reader == null)
-            this.reader = reader;
+        assert reader != null;
+
+        assert this.reader == null;
+
+        this.reader = reader;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/77f1695e/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 89c55a3..66eb721 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -263,4 +263,6 @@ public interface MessageWriter {
      * @return Whether value was fully written.
      */
     public <K, V> boolean writeMap(String name, Map<K, V> map, Class<K>
keyCls, Class<V> valCls);
+
+    public void reset();
 }


Mime
View raw message