qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1693129 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpi...
Date Tue, 28 Jul 2015 16:31:19 GMT
Author: rgodfrey
Date: Tue Jul 28 16:31:18 2015
New Revision: 1693129

URL: http://svn.apache.org/r1693129
Log:
QPID-6662 : Continue to use the same buffer rather than fragmenting when there is insufficient
data to decode (0-8/9/9-1)

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Tue Jul 28 16:31:18 2015
@@ -275,6 +275,7 @@ public class MultiVersionProtocolEngine
         public void received(ByteBuffer msg)
         {
             _logger.error("Error processing incoming data, could not negotiate a common protocol");
+            msg.position(msg.limit());
         }
 
         public void closed()
@@ -395,7 +396,7 @@ public class MultiVersionProtocolEngine
 
             if(_header.remaining() > msgheader.limit())
             {
-                msg.position(msg.limit());
+                return;
             }
             else
             {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Tue Jul 28 16:31:18 2015
@@ -43,7 +43,7 @@ public class NonBlockingConnectionPlainD
     public boolean processData(ByteBuffer buffer)
     {
         _parent.processAmqpData(buffer);
-        buffer.position(buffer.limit());
+
         return false;
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Tue Jul 28 16:31:18 2015
@@ -212,7 +212,10 @@ public class AMQPConnection_0_10 extends
                         throw new ConnectionScopedRuntimeException(e);
                     }
                 }
-
+                finally
+                {
+                    buf.position(buf.limit());
+                }
                 return null;
             }
         });

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Jul 28 16:31:18 2015
@@ -64,7 +64,6 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Tue Jul 28 16:31:18 2015
@@ -159,7 +159,7 @@ public class MessageMetaData implements
         {
             try
             {
-                ByteBufferDataInput dataInput = new ByteBufferDataInput(buf);
+                ByteBufferDataInput dataInput = new ByteBufferDataInput(buf.slice());
                 int size = EncodingUtils.readInteger(dataInput);
                 ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dataInput, size);
                 final AMQShortString exchange = EncodingUtils.readAMQShortString(dataInput);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Jul 28 16:31:18 2015
@@ -461,6 +461,10 @@ public class AMQPConnection_1_0 extends
             LOGGER.error("Exception while processing incoming data", e);
             getNetwork().close();
         }
+        finally
+        {
+            msg.position(msg.limit());
+        }
      }
 
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Tue Jul 28 16:31:18 2015
@@ -61,13 +61,8 @@ public abstract class AMQDecoder<T exten
 
     private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
 
-    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
-
-    private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
-
     /**
      * Creates a new AMQP decoder.
-     *
     * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
      * @param methodProcessor method processor
      */
@@ -101,22 +96,21 @@ public abstract class AMQDecoder<T exten
     }
 
 
-    public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
-    {
-
-        buf = buf.slice();
-        _incompleteBuffers.add(buf);
-        ByteBufferListDataInput msg = new ByteBufferListDataInput(_incompleteBuffers);
+    public abstract void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException;
 
+    protected void decode(final MarkableDataInput msg) throws IOException, AMQFrameDecodingException
+    {
         // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
         // an unsupported version
-        if(_firstRead && buf.hasRemaining())
+        if(_firstRead && msg.available()>0)
         {
+            msg.mark(1);
             _firstRead = false;
-            if(!_expectProtocolInitiation && buf.get(buf.position()) > 8)
+            if(!_expectProtocolInitiation && (((int)msg.readByte()) &0xff) > 8)
             {
                 _expectProtocolInitiation = true;
             }
+            msg.reset();
         }
 
         boolean enoughData = true;
@@ -140,24 +134,6 @@ public abstract class AMQDecoder<T exten
 
             }
         }
-
-        ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
-        while(iter.hasNext())
-        {
-            ByteBuffer next = iter.next();
-            if(next.hasRemaining())
-            {
-                if(next.position() != 0)
-                {
-                    iter.set(next.slice());
-                }
-                break;
-            }
-            else
-            {
-                iter.remove();
-            }
-        }
     }
 
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java Tue Jul 28 16:31:18 2015
@@ -21,11 +21,16 @@
 package org.apache.qpid.codec;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
 
 import org.apache.qpid.framing.*;
 
 public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
 {
+    private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
 
     /**
      * Creates a new AMQP decoder.
@@ -37,6 +42,36 @@ public class ClientDecoder extends AMQDe
         super(false, methodProcessor);
     }
 
+    @Override
+    public void decodeBuffer(ByteBuffer buf)
+            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    {
+        buf = buf.slice();
+        _incompleteBuffers.add(buf);
+        MarkableDataInput msg = new ByteBufferListDataInput(_incompleteBuffers);
+
+        decode(msg);
+
+
+        ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
+        while (iter.hasNext())
+        {
+            ByteBuffer next = iter.next();
+            if (next.hasRemaining())
+            {
+                if (next.position() != 0)
+                {
+                    iter.set(next.slice());
+                }
+                break;
+            }
+            else
+            {
+                iter.remove();
+            }
+        }
+
+    }
 
     void processMethod(int channelId,
                        MarkableDataInput in)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Tue Jul 28 16:31:18 2015
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
 
 public interface MarkableDataInput extends DataInput
 {
-    public void mark(int pos);
+    public void mark(int readAhead);
     public void reset() throws IOException;
 
     int available() throws IOException;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Tue Jul 28 16:31:18 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.codec;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.framing.*;
 
@@ -37,6 +38,12 @@ public class ServerDecoder extends AMQDe
         super(true, methodProcessor);
     }
 
+    public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    {
+        decode(new ByteBufferDataInput(buf));
+    }
+
+
     void processMethod(int channelId,
                        MarkableDataInput in)
             throws AMQFrameDecodingException, IOException

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Jul 28 16:31:18 2015
@@ -503,7 +503,7 @@ public class BasicContentHeaderPropertie
 
         _encodedForm = buffer.readAsByteBuffer(size);
 
-        ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm);
+        ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm.slice());
 
         decode(input);
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1693129&r1=1693128&r2=1693129&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Tue Jul 28 16:31:18 2015
@@ -28,10 +28,12 @@ public class ByteBufferDataInput impleme
 {
     private final ByteBuffer _underlying;
     private int _mark;
+    private final int _offset;
 
     public ByteBufferDataInput(ByteBuffer underlying)
     {
-        _underlying = underlying.slice();
+        _underlying = underlying;
+        _offset = underlying.position();
     }
 
     public void readFully(byte[] b)
@@ -55,7 +57,7 @@ public class ByteBufferDataInput impleme
     public int skipBytes(int n)
     {
         _underlying.position(_underlying.position()+n);
-        return _underlying.position();
+        return _underlying.position()-_offset;
     }
 
     public boolean readBoolean()
@@ -143,12 +145,12 @@ public class ByteBufferDataInput impleme
 
     public int position()
     {
-        return _underlying.position();
+        return _underlying.position()-_offset;
     }
 
     public void position(int position)
     {
-        _underlying.position(position);
+        _underlying.position(position+_offset);
     }
 
     public int length()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message