qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1731097 - in /qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network: Disassembler.java security/ssl/SSLSender.java
Date Thu, 18 Feb 2016 17:05:14 GMT
Author: kwall
Date: Thu Feb 18 17:05:14 2016
New Revision: 1731097

URL: http://svn.apache.org/viewvc?rev=1731097&view=rev
Log:
QPID-7057: [Java Client] Have SSLSender gather buffers and wrap them all on each flush

* Changed 0-10's (Client) Disassembler so that it no longer assumes it can mutate the buffer after
passing it to org.apache.qpid.transport.ByteBufferSender#send

Modified:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1731097&r1=1731096&r2=1731097&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java	(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java	Thu Feb 18 17:05:14 2016
@@ -28,7 +28,9 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.network.Frame.LAST_SEG;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,9 +48,6 @@ import org.apache.qpid.transport.Protoco
 import org.apache.qpid.transport.SegmentType;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpid.transport.util.Functions;
-import org.apache.qpid.util.ByteBufferUtils;
 
 /**
  * Disassembler
@@ -56,9 +55,9 @@ import org.apache.qpid.util.ByteBufferUt
 public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(Disassembler.class);
-    private final ByteBufferSender sender;
-    private int maxPayload;
-    private final Object sendlock = new Object();
+    private final ByteBufferSender _sender;
+    private final Object _sendlock = new Object();
+    private volatile int _maxPayload;
     private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
     {
         public BBEncoder initialValue()
@@ -69,12 +68,12 @@ public final class Disassembler implemen
 
     public Disassembler(ByteBufferSender sender, int maxFrame)
     {
-        this.sender = sender;
+        _sender = sender;
         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
         {
             throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
         }
-        this.maxPayload  = maxFrame - HEADER_SIZE;
+        _maxPayload = maxFrame - HEADER_SIZE;
     }
 
     public void send(ProtocolEvent event)
@@ -84,91 +83,26 @@ public final class Disassembler implemen
 
     public void flush()
     {
-        synchronized (sendlock)
+        synchronized (_sendlock)
         {
-            sender.flush();
+            _sender.flush();
         }
     }
 
     public void close()
     {
-        synchronized (sendlock)
+        synchronized (_sendlock)
         {
-            sender.close();
-        }
-    }
-
-    private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE);
-
-    {
-        _frameHeader.order(ByteOrder.BIG_ENDIAN);
-    }
-
-    private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
-    {
-        synchronized (sendlock)
-        {
-            ByteBuffer data = _frameHeader;
-            _frameHeader.rewind();
-
-            
-            data.put(0, flags);
-            data.put(1, type);
-            data.putShort(2, (short) (size + HEADER_SIZE));
-            data.put(5, track);
-            data.putShort(6, (short) channel);
-
-
-            int limit = buf.limit();
-            buf.limit(buf.position() + size);
-
-            data.rewind();
-            sender.send(QpidByteBuffer.wrap(data));
-            sender.send(QpidByteBuffer.wrap(buf));
-
-            buf.limit(limit);
-
-        }
-    }
-
-    private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
-    {
-        byte typeb = (byte) type.getValue();
-        byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
-
-        int remaining = buf.remaining();
-        boolean first = true;
-        while (true)
-        {
-            int size = min(maxPayload, remaining);
-            remaining -= size;
-
-            byte newflags = flags;
-            if (first)
-            {
-                newflags |= FIRST_FRAME;
-                first = false;
-            }
-            if (remaining == 0)
-            {
-                newflags |= LAST_FRAME;
-            }
-
-            frame(newflags, typeb, track, event.getChannel(), size, buf);
-
-            if (remaining == 0)
-            {
-                break;
-            }
+            _sender.close();
         }
     }
 
     public void init(Void v, ProtocolHeader header)
     {
-        synchronized (sendlock)
+        synchronized (_sendlock)
         {
-            sender.send(header.toByteBuffer());
-            sender.flush();
+            _sender.send(header.toByteBuffer());
+            _sender.flush();
         }
     }
 
@@ -234,24 +168,114 @@ public final class Disassembler implemen
             headerLimit = enc.position();
         }
 
-        synchronized (sendlock)
+        synchronized (_sendlock)
         {
             ByteBuffer buf = enc.underlyingBuffer();
-            buf.position(0);
-            buf.limit(methodLimit);
-
-            fragment(flags, type, method, buf);
+            buf.flip();
+            QpidByteBuffer copy = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), buf.remaining());
+            copy.putCopyOf(QpidByteBuffer.wrap(buf));
+            copy.flip();
+
+            final QpidByteBuffer methodBuf = copy.view(0, methodLimit);
+            fragment(flags, type, method, Collections.singletonList(methodBuf));
+            methodBuf.dispose();
             if (payload)
             {
-                ByteBuffer body = ByteBufferUtils.combine(method.getBody());
-                buf.limit(headerLimit);
-                buf.position(methodLimit);
-                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf);
-                if (body != null)
+                Collection<QpidByteBuffer> bodies = method.getBody();
+                QpidByteBuffer headerBuf = copy.view(methodLimit, headerLimit);
+                fragment(bodies == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, Collections.singletonList(headerBuf));
+                headerBuf.dispose();
+                if (bodies != null)
                 {
-                    fragment(LAST_SEG, SegmentType.BODY, method, body);
+                    Collection<QpidByteBuffer> dup = new ArrayList<>(bodies.size());
+                    for(QpidByteBuffer b : bodies)
+                    {
+                        dup.add(b.duplicate());
+                    }
+                    fragment(LAST_SEG, SegmentType.BODY, method, dup);
+                    for(QpidByteBuffer b : dup)
+                    {
+                        b.dispose();
+                    }
                 }
+            }
+           copy.dispose();
+        }
+    }
+
+    private void fragment(byte flags, SegmentType type, ProtocolEvent event, Collection<QpidByteBuffer> buffers)
+    {
+        byte typeb = (byte) type.getValue();
+        byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+        int remaining = 0;
+        for(QpidByteBuffer b : buffers)
+        {
+            remaining += b.remaining();
+        }
+        boolean first = true;
+        while (true)
+        {
+            int size = min(_maxPayload, remaining);
+            remaining -= size;
 
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (remaining == 0)
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            frame(newflags, typeb, track, event.getChannel(), size, buffers);
+
+            if (remaining == 0)
+            {
+                break;
+            }
+        }
+    }
+
+    private void frame(byte flags, byte type, byte track, int channel, int size, Collection<QpidByteBuffer> buffers)
+    {
+        QpidByteBuffer data = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), HEADER_SIZE);
+
+        data.put(0, flags);
+        data.put(1, type);
+        data.putShort(2, (short) (size + HEADER_SIZE));
+        data.put(4, (byte) 0);
+        data.put(5, track);
+        data.putShort(6, (short) channel);
+
+
+        _sender.send(data);
+        data.dispose();
+
+        if(size > 0)
+        {
+            int residual = size;
+            for(QpidByteBuffer b : buffers)
+            {
+                final int remaining = b.remaining();
+                if(remaining > 0 )
+                {
+                    if(remaining >= residual)
+                    {
+                        final QpidByteBuffer buffer = b.view(0, residual);
+                        _sender.send(buffer);
+                        buffer.dispose();
+                        b.position(b.position() + residual);
+                        break;
+                    }
+                    else
+                    {
+                        _sender.send(b);
+                        residual-=remaining;
+                    }
+                }
             }
         }
     }
@@ -268,7 +292,7 @@ public final class Disassembler implemen
         {
             throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
         }
-        this.maxPayload  = maxFrame - HEADER_SIZE;
+        _maxPayload = maxFrame - HEADER_SIZE;
 
     }
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1731097&r1=1731096&r2=1731097&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java	(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java	Thu Feb 18 17:05:14 2016
@@ -20,6 +20,11 @@
 package org.apache.qpid.transport.network.security.ssl;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.ssl.SSLEngine;
@@ -43,13 +48,14 @@ public class SSLSender implements ByteBu
     private final ByteBufferSender delegate;
     private final SSLEngine engine;
     private final int sslBufSize;
-    private final ByteBuffer netData;
+    private final QpidByteBuffer netData;
     private final long timeout;
     private final SSLStatus _sslStatus;
 
     private String _hostname;
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ConcurrentLinkedQueue<QpidByteBuffer> _pending = new ConcurrentLinkedQueue<>();
 
 
     public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
@@ -57,7 +63,7 @@ public class SSLSender implements ByteBu
         this.engine = engine;
         this.delegate = delegate;
         sslBufSize = engine.getSession().getPacketBufferSize();
-        netData = ByteBuffer.allocate(sslBufSize);
+        netData = QpidByteBuffer.allocate(sslBufSize);
         timeout = Long.getLong("qpid.ssl_timeout", 60000);
         _sslStatus = sslStatus;
     }
@@ -109,7 +115,9 @@ public class SSLSender implements ByteBu
 
     private void tearDownSSLConnection() throws Exception
     {
-        SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+        SSLEngineResult result = QpidByteBuffer.encryptSSL(engine,
+                                                           Collections.singletonList(QpidByteBuffer.allocate(0)),
+                                                           netData);
         Status status = result.getStatus();
         int read   = result.bytesProduced();
         while (status != Status.CLOSED)
@@ -124,15 +132,17 @@ public class SSLSender implements ByteBu
                 netData.limit(netData.position());
                 netData.position(netData.position() - read);
 
-                ByteBuffer data = netData.slice();
+                QpidByteBuffer data = netData.slice();
 
                 netData.limit(limit);
                 netData.position(netData.position() + read);
 
-                delegate.send(QpidByteBuffer.wrap(data));
+                delegate.send(data);
                 flush();
             }
-            result = engine.wrap(ByteBuffer.allocate(0), netData);
+            result = QpidByteBuffer.encryptSSL(engine,
+                                               Collections.singletonList(QpidByteBuffer.allocate(0)),
+                                               netData);
             status = result.getStatus();
             read   = result.bytesProduced();
         }
@@ -140,7 +150,9 @@ public class SSLSender implements ByteBu
 
     public void flush()
     {
+        doSend();
         delegate.flush();
+
     }
 
     @Override
@@ -151,7 +163,12 @@ public class SSLSender implements ByteBu
 
     public void send(QpidByteBuffer appData)
     {
-        appData = appData.duplicate();
+        _pending.add(appData.duplicate());
+
+    }
+
+    public void doSend()
+    {
         if (closed.get() && !_sslStatus.getSslErrorFlag())
         {
             throw new SenderException("SSL Sender is closed");
@@ -160,12 +177,24 @@ public class SSLSender implements ByteBu
         HandshakeStatus handshakeStatus;
         Status status;
 
-        while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag())
+        while(!_pending.isEmpty() && !_sslStatus.getSslErrorFlag())
         {
             int read = 0;
             try
             {
-                SSLEngineResult result = engine.wrap(appData.asByteBuffer(), netData);
+                SSLEngineResult result = QpidByteBuffer.encryptSSL(engine, _pending, netData);
+
+                while(!_pending.isEmpty())
+                {
+                    QpidByteBuffer buf = _pending.peek();
+                    if (buf.hasRemaining())
+                    {
+                        break;
+                    }
+                    buf.dispose();
+                    _pending.poll();
+                }
+
                 read   = result.bytesProduced();
                 status = result.getStatus();
                 handshakeStatus = result.getHandshakeStatus();
@@ -182,12 +211,12 @@ public class SSLSender implements ByteBu
                 netData.limit(netData.position());
                 netData.position(netData.position() - read);
 
-                ByteBuffer data = netData.slice();
+                QpidByteBuffer data = netData.slice();
 
                 netData.limit(limit);
                 netData.position(netData.position() + read);
 
-                delegate.send(QpidByteBuffer.wrap(data));
+                delegate.send(data);
             }
 
             switch(status)
@@ -219,7 +248,7 @@ public class SSLSender implements ByteBu
                     break;
 
                 case NEED_UNWRAP:
-                    flush();
+                    delegate.flush();
                     synchronized(_sslStatus.getSslLock())
                     {
                         if (_sslStatus.getSslErrorFlag())
@@ -265,11 +294,9 @@ public class SSLSender implements ByteBu
             }
 
         }
-        appData.dispose();
     }
 
 
-
     private void doTasks()
     {
         Runnable runnable;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message