qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1394477 - in /qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton: codec/CompositeWritableBuffer.java engine/impl/DeliveryImpl.java engine/impl/TransportImpl.java engine/impl/TransportSession.java
Date Fri, 05 Oct 2012 12:17:39 GMT
Author: rgodfrey
Date: Fri Oct  5 12:17:39 2012
New Revision: 1394477

URL: http://svn.apache.org/viewvc?rev=1394477&view=rev
Log:
PROTON-59 : max frame size not respected, large message support in proton-j

Modified:
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java Fri Oct  5 12:17:39 2012
@@ -153,7 +153,7 @@ public class CompositeWritableBuffer imp
             else
             {
                 int relativePosition = currentPosition-position;
-                if(relativePosition >= _second.position())
+                if(relativePosition <= _second.position())
                 {
                     _second.position(_second.position()-relativePosition);
                 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Oct  5 12:17:39 2012
@@ -55,6 +55,7 @@ public class DeliveryImpl implements Del
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
+    private int _offset;
 
     public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
     {
@@ -168,7 +169,8 @@ public class DeliveryImpl implements Del
             //TODO - should only be if no bytes left
             consumed = Math.min(size, _dataSize);
 
-            System.arraycopy(_data, 0, bytes, offset, consumed);
+            System.arraycopy(_data, _offset, bytes, offset, consumed);
+            _offset += consumed;
             _dataSize -= consumed;
         }
         else
@@ -298,10 +300,6 @@ public class DeliveryImpl implements Del
 
     void setTransportWorkNext(DeliveryImpl transportWorkNext)
     {
-        if(transportWorkNext == this)
-        {
-            (new Exception("Aaaargh")).printStackTrace();
-        }
         _transportWorkNext = transportWorkNext;
     }
 
@@ -340,11 +338,11 @@ public class DeliveryImpl implements Del
         {
             byte[] oldData = _data;
             _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData,0,_data,0,_dataSize);
+            System.arraycopy(oldData,_offset,_data,0,_dataSize);
+            _offset = 0;
         }
-        System.arraycopy(bytes,offset,_data,_dataSize,length);
+        System.arraycopy(bytes,offset,_data,_dataSize+_offset,length);
         _dataSize+=length;
-//        addToWorkList();
         addToTransportWorkList();
         return length;  //TODO - Implement.
     }
@@ -356,7 +354,7 @@ public class DeliveryImpl implements Del
 
     int getDataOffset()
     {
-        return 0;  //TODO - Implement.
+        return _offset;
     }
 
     int getDataLength()
@@ -376,7 +374,7 @@ public class DeliveryImpl implements Del
 
     public void setDataOffset(int arrayOffset)
     {
-        // TODO - implement
+        _offset = arrayOffset;
     }
 
     public boolean isWritable()

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Oct  5 12:17:39 2012
@@ -17,18 +17,21 @@
 
 package org.apache.qpid.proton.engine.impl;
 
+import java.util.EnumSet;
 import org.apache.qpid.proton.codec.CompositeWritableBuffer;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Accepted;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.DeliveryState;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.ProtonException;
 import org.apache.qpid.proton.engine.FrameTransport;
 import org.apache.qpid.proton.engine.SaslClient;
 import org.apache.qpid.proton.engine.SaslServer;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.TransportInput;
@@ -274,7 +277,7 @@ public class TransportImpl extends Endpo
                             detach.setHandle(localHandle);
 
 
-                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
+                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null, null);
                             written += frameBytes;
                             endpoint.clearModified();
                         }
@@ -317,7 +320,7 @@ public class TransportImpl extends Endpo
                         flow.setLinkCredit(transportLink.getLinkCredit());
                         flow.setDrain(sender.getDrain());
                         flow.setNextOutgoingId(transportSession.getNextOutgoingId());
-                        int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+                        int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
                         written += frameBytes;
                         endpoint.clearModified();
                     }
@@ -361,7 +364,7 @@ public class TransportImpl extends Endpo
                     }
                     int frameBytes = writeFrame(buffer, delivery.getLink().getSession()
                                                                       .getTransportSession().getLocalChannel(),
-                                       disposition, null);
+                                       disposition, null, null);
                     written += frameBytes;
                     delivery = delivery.clearTransportWork();
                 }
@@ -399,10 +402,11 @@ public class TransportImpl extends Endpo
                     delivery.setTransportDelivery(transportDelivery);
                     sender.getSession().getTransportSession().addUnsettledOutgoing(deliveryId, delivery);
 
-                    Transfer transfer = new Transfer();
+                    final Transfer transfer = new Transfer();
                     transfer.setDeliveryId(deliveryId);
                     transfer.setDeliveryTag(new Binary(delivery.getTag()));
                     transfer.setHandle(transportLink.getLocalHandle());
+
                     if(delivery.isSettled())
                     {
                         transfer.setSettled(Boolean.TRUE);
@@ -411,6 +415,7 @@ public class TransportImpl extends Endpo
                     {
                         transfer.setMore(true);
                     }
+
                     transfer.setMessageFormat(UnsignedInteger.ZERO);
 
                     // TODO - large frames
@@ -418,23 +423,34 @@ public class TransportImpl extends Endpo
 
                     int frameBytes = writeFrame(buffer,
                                                 sender.getSession().getTransportSession().getLocalChannel(),
-                                                transfer, payload);
+                                                transfer, payload, new PartialTransfer(transfer));
                     sender.getSession().getTransportSession().incrementOutgoingId();
 
                     written += frameBytes;
 
-                    // TODO partial consumption
-                    delivery.setData(null);
-                    delivery.setDataLength(0);
-                    delivery.setDone();
+                    if(payload == null || !payload.hasRemaining())
+                    {
+                        delivery.setData(null);
+                        delivery.setDataLength(0);
+                        delivery.setDone();
+
+                        if(delivery.getLink().current() != delivery)
+                        {
+                            transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
+                            transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
+                        }
 
-                    if(delivery.getLink().current() != delivery)
+                        delivery = delivery.clearTransportWork();
+
+                    }
+                    else
                     {
-                        transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
-                        transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
+                        delivery.setDataOffset(delivery.getDataOffset()+delivery.getDataLength()-payload.remaining());
+                        delivery.setDataLength(payload.remaining());
                     }
 
-                    delivery = delivery.clearTransportWork();
+
+
 
 
                 }
@@ -476,7 +492,7 @@ public class TransportImpl extends Endpo
                     }
                     int frameBytes = writeFrame(buffer, delivery.getLink().getSession()
                                                                       .getTransportSession().getLocalChannel(),
-                                       disposition, null);
+                                       disposition, null, null);
                     written += frameBytes;
                     if(delivery.isSettled())
                     {
@@ -526,7 +542,7 @@ public class TransportImpl extends Endpo
                             flow.setLinkCredit(transportLink.getLinkCredit());
                             flow.setDrain(receiver.getDrain());
                             flow.setNextOutgoingId(transportSession.getNextOutgoingId());
-                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
                             written += frameBytes;
                             if(receiver.getLocalState() == EndpointState.ACTIVE)
                             {
@@ -558,7 +574,7 @@ public class TransportImpl extends Endpo
                             flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
                             flow.setNextOutgoingId(transportSession.getNextOutgoingId());
                             flow.setNextIncomingId(transportSession.getNextIncomingId());
-                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
                             written += frameBytes;
                         }
                     }
@@ -625,7 +641,7 @@ public class TransportImpl extends Endpo
                                 attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
                             }
 
-                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null);
+                            int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null, null);
                             written += frameBytes;
                             transportLink.sentAttach();
                             if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit()))
@@ -678,7 +694,7 @@ public class TransportImpl extends Endpo
 
             _isOpenSent = true;
 
-            return  writeFrame(buffer, 0, open, null);
+            return  writeFrame(buffer, 0, open, null, null);
 
         }
         return 0;
@@ -715,7 +731,7 @@ public class TransportImpl extends Endpo
                         begin.setOutgoingWindow(transportSession.getOutgoingWindowSize());
                         begin.setNextOutgoingId(transportSession.getNextOutgoingId());
 
-                        written += writeFrame(buffer, channelId, begin, null);
+                        written += writeFrame(buffer, channelId, begin, null, null);
                         transportSession.sentBegin();
                         if(session.getLocalState() == EndpointState.ACTIVE)
                         {
@@ -780,7 +796,7 @@ public class TransportImpl extends Endpo
 
                     End end = new End();
 
-                    int frameBytes = writeFrame(buffer, channel, end, null);
+                    int frameBytes = writeFrame(buffer, channel, end, null, null);
                     written += frameBytes;
                     endpoint.clearModified();
 
@@ -829,21 +845,39 @@ public class TransportImpl extends Endpo
 
                 _isCloseSent = true;
 
-                return  writeFrame(buffer, 0, close, null);
+                return  writeFrame(buffer, 0, close, null, null);
             }
         }
         return 0;
 
     }
 
-    private int writeFrame(WritableBuffer buffer, int channel, DescribedType frameBody, ByteBuffer payload)
+    private int writeFrame(WritableBuffer buffer,
+                           int channel,
+                           DescribedType frameBody,
+                           ByteBuffer payload,
+                           Runnable onPayloadTooLarge)
     {
         int oldPosition = buffer.position();
         buffer.position(buffer.position()+8);
         _encoder.setByteBuffer(buffer);
-        _encoder.writeDescribedType(frameBody);
 
-        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize);
+        if(payload == null || payload.remaining() < _maxFrameSize)
+        {
+            _encoder.writeDescribedType(frameBody);
+        }
+
+        if(payload != null && (payload.remaining() + buffer.position() - oldPosition) > _maxFrameSize)
+        {
+            if(onPayloadTooLarge != null)
+            {
+                onPayloadTooLarge.run();
+            }
+            buffer.position(oldPosition+8);
+            _encoder.writeDescribedType(frameBody);
+        }
+
+        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize - (buffer.position() - oldPosition));
         if(payloadSize > 0)
         {
             int oldLimit = payload.limit();
@@ -1096,4 +1130,20 @@ public class TransportImpl extends Endpo
         }
     }
 
-}
+    private static class PartialTransfer implements Runnable
+    {
+        private final Transfer _transfer;
+
+        public PartialTransfer(Transfer transfer)
+        {
+            _transfer = transfer;
+        }
+
+        @Override
+        public void run()
+        {
+            _transfer.setMore(true);
+        }
+    }
+
+   }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Fri Oct  5 12:17:39 2012
@@ -235,9 +235,21 @@ class TransportSession
         // TODO - should this be a copy?
         if(payload != null)
         {
-            delivery.setData(payload.getArray());
-            delivery.setDataLength(payload.getLength());
-            delivery.setDataOffset(payload.getArrayOffset());
+            if(delivery.getDataLength() == 0)
+            {
+                delivery.setData(payload.getArray());
+                delivery.setDataLength(payload.getLength());
+                delivery.setDataOffset(payload.getArrayOffset());
+            }
+            else
+            {
+                byte[] data = new byte[delivery.getDataLength() + payload.getLength()];
+                System.arraycopy(delivery.getData(), delivery.getDataOffset(), data, 0, delivery.getDataLength());
+                System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, delivery.getDataLength(), payload.getLength());
+                delivery.setData(data);
+                delivery.setDataOffset(0);
+                delivery.setDataLength(data.length);
+            }
         }
         delivery.addIOWork();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message