qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From philharveyonl...@apache.org
Subject svn commit: r1484842 [3/4] - in /qpid/proton/trunk: proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/ proton-j/proton-api/src/main/resources/ proton-j/proton/ proton...
Date Tue, 21 May 2013 15:49:53 GMT
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java Tue May 21 15:49:52 2013
@@ -20,15 +20,20 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.impl.TransportWrapper;
 
 public class SslHandshakeSniffingTransportWrapper implements SslTransportWrapper
 {
+    private static final int MINIMUM_LENGTH_FOR_DETERMINATION = 5;
     private final SslTransportWrapper _secureTransportWrapper;
     private final TransportWrapper _plainTransportWrapper;
 
-    private boolean _determinationMade = false;
-    private boolean _isSecure;
+    private TransportWrapper _selectedTransportWrapper;
+
+    private final ByteBuffer _determinationBuffer = ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION);
 
     SslHandshakeSniffingTransportWrapper(
             SslTransportWrapper secureTransportWrapper,
@@ -39,64 +44,110 @@ public class SslHandshakeSniffingTranspo
     }
 
     @Override
-    public int input(byte[] sourceBuffer, int offset, int size)
+    public ByteBuffer getInputBuffer()
     {
-        if (_determinationMade==false)
+        if (isDeterminationMade())
         {
-            byte[] zeroBasedSrcBytes = new byte[size];
-            System.arraycopy(sourceBuffer, offset, zeroBasedSrcBytes, 0, size);
-
-            _isSecure = checkForSslHandshake(zeroBasedSrcBytes);
-            _determinationMade = true;
+            return _selectedTransportWrapper.getInputBuffer();
+        }
+        else
+        {
+            return _determinationBuffer;
         }
+    }
 
-        if (_isSecure)
+    @Override
+    public TransportResult processInput()
+    {
+        if (isDeterminationMade())
         {
-            return _secureTransportWrapper.input(sourceBuffer, offset, size);
+            return _selectedTransportWrapper.processInput();
         }
         else
         {
-            return _plainTransportWrapper.input(sourceBuffer, offset, size);
+            _determinationBuffer.flip();
+            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
+            _determinationBuffer.get(bytesInput);
+            makeSslDetermination(bytesInput);
+            _determinationBuffer.rewind();
+
+            // TODO handle the case where the selected transport's input buffer has insufficient capacity for these bytes; perhaps pour in what fits and finish pouring on the next call.
+            _selectedTransportWrapper.getInputBuffer().put(_determinationBuffer);
+            return _selectedTransportWrapper.processInput();
         }
     }
 
     @Override
-    public int output(byte[] destinationBuffer, int offset, int size)
+    public ByteBuffer getOutputBuffer()
+    {
+        makePlainUnlessDeterminationAlreadyMade();
+
+        return _selectedTransportWrapper.getOutputBuffer();
+    }
+
+    @Override
+    public void outputConsumed()
     {
-        if (_determinationMade == false)
+        makePlainUnlessDeterminationAlreadyMade();
+
+        _selectedTransportWrapper.outputConsumed();
+    }
+
+    @Override
+    public String getCipherName()
+    {
+        if(isSecureWrapperSelected())
+        {
+            return _secureTransportWrapper.getCipherName();
+        }
+        else
         {
-            _isSecure = false;
-            _determinationMade = true;
+            return null;
         }
+    }
+
 
-        if (_isSecure)
+    @Override
+    public String getProtocolName()
+    {
+        if (isSecureWrapperSelected())
         {
-            return _secureTransportWrapper.output(destinationBuffer, offset, size);
+            return _secureTransportWrapper.getProtocolName();
         }
         else
         {
-            return _plainTransportWrapper.output(destinationBuffer, offset, size);
+            return null;
         }
     }
 
-    @Override
-    public String getCipherName()
+    private boolean isSecureWrapperSelected()
     {
-        return _secureTransportWrapper.getCipherName();
+        return _selectedTransportWrapper == _secureTransportWrapper;
     }
 
-    @Override
-    public String getProtocolName()
+    private boolean isDeterminationMade()
     {
-        return _secureTransportWrapper.getProtocolName();
+        return _selectedTransportWrapper != null;
     }
 
+    private void makeSslDetermination(byte[] bytesInput)
+    {
+        boolean isSecure = checkForSslHandshake(bytesInput);
+        if (isSecure)
+        {
+            _selectedTransportWrapper = _secureTransportWrapper;
+        }
+        else
+        {
+            _selectedTransportWrapper = _plainTransportWrapper;
+        }
+    }
     // TODO perhaps the sniffer should save up the bytes from each
     // input call until it has sufficient bytes to make the determination
     // and only then pass them to the secure or plain wrapped transport?
     private boolean checkForSslHandshake(byte[] buf)
     {
-        if (buf.length >= 5)
+        if (buf.length >= MINIMUM_LENGTH_FOR_DETERMINATION)
         {
             /*
              * SSLv2 Client Hello format
@@ -137,4 +188,12 @@ public class SslHandshakeSniffingTranspo
             throw new IllegalArgumentException("Too few bytes (" + buf.length + ") to make SSL/plain  determination.");
         }
     }
+
+    private void makePlainUnlessDeterminationAlreadyMade()
+    {
+        if (!isDeterminationMade())
+        {
+            _selectedTransportWrapper = _plainTransportWrapper;
+        }
+    }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java Tue May 21 15:49:52 2013
@@ -20,10 +20,13 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.engine.Ssl;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.impl.TransportInput;
 import org.apache.qpid.proton.engine.impl.TransportOutput;
 import org.apache.qpid.proton.engine.impl.TransportWrapper;
@@ -97,17 +100,32 @@ public class SslImpl implements Ssl
         }
 
         @Override
-        public int input(byte[] bytes, int offset, int size)
+        public ByteBuffer getInputBuffer()
+        {
+            initTransportWrapperOnFirstIO();
+            return _transportWrapper.getInputBuffer();
+        }
+
+
+        @Override
+        public TransportResult processInput()
+        {
+            initTransportWrapperOnFirstIO();
+            return _transportWrapper.processInput();
+        }
+
+        @Override
+        public ByteBuffer getOutputBuffer()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.input(bytes, offset, size);
+            return _transportWrapper.getOutputBuffer();
         }
 
         @Override
-        public int output(byte[] bytes, int offset, int size)
+        public void outputConsumed()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.output(bytes, offset, size);
+            _transportWrapper.outputConsumed();
         }
 
         @Override

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue May 21 15:49:52 2013
@@ -50,19 +50,18 @@ import org.apache.qpid.proton.messenger.
 import org.apache.qpid.proton.messenger.MessengerFactory;
 import org.apache.qpid.proton.messenger.Status;
 import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 
 public class MessengerImpl implements Messenger
 {
+    @SuppressWarnings("rawtypes")
     private static ProtonFactoryLoader protonFactoryLoader = new ProtonFactoryLoader();
 
     private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
     private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
     private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
     private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
-    private static final Accepted ACCEPTED = new Accepted();
 
     private final Logger _logger = Logger.getLogger("proton.messenger");
     private final String _name;
@@ -76,7 +75,6 @@ public class MessengerImpl implements Me
     private boolean _unlimitedCredit = false;
     private static final int _creditBatch = 10;
     private int _credit;
-    private int _distributed;
     private TrackerQueue _incoming = new TrackerQueue();
     private TrackerQueue _outgoing = new TrackerQueue();
 
@@ -128,8 +126,12 @@ public class MessengerImpl implements Me
 
     public void stop()
     {
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to stop");
+        }
         //close all connections
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             connection.close();
@@ -143,7 +145,7 @@ public class MessengerImpl implements Me
             }
         }
         //stop listeners
-        for (Listener l : _driver.listeners())
+        for (Listener<?> l : _driver.listeners())
         {
             try
             {
@@ -167,6 +169,10 @@ public class MessengerImpl implements Me
 
     public void put(Message m) throws MessengerException
     {
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to put message: " + m);
+        }
         try
         {
             URI address = new URI(m.getAddress());
@@ -204,11 +210,19 @@ public class MessengerImpl implements Me
 
     public void send() throws TimeoutException
     {
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to send");
+        }
         waitUntil(_sentSettled);
     }
 
     public void recv(int n) throws TimeoutException
     {
+        if(_logger.isLoggable(Level.FINE))
+        {
+            _logger.fine(this + " about to wait for up to " + n + " messages to be received");
+        }
         if (n == -1) {
             _unlimitedCredit = true;
         } else {
@@ -222,7 +236,7 @@ public class MessengerImpl implements Me
 
     public Message get()
     {
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             _logger.log(Level.FINE, "Attempting to get message from " + connection);
@@ -236,7 +250,6 @@ public class MessengerImpl implements Me
                     Message message = _messageFactory.createMessage();
                     message.decode(_buffer, 0, size);
                     _incoming.add(delivery);
-                    _distributed--;
                     delivery.getLink().advance();
                     return message;
                 }
@@ -260,15 +273,24 @@ public class MessengerImpl implements Me
         try
         {
             URI address = new URI(listen ? source.replace("~", "") : source);
-            if (address.getHost() == null) throw new MessengerException("Invalid source address (hostname cannot be null): " + source);
+            String hostName = address.getHost();
+            if (hostName == null) throw new MessengerException("Invalid source address (hostname cannot be null): " + source);
             int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort();
             if (listen)
             {
-                _driver.createListener(address.getHost(), port, null);
+                if(_logger.isLoggable(Level.FINE))
+                {
+                    _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
+                }
+                _driver.createListener(hostName, port, null);
             }
             else
             {
-                getLink(address.getHost(), port, new ReceiverFinder(cleanPath(address.getPath())));
+                if(_logger.isLoggable(Level.FINE))
+                {
+                    _logger.fine(this + " about to subscribe to source " + source);
+                }
+                getLink(hostName, port, new ReceiverFinder(cleanPath(address.getPath())));
             }
         }
         catch (URISyntaxException e)
@@ -341,7 +363,7 @@ public class MessengerImpl implements Me
     private int queued(boolean outgoing)
     {
         int count = 0;
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             for (Link link : new Links(connection, ACTIVE, ANY))
@@ -384,15 +406,9 @@ public class MessengerImpl implements Me
         return total;
     }
 
-    private void process()
-    {
-        processAllConnectors();
-        processActive();
-    }
-
     private void processAllConnectors()
     {
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             try
             {
@@ -408,9 +424,9 @@ public class MessengerImpl implements Me
     private void processActive()
     {
         //process active listeners
-        for (Listener l = _driver.listener(); l != null; l = _driver.listener())
+        for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
         {
-            Connector c = l.accept();
+            Connector<?> c = l.accept();
             Connection connection = _engineFactory.createConnection();
             connection.setContainer(_name);
             c.setConnection(connection);
@@ -425,7 +441,7 @@ public class MessengerImpl implements Me
             connection.open();
         }
         //process active connectors, handling opened & closed connections as needed
-        for (Connector c = _driver.connector(); c != null; c = _driver.connector())
+        for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
         {
             _logger.log(Level.FINE, "Processing active connector " + c);
             try
@@ -536,13 +552,15 @@ public class MessengerImpl implements Me
         }
         if (!done)
         {
+            _logger.log(Level.SEVERE, String.format(
+                    "Timeout when waiting for condition %s after %s ms", condition, timeout));
             throw new TimeoutException();
         }
     }
 
     private Connection lookup(String host, String service)
     {
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             if (host.equals(connection.getRemoteContainer()) || service.equals(connection.getContext()))
@@ -567,14 +585,13 @@ public class MessengerImpl implements Me
     private void reclaimCredit(int credit)
     {
         _credit += credit;
-        _distributed -= credit;
     }
 
     private void distributeCredit()
     {
         int linkCt = 0;
         // @todo track the number of opened receive links
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             for (Link link : new Links(connection, ACTIVE, ANY))
@@ -591,7 +608,7 @@ public class MessengerImpl implements Me
         }
 
         int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
-        for (Connector c : _driver.connectors())
+        for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             for (Link link : new Links(connection, ACTIVE, ANY))
@@ -604,7 +621,6 @@ public class MessengerImpl implements Me
                         int need = batch - have;
                         int amount = (_credit < need) ? _credit : need;
                         ((Receiver) link).flow(amount);
-                        _distributed += amount;
                         _credit -= amount;
                         if (_credit == 0) return;
                     }
@@ -623,7 +639,7 @@ public class MessengerImpl implements Me
         public boolean test()
         {
             //are all sent messages settled?
-            for (Connector c : _driver.connectors())
+            for (Connector<?> c : _driver.connectors())
             {
                 Connection connection = c.getConnection();
                 for (Link link : new Links(connection, ACTIVE, ANY))
@@ -681,7 +697,7 @@ public class MessengerImpl implements Me
         public boolean test()
         {
             //do we have at least one message?
-            for (Connector c : _driver.connectors())
+            for (Connector<?> c : _driver.connectors())
             {
                 Connection connection = c.getConnection();
                 Delivery delivery = connection.getWorkHead();
@@ -789,7 +805,7 @@ public class MessengerImpl implements Me
         Connection connection = lookup(host, service);
         if (connection == null)
         {
-            Connector connector = _driver.createConnector(host, port, null);
+            Connector<?> connector = _driver.createConnector(host, port, null);
             _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
             connection = _engineFactory.createConnection();
             connection.setContainer(_name);
@@ -971,19 +987,30 @@ public class MessengerImpl implements Me
         else return 5672;
     }
 
+    @SuppressWarnings("unchecked")
     private static EngineFactory defaultEngineFactory()
     {
         return (EngineFactory) protonFactoryLoader.loadFactory(EngineFactory.class);
     }
 
+    @SuppressWarnings("unchecked")
     private static DriverFactory defaultDriverFactory()
     {
         return (DriverFactory) protonFactoryLoader.loadFactory(DriverFactory.class);
     }
 
+    @SuppressWarnings("unchecked")
     private static MessageFactory defaultMessageFactory()
     {
         return (MessageFactory) protonFactoryLoader.loadFactory(MessageFactory.class);
     }
 
+    @Override
+    public String toString()
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("MessengerImpl [_name=").append(_name).append("]");
+        return builder.toString();
+    }
+
 }

Copied: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/AmqpFramer.java (from r1483522, qpid/proton/trunk/tests/java/org/apache/qpid/proton/systemtests/engine/AmqpFramer.java)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/AmqpFramer.java?p2=qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/AmqpFramer.java&p1=qpid/proton/trunk/tests/java/org/apache/qpid/proton/systemtests/engine/AmqpFramer.java&r1=1483522&r2=1484842&rev=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/tests/java/org/apache/qpid/proton/systemtests/engine/AmqpFramer.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/AmqpFramer.java Tue May 21 15:49:52 2013
@@ -17,18 +17,22 @@
  * under the License.
  *
  */
-package org.apache.qpid.proton.systemtests.engine;
+package org.apache.qpid.proton.engine.impl;
 
 import static org.junit.Assert.assertEquals;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.proton.amqp.security.SaslFrameBody;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
+/**
+ * Generates frames as per section 2.3.1 of the AMQP spec
+ */
 public class AmqpFramer
 {
     // My test data is generated by the decoder and encoder from proton-j
@@ -56,6 +60,20 @@ public class AmqpFramer
 
     public byte[] generateFrame(int channel, byte[] extendedHeader, FrameBody frameBody)
     {
+        return generateFrame(channel, extendedHeader, frameBody, (byte)0);
+    }
+
+    public byte[] generateSaslFrame(int channel, byte[] extendedHeader, SaslFrameBody frameBody)
+    {
+        return generateFrame(channel, extendedHeader, frameBody, (byte)1);
+    }
+
+    /**
+     * @param amqpFrameType the frame type byte written into the frame header: 0 for an AMQP frame, 1 for a SASL frame
+     * @param frameBody is currently expected to be a {@link FrameBody} or a {@link SaslFrameBody}
+     */
+    public byte[] generateFrame(int channel, byte[] extendedHeader, Object frameBody, byte amqpFrameType)
+    {
         assertEquals("Extended header must be multiple of 4 bytes", 0, extendedHeader.length % 4);
         int numberOfExtendedHeaderFourByteWords = extendedHeader.length / 4;
 
@@ -75,11 +93,10 @@ public class AmqpFramer
         int frameSize = buffer.position();
         int framePreambleSizeInFourByteWords = 2;
         byte dataOffsetFourByteWords = (byte)(framePreambleSizeInFourByteWords + numberOfExtendedHeaderFourByteWords);
-        byte amqpFrameType = 0;
         buffer.rewind();
         buffer.putInt(frameSize);
         buffer.put(dataOffsetFourByteWords);
-        buffer.put(amqpFrameType); // AMQP_FRAME_TYPE
+        buffer.put(amqpFrameType);
         buffer.putShort((short)channel);
 
         byte[] target = new byte[frameSize];

Added: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java?rev=1484842&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java Tue May 21 15:49:52 2013
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.apache.qpid.proton.engine.Transport.DEFAULT_MAX_FRAME_SIZE;
+import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.FrameBody;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.engine.TransportResult;
+import org.apache.qpid.proton.engine.TransportResult.Status;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.hamcrest.Description;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
+
+// TODO test a frame with a payload (potentially followed by another frame)
+public class FrameParserTest
+{
+    private FrameHandler _mockFrameHandler = mock(FrameHandler.class);
+    private DecoderImpl _decoder = new DecoderImpl();
+    private EncoderImpl _encoder = new EncoderImpl(_decoder);
+    private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE);
+
+    private final AmqpFramer _amqpFramer = new AmqpFramer();
+
+    @Rule
+    public ExpectedException _expectedException = ExpectedException.none();
+
+    @Before
+    public void setUp()
+    {
+        AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
+
+        when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
+    }
+
+    @Test
+    public void testInputOfInvalidProtocolHeader_causesErrorAndRefusesFurtherInput()
+    {
+        ByteBuffer buffer = _frameParser.getInputBuffer();
+        buffer.put("hello".getBytes());
+        TransportResult result = _frameParser.processInput();
+
+        String headerMismatchMessage = "AMQP header mismatch";
+        assertThat(result.getErrorDescription(), containsString(headerMismatchMessage));
+        assertEquals(Status.ERROR, result.getStatus());
+
+        _expectedException.expect(TransportException.class);
+        _expectedException.expectMessage(headerMismatchMessage);
+        _frameParser.getInputBuffer();
+    }
+
+    @Test
+    public void testInputOfValidProtocolHeader()
+    {
+        ByteBuffer buffer = _frameParser.getInputBuffer();
+        buffer.put(HEADER);
+        _frameParser.processInput().checkIsOk();
+
+        assertNotNull(_frameParser.getInputBuffer());
+    }
+
+    @Test
+    public void testInputOfValidProtocolHeaderInMultipleChunks()
+    {
+        {
+            ByteBuffer buffer = _frameParser.getInputBuffer();
+            buffer.put(HEADER, 0, 2);
+            _frameParser.processInput().checkIsOk();
+        }
+
+        {
+            ByteBuffer buffer = _frameParser.getInputBuffer();
+            buffer.put(HEADER, 2, HEADER.length - 2);
+            _frameParser.processInput().checkIsOk();
+        }
+
+        assertNotNull(_frameParser.getInputBuffer());
+    }
+
+    @Test
+    public void testInputOfValidFrame_invokesFrameTransportCallback()
+    {
+        sendHeader().checkIsOk();
+
+        // now send an open frame
+        ByteBuffer buffer = _frameParser.getInputBuffer();
+
+        Open openFrame = generateOpenFrame();
+        int channel = 0;
+        byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
+        buffer.put(frame);
+
+        _frameParser.processInput().checkIsOk();
+        verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
+    }
+
+    @Test
+    public void testInputOfFrameInMultipleChunks_invokesFrameTransportCallback()
+    {
+        sendHeader().checkIsOk();
+
+        Open openFrame = generateOpenFrame();
+        int channel = 0;
+        byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
+        int lengthOfFirstChunk = 2;
+        int lengthOfSecondChunk = (frame.length - lengthOfFirstChunk)/2;
+        int lengthOfThirdChunk = frame.length - lengthOfFirstChunk - lengthOfSecondChunk;
+
+        // send the first chunk
+        {
+            ByteBuffer buffer = _frameParser.getInputBuffer();
+
+            buffer.put(frame, 0, lengthOfFirstChunk);
+
+            _frameParser.processInput().checkIsOk();
+
+            verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
+        }
+
+        // send the second chunk
+        {
+            ByteBuffer buffer = _frameParser.getInputBuffer();
+
+            int secondChunkOffset = lengthOfFirstChunk;
+            buffer.put(frame, secondChunkOffset, lengthOfSecondChunk);
+
+            _frameParser.processInput().checkIsOk();
+            verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
+        }
+
+        // send the third and final chunk
+        {
+            ByteBuffer buffer = _frameParser.getInputBuffer();
+
+            int thirdChunkOffset = lengthOfFirstChunk + lengthOfSecondChunk;
+            buffer.put(frame, thirdChunkOffset, lengthOfThirdChunk);
+
+            _frameParser.processInput().checkIsOk();
+            verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
+        }
+    }
+
+    @Test
+    public void testInputOfTwoFrames_invokesFrameTransportTwice()
+    {
+        sendHeader().checkIsOk();
+
+        int channel = 0;
+        Open openFrame = generateOpenFrame();
+        byte[] openFrameBytes = _amqpFramer.generateFrame(channel, openFrame);
+
+        Close closeFrame = generateCloseFrame();
+        byte[] closeFrameBytes = _amqpFramer.generateFrame(channel, closeFrame);
+
+        _frameParser.getInputBuffer()
+            .put(openFrameBytes)
+            .put(closeFrameBytes);
+
+        _frameParser.processInput().checkIsOk();
+
+        InOrder inOrder = inOrder(_mockFrameHandler);
+        inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
+        inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame));
+    }
+
+    /**
+     * While the frame handler reports that it is not handling frames, a parsed open frame
+     * must not be passed to it. Once the handler accepts frames again, the next input must
+     * result in the open frame being handled first, followed by the newly supplied close frame.
+     */
+    @Test
+    public void testFrameTransportTemporarilyRefusesOpenFrame()
+    {
+        when(_mockFrameHandler.isHandlingFrames()).thenReturn(false);
+
+        sendHeader().checkIsOk();
+
+        final int channel = 0;
+
+        // the open frame arrives while the handler is refusing frames
+        final Open open = generateOpenFrame();
+        ByteBuffer openInput = _frameParser.getInputBuffer();
+        openInput.put(_amqpFramer.generateFrame(channel, open));
+        _frameParser.processInput().checkIsOk();
+
+        verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
+
+        // the handler starts accepting frames again
+        when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
+
+        // the second input carries a close frame; the open frame must be delivered ahead of it
+        final Close close = generateCloseFrame();
+        ByteBuffer closeInput = _frameParser.getInputBuffer();
+        closeInput.put(_amqpFramer.generateFrame(channel, close));
+        _frameParser.processInput().checkIsOk();
+
+        InOrder handlerCalls = inOrder(_mockFrameHandler);
+        handlerCalls.verify(_mockFrameHandler).handleFrame(frameMatching(channel, open));
+        handlerCalls.verify(_mockFrameHandler).handleFrame(frameMatching(channel, close));
+    }
+
+    /**
+     * While the frame handler reports that it is not handling frames, neither an open frame
+     * nor a subsequent close frame may be passed to it. After the handler accepts frames
+     * again, {@code flush()} must deliver both frames in their original order.
+     */
+    @Test
+    public void testFrameTransportTemporarilyRefusesOpenAndCloseFrame()
+    {
+        when(_mockFrameHandler.isHandlingFrames()).thenReturn(false);
+
+        sendHeader().checkIsOk();
+
+        final int channel = 0;
+
+        // first input: an open frame, which must not reach the handler
+        final Open open = generateOpenFrame();
+        ByteBuffer openInput = _frameParser.getInputBuffer();
+        openInput.put(_amqpFramer.generateFrame(channel, open));
+        _frameParser.processInput().checkIsOk();
+        verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
+
+        // second input: a close frame, which must not reach the handler either
+        final Close close = generateCloseFrame();
+        ByteBuffer closeInput = _frameParser.getInputBuffer();
+        closeInput.put(_amqpFramer.generateFrame(channel, close));
+        _frameParser.processInput().checkIsOk();
+        verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
+
+        // the handler starts accepting frames again; no new input is supplied, only a flush
+        when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
+        _frameParser.flush();
+
+        InOrder handlerCalls = inOrder(_mockFrameHandler);
+        handlerCalls.verify(_mockFrameHandler).handleFrame(frameMatching(channel, open));
+        handlerCalls.verify(_mockFrameHandler).handleFrame(frameMatching(channel, close));
+    }
+
+    /**
+     * Writes the {@code HEADER} bytes into the parser's input buffer and processes them.
+     *
+     * @return the result reported by the parser for that input
+     */
+    private TransportResult sendHeader()
+    {
+        _frameParser.getInputBuffer().put(HEADER);
+        return _frameParser.processInput();
+    }
+
+    /**
+     * Creates an {@link Open} frame body with a fixed container id.
+     *
+     * @return a new Open whose container id is "containerid"
+     */
+    private Open generateOpenFrame()
+    {
+        final Open frame = new Open();
+        frame.setContainerId("containerid");
+        return frame;
+    }
+
+    /**
+     * Creates a {@link Close} frame body with no fields set.
+     *
+     * @return a new, default-constructed Close
+     */
+    private Close generateCloseFrame()
+    {
+        return new Close();
+    }
+
+    /**
+     * Registers a Mockito argument matcher for a frame on the given channel carrying a body
+     * of the same class as {@code frameBody}.
+     *
+     * @param channel the channel the matched frame must have
+     * @param frameBody a body whose class the matched frame's body must share
+     * @return the placeholder value returned by {@code argThat}, for use as a mock argument
+     */
+    private TransportFrame frameMatching(int channel, FrameBody frameBody)
+    {
+        TransportFrameMatcher matcher = new TransportFrameMatcher(channel, frameBody);
+        return argThat(matcher);
+    }
+
+    /**
+     * Mockito argument matcher that accepts a {@link TransportFrame} when its channel equals
+     * the expected channel and its body is of exactly the same class as the expected body.
+     * The contents of the body are not compared.
+     *
+     * Declared static because it never uses the enclosing test instance.
+     */
+    private static class TransportFrameMatcher extends ArgumentMatcher<TransportFrame>
+    {
+        private final TransportFrame _expectedTransportFrame;
+
+        /**
+         * @param expectedChannel the channel a matching frame must have
+         * @param expectedFrameBody a body whose class a matching frame's body must share
+         */
+        TransportFrameMatcher(int expectedChannel, FrameBody expectedFrameBody)
+        {
+            _expectedTransportFrame = new TransportFrame(expectedChannel, expectedFrameBody, null);
+        }
+
+        /**
+         * @param transportFrameObj the argument actually passed to the mock
+         * @return true if the argument is a TransportFrame with the expected channel and body class
+         */
+        @Override
+        public boolean matches(Object transportFrameObj)
+        {
+            // instanceof also rejects null
+            if(!(transportFrameObj instanceof TransportFrame))
+            {
+                return false;
+            }
+
+            TransportFrame actualTransportFrame = (TransportFrame)transportFrameObj;
+            FrameBody actualBody = actualTransportFrame.getBody();
+            FrameBody expectedBody = _expectedTransportFrame.getBody();
+
+            boolean sameChannel = _expectedTransportFrame.getChannel() == actualTransportFrame.getChannel();
+
+            // A missing body on either side is reported as a mismatch (unless both are missing)
+            // rather than surfacing as a NullPointerException from inside the matcher.
+            boolean sameBodyType;
+            if(expectedBody == null || actualBody == null)
+            {
+                sameBodyType = (expectedBody == actualBody);
+            }
+            else
+            {
+                sameBodyType = expectedBody.getClass().equals(actualBody.getClass());
+            }
+
+            return sameChannel && sameBodyType;
+        }
+
+        @Override
+        public void describeTo(Description description)
+        {
+            super.describeTo(description);
+            description.appendText("Expected: " + _expectedTransportFrame);
+        }
+    }
+}

Added: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java?rev=1484842&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java Tue May 21 15:49:52 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+import static org.junit.matchers.JUnitMatchers.containsString;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.security.SaslFrameBody;
+import org.apache.qpid.proton.amqp.security.SaslInit;
+import org.apache.qpid.proton.amqp.transport.FrameBody;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.ByteBufferDecoder;
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.TransportResult;
+import org.apache.qpid.proton.engine.TransportResult.Status;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SaslFrameParser}. The parser is driven with a mock
+ * {@link SaslFrameHandler} and, where a particular decoding outcome has to be forced,
+ * a mock {@link ByteBufferDecoder}.
+ *
+ * TODO test case where header is malformed
+ * TODO test case where input provides frame and half etc
+ */
+public class SaslFrameParserTest
+{
+    private final SaslFrameHandler _mockSaslFrameHandler = mock(SaslFrameHandler.class);
+    private final ByteBufferDecoder _mockDecoder = mock(ByteBufferDecoder.class);
+
+    /** Parser backed by a real decoder with all AMQP types registered. */
+    private final SaslFrameParser _frameParser;
+
+    /** Parser backed by {@link #_mockDecoder}, so that tests can dictate what is decoded. */
+    private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder);
+    private final AmqpFramer _amqpFramer = new AmqpFramer();
+
+    private final SaslInit _saslFrameBody;
+
+    /** A SASL frame on channel 0 wrapping {@link #_saslFrameBody}. */
+    private final ByteBuffer _saslFrameBytes;
+
+    public SaslFrameParserTest()
+    {
+        DecoderImpl decoder = new DecoderImpl();
+        EncoderImpl encoder = new EncoderImpl(decoder);
+        AMQPDefinedTypes.registerAllTypes(decoder,encoder);
+
+        _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder);
+        _saslFrameBody = new SaslInit();
+        _saslFrameBody.setMechanism(Symbol.getSymbol("unused"));
+        _saslFrameBytes = ByteBuffer.wrap(_amqpFramer.generateSaslFrame(0, new byte[0], _saslFrameBody));
+    }
+
+    /**
+     * A well-formed SASL frame must be decoded and handed to the frame handler.
+     */
+    @Test
+    public void testInputOfValidFrame()
+    {
+        sendAmqpSaslHeader(_frameParser);
+
+        when(_mockSaslFrameHandler.isDone()).thenReturn(false);
+
+        TransportResult result = _frameParser.input(_saslFrameBytes);
+        result.checkIsOk();
+
+        verify(_mockSaslFrameHandler).handle(isA(SaslInit.class), (Binary)isNull());
+    }
+
+    /**
+     * A decode failure must produce an error result carrying the decoder's message, must not
+     * reach the frame handler, and must leave the parser refusing all further input.
+     */
+    @Test
+    public void testInputOfInvalidFrame_causesErrorAndRefusesFurtherInput()
+    {
+        sendAmqpSaslHeader(_frameParserWithMockDecoder);
+
+        String exceptionMessage = "dummy decode exception";
+        when(_mockDecoder.readObject()).thenThrow(new DecodeException(exceptionMessage));
+
+        // We send a valid frame but the mock decoder has been configured to reject it
+        TransportResult result = _frameParserWithMockDecoder.input(_saslFrameBytes);
+
+        assertEquals(Status.ERROR, result.getStatus());
+        assertThat(result.getErrorDescription(), containsString(exceptionMessage));
+
+        verify(_mockSaslFrameHandler, never()).handle(any(SaslFrameBody.class), any(Binary.class));
+
+        assertFurtherInputCausesError(_frameParserWithMockDecoder);
+    }
+
+    /**
+     * A frame whose decoded body is not a SASL frame body must produce an error result, must
+     * not reach the frame handler, and must leave the parser refusing all further input.
+     */
+    @Test
+    public void testInputOfNonSaslFrame_causesErrorAndRefusesFurtherInput()
+    {
+        sendAmqpSaslHeader(_frameParserWithMockDecoder);
+
+        FrameBody nonSaslFrame = new Open();
+        when(_mockDecoder.readObject()).thenReturn(nonSaslFrame);
+
+        // We send a valid frame but the mock decoder has been configured to decode it
+        // as a non-SASL frame body, which the parser is expected to reject
+        TransportResult result = _frameParserWithMockDecoder.input(_saslFrameBytes);
+
+        assertEquals(Status.ERROR, result.getStatus());
+        assertThat(result.getErrorDescription(), containsString("Unexpected frame type encountered."));
+
+        verify(_mockSaslFrameHandler, never()).handle(any(SaslFrameBody.class), any(Binary.class));
+
+        assertFurtherInputCausesError(_frameParserWithMockDecoder);
+    }
+
+    /**
+     * Checks that any further interaction with a parser that has already failed,
+     * even one supplying no bytes at all, causes an error TransportResult.
+     */
+    private void assertFurtherInputCausesError(SaslFrameParser saslFrameParser)
+    {
+        TransportResult result = saslFrameParser.input(ByteBuffer.allocate(0));
+        assertEquals(Status.ERROR, result.getStatus());
+    }
+
+    /**
+     * Feeds the SASL protocol header to the given parser and requires that it is accepted.
+     */
+    private void sendAmqpSaslHeader(SaslFrameParser saslFrameParser)
+    {
+        saslFrameParser.input(ByteBuffer.wrap(AmqpHeader.SASL_HEADER)).checkIsOk();
+    }
+
+}

Added: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java?rev=1484842&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java Tue May 21 15:49:52 2013
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
+import static org.apache.qpid.proton.engine.impl.TransportTestHelper.stringOfLength;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link TransportImpl}, covering the buffer-based input and output methods,
+ * the older array-based {@link Transport#input(byte[], int, int)} method, and the
+ * {@code isHandlingFrames()} / {@code handleFrame()} contract.
+ */
+public class TransportImplTest
+{
+    @SuppressWarnings("deprecation")
+    private TransportImpl _transport = new TransportImpl();
+
+    private static final int CHANNEL_ID = 1;
+    private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null);
+    private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null);
+
+    @Rule
+    public ExpectedException _expectedException = ExpectedException.none();
+
+    /**
+     * The AMQP header is accepted as input, after which an input buffer is still available.
+     */
+    @Test
+    public void testInput()
+    {
+        ByteBuffer buffer = _transport.getInputBuffer();
+        buffer.put(HEADER);
+        _transport.processInput().checkIsOk();
+
+        assertNotNull(_transport.getInputBuffer());
+    }
+
+    @Test
+    public void testUseOfProcessInputBeforeGetInputBuffer_causesIllegalStateException()
+    {
+        _expectedException.expect(IllegalStateException.class);
+        _transport.processInput();
+    }
+
+    /**
+     * Empty input is always allowed by {@link Transport#getInputBuffer()} and
+     * {@link Transport#processInput()}, in contrast to the old API.
+     *
+     * @see TransportImplTest#testEmptyInputBeforeBindUsingOldApi_causesTransportException()
+     */
+    @Test
+    public void testEmptyInput_isAllowed()
+    {
+        _transport.getInputBuffer();
+        _transport.processInput().checkIsOk();
+    }
+
+    /**
+     * Tests the end-of-stream behaviour specified by {@link Transport#input(byte[], int, int)}.
+     */
+    @Test
+    public void testEmptyInputBeforeBindUsingOldApi_causesTransportException()
+    {
+        _expectedException.expect(TransportException.class);
+        _expectedException.expectMessage("Unexpected EOS when remote connection not closed: connection aborted");
+        _transport.input(new byte [0], 0, 0);
+    }
+
+    /**
+     * TODO it's not clear why empty input is specifically allowed in this case.
+     */
+    @Test
+    public void testEmptyInputWhenRemoteConnectionIsClosedUsingOldApi_isAllowed()
+    {
+        @SuppressWarnings("deprecation")
+        ConnectionImpl connection = new ConnectionImpl();
+        _transport.bind(connection);
+        connection.setRemoteState(EndpointState.CLOSED);
+        _transport.input(new byte [0], 0, 0);
+    }
+
+    /**
+     * The first output buffer contains exactly the AMQP header; once that has been consumed
+     * the next output buffer is empty.
+     */
+    @Test
+    public void testOutput()
+    {
+        {
+            // TransportImpl's underlying output spontaneously outputs the AMQP header
+            final ByteBuffer outputBuffer = _transport.getOutputBuffer();
+            assertEquals(HEADER.length, outputBuffer.remaining());
+
+            byte[] outputBytes = new byte[HEADER.length];
+            outputBuffer.get(outputBytes);
+            assertArrayEquals(HEADER, outputBytes);
+
+            _transport.outputConsumed();
+        }
+
+        {
+            final ByteBuffer outputBuffer = _transport.getOutputBuffer();
+            assertEquals(0, outputBuffer.remaining());
+            _transport.outputConsumed();
+        }
+    }
+
+    @Test
+    public void testTransportInitiallyHandlesFrames()
+    {
+        assertTrue(_transport.isHandlingFrames());
+    }
+
+    /**
+     * A transport that is bound to a connection keeps handling frames after receiving one.
+     */
+    @Test
+    public void testBoundTransport_continuesToHandleFrames()
+    {
+        @SuppressWarnings("deprecation")
+        Connection connection = new ConnectionImpl();
+
+        assertTrue(_transport.isHandlingFrames());
+
+        _transport.bind(connection);
+
+        assertTrue(_transport.isHandlingFrames());
+
+        _transport.handleFrame(TRANSPORT_FRAME_OPEN);
+
+        assertTrue(_transport.isHandlingFrames());
+    }
+
+    /**
+     * A transport that is not bound to a connection stops handling frames after receiving one.
+     */
+    @Test
+    public void testUnboundTransport_stopsHandlingFrames()
+    {
+        assertTrue(_transport.isHandlingFrames());
+
+        _transport.handleFrame(TRANSPORT_FRAME_OPEN);
+
+        assertFalse(_transport.isHandlingFrames());
+    }
+
+    /**
+     * Passing a frame to a transport that has stopped handling frames is a caller error.
+     */
+    @Test
+    public void testHandleFrameWhenNotHandling_throwsIllegalStateException()
+    {
+        assertTrue(_transport.isHandlingFrames());
+
+        _transport.handleFrame(TRANSPORT_FRAME_OPEN);
+
+        assertFalse(_transport.isHandlingFrames());
+
+        _expectedException.expect(IllegalStateException.class);
+        _transport.handleFrame(TRANSPORT_FRAME_BEGIN);
+    }
+
+    /**
+     * Output that exceeds the output buffer is delivered across successive calls to
+     * {@code getOutputBuffer()}: a full buffer first, then the remaining bytes.
+     */
+    @Test
+    public void testOutputTooBigToBeWrittenInOneGo()
+    {
+        int smallMaxFrameSize = 512;
+        _transport = new TransportImpl(smallMaxFrameSize);
+
+        @SuppressWarnings("deprecation")
+        Connection conn = new ConnectionImpl();
+        _transport.bind(conn);
+
+        // Open frame sized in order to produce a frame that will almost fill output buffer
+        conn.setHostname(stringOfLength("x", 500));
+        conn.open();
+
+        // Close the connection to generate a Close frame which will cause an overflow
+        // internally - we'll get the remaining bytes on the next interaction.
+        conn.close();
+
+        ByteBuffer buf = _transport.getOutputBuffer();
+        assertEquals("Expecting buffer to be full", smallMaxFrameSize, buf.remaining());
+        buf.position(buf.limit());
+        _transport.outputConsumed();
+
+        buf  = _transport.getOutputBuffer();
+        assertTrue("Expecting second buffer to have bytes", buf.remaining() > 0);
+        assertTrue("Expecting second buffer to not be full", buf.remaining() < Transport.MIN_MAX_FRAME_SIZE);
+    }
+}

Added: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java?rev=1484842&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java Tue May 21 15:49:52 2013
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static java.util.Arrays.copyOfRange;
+import static org.apache.qpid.proton.engine.impl.TransportTestHelper.assertByteArrayContentEquals;
+import static org.apache.qpid.proton.engine.impl.TransportTestHelper.assertByteBufferContentEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link TransportOutputAdaptor}, fed by a writer that emits
+ * whatever bytes the test has queued up for it.
+ */
+public class TransportOutputAdaptorTest
+{
+    private final CannedTransportOutputWriter _transportOutputWriter = new CannedTransportOutputWriter();
+    private final TransportOutput _transportOutput = new TransportOutputAdaptor(_transportOutputWriter, 1024);
+
+    @Test
+    public void testThatOutputBufferIsReadOnly()
+    {
+        final ByteBuffer outputBuffer = _transportOutput.getOutputBuffer();
+        assertTrue(outputBuffer.isReadOnly());
+    }
+
+    /**
+     * The output buffer exposes exactly the writer's bytes, and is empty once they are consumed.
+     */
+    @Test
+    public void testGetOutputBuffer_containsCorrectBytes()
+    {
+        final byte[] cannedBytes = "testbytes".getBytes();
+        _transportOutputWriter.setNextCannedOutput(cannedBytes);
+
+        final ByteBuffer firstBuffer = _transportOutput.getOutputBuffer();
+        assertEquals(cannedBytes.length, firstBuffer.remaining());
+
+        final byte[] bytesRead = new byte[cannedBytes.length];
+        firstBuffer.get(bytesRead);
+        assertByteArrayContentEquals(cannedBytes, bytesRead);
+
+        _transportOutput.outputConsumed();
+
+        // everything was read, so nothing is left to offer
+        final ByteBuffer secondBuffer = _transportOutput.getOutputBuffer();
+        assertEquals(0, secondBuffer.remaining());
+    }
+
+    /**
+     * Bytes left unread in one output buffer are offered again in the next one.
+     */
+    @Test
+    public void testClientConsumesOutputInMultipleChunks()
+    {
+        final byte[] cannedBytes = "testbytes".getBytes();
+        _transportOutputWriter.setNextCannedOutput(cannedBytes);
+
+        final int firstChunkLength = 2;
+        final int remainderLength = cannedBytes.length - firstChunkLength;
+
+        // read only the first two bytes, leaving the remainder in the buffer
+        final ByteBuffer firstBuffer = _transportOutput.getOutputBuffer();
+        final byte[] firstChunk = new byte[firstChunkLength];
+        firstBuffer.get(firstChunk);
+
+        assertEquals(remainderLength, firstBuffer.remaining());
+        assertByteArrayContentEquals(copyOfRange(cannedBytes, 0, firstChunkLength), firstChunk);
+
+        _transportOutput.outputConsumed();
+
+        // the remainder must be what is offered next
+        final ByteBuffer secondBuffer = _transportOutput.getOutputBuffer();
+        final byte[] expectedRemainder = copyOfRange(cannedBytes, firstChunkLength, cannedBytes.length);
+        assertByteBufferContentEquals(expectedRemainder, secondBuffer);
+    }
+
+    /**
+     * Bytes left unread in one output buffer are offered again in the next one, followed by
+     * any output the writer has produced in the meantime.
+     */
+    @Test
+    public void testClientConsumesOutputInMultipleChunksWithAdditionalTransportWriterOutput()
+    {
+        final byte[] firstOutput = "abcd".getBytes();
+        _transportOutputWriter.setNextCannedOutput(firstOutput);
+
+        final int firstChunkLength = 2;
+        final int unreadLength = firstOutput.length - firstChunkLength;
+
+        // read only the first two bytes, leaving "cd" in the buffer
+        final ByteBuffer firstBuffer = _transportOutput.getOutputBuffer();
+        final byte[] firstChunk = new byte[firstChunkLength];
+        firstBuffer.get(firstChunk);
+
+        assertEquals(unreadLength, firstBuffer.remaining());
+        assertByteArrayContentEquals(copyOfRange(firstOutput, 0, firstChunkLength), firstChunk);
+
+        _transportOutput.outputConsumed();
+
+        // the writer has more to say before the client asks again
+        final byte[] secondOutput = "wxyz".getBytes();
+        _transportOutputWriter.setNextCannedOutput(secondOutput);
+
+        final ByteBuffer secondBuffer = _transportOutput.getOutputBuffer();
+        final byte[] unreadThenNewBytes = "cdwxyz".getBytes();
+        assertByteBufferContentEquals(unreadThenNewBytes, secondBuffer);
+    }
+
+    /**
+     * Writer that emits the byte array most recently set on it, once, and then nothing
+     * until another array is set. Fails the test if the array does not fit in the buffer.
+     */
+    private static final class CannedTransportOutputWriter implements TransportOutputWriter
+    {
+        byte[] _cannedOutput = new byte[0];
+
+        @Override
+        public void writeInto(ByteBuffer outputBuffer)
+        {
+            final int bytesToWrite = _cannedOutput.length;
+            final int bytesWritten = ByteBufferUtils.pourArrayToBuffer(_cannedOutput, 0, bytesToWrite, outputBuffer);
+            if(bytesWritten < bytesToWrite)
+            {
+                fail("Unable to write all " + _cannedOutput.length + " bytes of my canned output to the provided output buffer: " + outputBuffer);
+            }
+
+            // each canned output is emitted only once
+            _cannedOutput = new byte[0];
+        }
+
+        void setNextCannedOutput(byte[] cannedOutput)
+        {
+            _cannedOutput = cannedOutput;
+        }
+    }
+}

Added: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java?rev=1484842&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java Tue May 21 15:49:52 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.engine.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Static helpers shared by the transport tests: byte content assertions, draining a
+ * {@link ByteBuffer} into a String, and building repeated-value strings.
+ */
+public class TransportTestHelper
+{
+    /**
+     * Asserts that two byte arrays have identical content.
+     *
+     * @param expectedBytes the bytes the test expects
+     * @param actualBytes the bytes actually produced
+     */
+    public static void assertByteArrayContentEquals(byte[] expectedBytes, byte[] actualBytes)
+    {
+        // Compare as Strings first so that a mismatch in textual test data is reported readably.
+        assertEquals(new String(expectedBytes), new String(actualBytes));
+
+        // Decoding with the platform charset is lossy, so different arrays can produce equal
+        // Strings; the byte-for-byte comparison is what actually establishes equality.
+        org.junit.Assert.assertArrayEquals(expectedBytes, actualBytes);
+    }
+
+    /**
+     * Asserts that the remaining content of a buffer equals the expected bytes. The buffer is
+     * read through a duplicate, so its own position and limit are left untouched.
+     *
+     * @param expectedBytes the bytes the test expects
+     * @param actualByteBuffer the buffer whose remaining bytes are checked
+     */
+    public static void assertByteBufferContentEquals(byte[] expectedBytes, ByteBuffer actualByteBuffer)
+    {
+        ByteBuffer myByteBuffer = actualByteBuffer.duplicate();
+        byte[] actualBytes = new byte[myByteBuffer.remaining()];
+        myByteBuffer.get(actualBytes);
+
+        assertByteArrayContentEquals(expectedBytes, actualBytes);
+    }
+
+    /**
+     * Pours all remaining bytes of the source buffer into a String.
+     *
+     * @param source the buffer to read from
+     * @return the bytes read, decoded with the platform default charset
+     */
+    public static String pourBufferToString(ByteBuffer source)
+    {
+        return pourBufferToString(source, source.remaining());
+    }
+
+    /**
+     * Pours up to {@code sizeRequested} bytes of the source buffer into a String.
+     *
+     * @param source the buffer to read from
+     * @param sizeRequested the maximum number of bytes to read
+     * @return the bytes read, decoded with the platform default charset
+     */
+    public static String pourBufferToString(ByteBuffer source, int sizeRequested)
+    {
+        byte[] buf = new byte[sizeRequested];
+        int numberRead = ByteBufferUtils.pourBufferToArray(source, buf, 0, sizeRequested);
+        // only the bytes actually read are decoded, which may be fewer than requested
+        return new String(buf, 0, numberRead);
+    }
+
+    /**
+     * Builds a String consisting of {@code value} repeated {@code repeat} times. The result's
+     * length is therefore {@code value.length() * repeat}, which equals {@code repeat} only
+     * for a single-character value.
+     *
+     * @param value the text to repeat
+     * @param repeat the number of repetitions; zero or less yields an empty String
+     * @return the repeated text
+     */
+    public static String stringOfLength(String value, int repeat)
+    {
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0 ; i < repeat; i++)
+        {
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+}

Modified: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java Tue May 21 15:49:52 2013
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
-import static org.junit.Assert.assertNotNull;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.proton.engine.impl.TransportOutput;
@@ -39,18 +38,22 @@ public class CannedTransportOutput imple
         setOutput(output);
     }
 
+    public void setOutput(String output)
+    {
+        _cannedOutput = ByteBuffer.wrap(output.getBytes());
+    }
+
     @Override
-    public int output(byte[] destination, int offset, int size)
+    public ByteBuffer getOutputBuffer()
     {
-        assertNotNull(_cannedOutput);
-        int sizeToGet = Math.min(size, _cannedOutput.remaining());
-        _cannedOutput.get(destination, offset, sizeToGet);
-        return sizeToGet;
+        return _cannedOutput;
     }
 
-    public void setOutput(String output)
+    @Override
+    public void outputConsumed()
     {
-        _cannedOutput = ByteBuffer.wrap(output.getBytes());
+        // no-op
     }
 
+
 }

Modified: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java Tue May 21 15:49:52 2013
@@ -48,9 +48,10 @@ public class CapitalisingDummySslEngine 
 
     private static final int CLEAR_CHUNK_SIZE = 2;
     private static final char CLEARTEXT_PADDING = '_';
+    private SSLException _nextException;
 
     /**
-     * Converts a_b_c_ to <-A->.  z_ is special and encodes as <> (to give us packets of different lengths).
+     * Converts a_ to <-A->.  z_ is special and encodes as <> (to give us packets of different lengths).
      * If dst is not sufficiently large ({@value #SHORT_ENCODED_CHUNK_SIZE} in our encoding), we return
      * {@link Status#BUFFER_OVERFLOW}, and the src and dst ByteBuffers are unchanged.
      */
@@ -115,6 +116,11 @@ public class CapitalisingDummySslEngine 
     public SSLEngineResult unwrap(ByteBuffer src, ByteBuffer dst)
             throws SSLException
     {
+        if(_nextException != null)
+        {
+            throw _nextException;
+        }
+
         Status resultStatus;
         final int consumed;
         final int produced;
@@ -172,7 +178,12 @@ public class CapitalisingDummySslEngine 
     }
 
     @Override
-    public int getApplicationBufferSize()
+    public int getEffectiveApplicationBufferSize()
+    {
+        return getApplicationBufferSize();
+    }
+
+    private int getApplicationBufferSize()
     {
         return CLEAR_CHUNK_SIZE;
     }
@@ -228,4 +239,9 @@ public class CapitalisingDummySslEngine 
     {
         return true;
     }
+
+    public void rejectNextEncodedPacket(SSLException nextException)
+    {
+        _nextException = nextException;
+    }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java Tue May 21 15:49:52 2013
@@ -20,59 +20,59 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.engine.TransportResult;
+import org.apache.qpid.proton.engine.TransportResultFactory;
 import org.apache.qpid.proton.engine.impl.TransportInput;
 
 class RememberingTransportInput implements TransportInput
 {
     private StringBuilder _receivedInput = new StringBuilder();
-    private boolean _hasAcceptedLimit = false;
-    private int _acceptLimit = 0;
+    private ByteBuffer _buffer;
+    private TransportResult _nextErrorResult;
+    private int _inputBufferSize = 1024;
 
-    @Override
-    public int input(byte[] bytes, int offset, int size)
+    String getAcceptedInput()
     {
-        final String newInput;
-        if (!_hasAcceptedLimit)
-        {
-            newInput = new String(bytes, offset, size);
-        }
-        else
-        {
-            int currentSize = _receivedInput.length();
-            int spareCapacity = _acceptLimit - currentSize;
-            if (spareCapacity < 0)
-            {
-                throw new IllegalStateException("Could not write " + size
-                        + " bytes into buffer of size " + currentSize
-                        + " with accept limit of " + _acceptLimit + ". Test error??");
-            }
-            int effectiveSize = Math.min(size, spareCapacity);
-            newInput = new String(bytes, offset, effectiveSize);
-        }
+        return _receivedInput.toString();
+    }
 
-        _receivedInput.append(newInput);
-        return newInput.length();
+    @Override
+    public String toString()
+    {
+        return "[RememberingTransportInput receivedInput (length " + _receivedInput.length() + ") is:" + _receivedInput.toString() + "]";
     }
 
-    public void removeAcceptLimit()
+    @Override
+    public ByteBuffer getInputBuffer()
     {
-        _hasAcceptedLimit = false;
+        _buffer = ByteBuffer.allocate(_inputBufferSize);
+        return _buffer;
     }
 
-    public void setAcceptLimit(int acceptLimit)
+    @Override
+    public TransportResult processInput()
     {
-        _hasAcceptedLimit = true;
-        _acceptLimit = acceptLimit;
+        if(_nextErrorResult != null)
+        {
+            return _nextErrorResult;
+        }
+
+        _buffer.flip();
+        byte[] receivedInputBuffer = new byte[_buffer.remaining()];
+        _buffer.get(receivedInputBuffer);
+        _receivedInput.append(new String(receivedInputBuffer));
+        return TransportResultFactory.ok();
     }
 
-    String getAcceptedInput()
+    public void rejectNextInput(TransportResult nextErrorResult)
     {
-        return _receivedInput.toString();
+        _nextErrorResult = nextErrorResult;
     }
 
-    @Override
-    public String toString()
+    public void setInputBufferSize(int inputBufferSize)
     {
-        return "[RememberingTransportInput receivedInput (length " + _receivedInput.length() + ") is:" + _receivedInput.toString() + "]";
+        _inputBufferSize = inputBufferSize;
     }
 }
\ No newline at end of file

Modified: qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1484842&r1=1484841&r2=1484842&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Tue May 21 15:49:52 2013
@@ -19,342 +19,235 @@
  *
  */
 package org.apache.qpid.proton.engine.impl.ssl;
-
-import static org.apache.qpid.proton.engine.impl.ssl.ByteTestHelper.assertArrayUntouchedExcept;
-import static org.apache.qpid.proton.engine.impl.ssl.ByteTestHelper.createFilledBuffer;
+import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour;
+import static org.apache.qpid.proton.engine.impl.TransportTestHelper.assertByteBufferContentEquals;
+import static org.apache.qpid.proton.engine.impl.TransportTestHelper.pourBufferToString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLException;
+
+import org.apache.qpid.proton.engine.TransportException;
+import org.apache.qpid.proton.engine.TransportResult;
+import org.apache.qpid.proton.engine.TransportResultFactory;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * TODO unit test handshaking
  * TODO unit test closing
+ * TODO unit test graceful handling of SSLEngine.wrap throwing an SSLException
  */
 public class SimpleSslTransportWrapperTest
 {
     private RememberingTransportInput _underlyingInput = new RememberingTransportInput();
     private CannedTransportOutput _underlyingOutput = new CannedTransportOutput();
 
-    private SimpleSslTransportWrapper _transportWrapper;
+    private SimpleSslTransportWrapper _sslWrapper;
 
     private CapitalisingDummySslEngine _dummySslEngine = new CapitalisingDummySslEngine();
 
+    @Rule
+    public ExpectedException _expectedException = ExpectedException.none();
+
     @Before
     public void setUp()
     {
-        _transportWrapper = new SimpleSslTransportWrapper(_dummySslEngine, _underlyingInput, _underlyingOutput);
+        _sslWrapper = new SimpleSslTransportWrapper(_dummySslEngine, _underlyingInput, _underlyingOutput);
     }
 
     @Test
     public void testInputDecodesOnePacket()
     {
-        byte[] encodedBytes = "<-A->".getBytes();
+        String encodedBytes = "<-A->";
+
+        putBytesIntoTransport(encodedBytes);
 
-        int numberConsumed = _transportWrapper.input(encodedBytes, 0, encodedBytes.length);
-        assertEquals(encodedBytes.length, numberConsumed);
         assertEquals("a_", _underlyingInput.getAcceptedInput());
     }
 
     @Test
     public void testInputWithMultiplePackets()
     {
-        byte[] encodedBytes = "<-A-><-B-><-C-><>".getBytes();
+        String encodedBytes = "<-A-><-B-><-C-><>";
 
-        int numberConsumed = _transportWrapper.input(encodedBytes, 0, encodedBytes.length);
-        assertEquals(encodedBytes.length, numberConsumed);
-        assertEquals("a_b_c_z_", _underlyingInput.getAcceptedInput());
-    }
-
-    @Test
-    public void testInputWithMultiplePacketsUsingNonZeroOffset()
-    {
-        byte[] encodedBytes = "<-A-><-B-><-C-><-D-><-E->".getBytes();
+        putBytesIntoTransport(encodedBytes);
 
-        // try to decode the "<-B-><-C->" portion of encodedBytes
-        int numberConsumed = _transportWrapper.input(encodedBytes, 5, 10);
-        assertEquals(10, numberConsumed);
-        assertEquals("b_c_", _underlyingInput.getAcceptedInput());
+        assertEquals("a_b_c_z_", _underlyingInput.getAcceptedInput());
     }
 
     @Test
-    public void testInsufficientInputBufferUnderflow()
+    public void testInputIncompletePacket_isNotPassedToUnderlyingInputUntilCompleted()
     {
-        byte[] incompleteEncodedBytes = "<-A-><-B-><-C".getBytes(); // missing the trailing '>' to cause the underflow
-        byte[] remainingEncodedBytes = "-><-D->".getBytes();
+        String incompleteEncodedBytes = "<-A-><-B-><-C"; // missing the trailing '>' to cause the underflow
+        String remainingEncodedBytes = "-><-D->";
 
-        int numberConsumed = _transportWrapper.input(incompleteEncodedBytes, 0, incompleteEncodedBytes.length);
-        assertEquals(incompleteEncodedBytes.length, numberConsumed);
+        putBytesIntoTransport(incompleteEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
 
-        numberConsumed = _transportWrapper.input(remainingEncodedBytes, 0, remainingEncodedBytes.length);
-        assertEquals(remainingEncodedBytes.length, numberConsumed);
+        putBytesIntoTransport(remainingEncodedBytes);
         assertEquals("a_b_c_d_", _underlyingInput.getAcceptedInput());
     }
 
+    /**
+     * As per {@link #testInputIncompletePacket_isNotPassedToUnderlyingInputUntilCompleted()}
+     * but this time it takes TWO chunks to complete the "dangling" packet.
+     */
     @Test
-    public void testInsufficientInputBufferUnderflowThreePart()
+    public void testInputIncompletePacketInThreeParts()
     {
-        byte[] firstEncodedBytes = "<-A-><-B-><-".getBytes();
-        byte[] secondEncodedBytes = "C".getBytes(); // Sending this causes the impl to have to hold the data without producing more input yet
-        byte[] thirdEncodedBytes = "-><-D->".getBytes();
+        String firstEncodedBytes = "<-A-><-B-><-";
+        String secondEncodedBytes = "C"; // Sending this causes the impl to have to hold the data without producing more input yet
+        String thirdEncodedBytes = "-><-D->";
 
-        int numberConsumed = _transportWrapper.input(firstEncodedBytes, 0, firstEncodedBytes.length);
-        assertEquals(firstEncodedBytes.length, numberConsumed);
+        putBytesIntoTransport(firstEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
 
-        numberConsumed = _transportWrapper.input(secondEncodedBytes, 0, secondEncodedBytes.length);
-        assertEquals(secondEncodedBytes.length, numberConsumed);
+        putBytesIntoTransport(secondEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
 
-        numberConsumed = _transportWrapper.input(thirdEncodedBytes, 0, thirdEncodedBytes.length);
-        assertEquals(thirdEncodedBytes.length, numberConsumed);
+        putBytesIntoTransport(thirdEncodedBytes);
         assertEquals("a_b_c_d_", _underlyingInput.getAcceptedInput());
     }
 
     @Test
-    public void testUnderlyingInputBecomesTemporarilyFull()
+    public void testUnderlyingInputUsingSmallBuffer_receivesAllDecodedInput() throws Exception
     {
-        _underlyingInput.setAcceptLimit(3);
+        _underlyingInput.setInputBufferSize(1);
 
-        byte[] encodedBytes = "<-A-><-B-><-C->".getBytes();
+        putBytesIntoTransport("<-A->");
 
-        // We consume encoded A and B, but due to the limit underlying input accepts
-        // only a part of b's decoded packet.
-        int firstNumberConsumed = _transportWrapper.input(encodedBytes, 0, encodedBytes.length);
-        assertEquals(10, firstNumberConsumed);
-        assertEquals("a_b", _underlyingInput.getAcceptedInput());
-
-        _underlyingInput.removeAcceptLimit();
-
-        // Send the remaining encoded data
-        int remainingOffset = firstNumberConsumed;
-        int remainingSize = encodedBytes.length - firstNumberConsumed;
-        int secondNumberConsumed = _transportWrapper.input(encodedBytes, remainingOffset, remainingSize);
-        assertEquals(5, secondNumberConsumed);
-        assertEquals("a_b_c_", _underlyingInput.getAcceptedInput());
+        assertEquals("a_", _underlyingInput.getAcceptedInput());
     }
 
-    /**
-     * @see #testUnderlyingInputBecomesTemporarilyFull()
-     */
     @Test
-    public void testUnderlyingInputBecomesPermanentlyFull()
+    public void testSslUnwrapThrowsException_returnsErrorResultAndRefusesFurtherInput() throws Exception
     {
-        _underlyingInput.setAcceptLimit(3);
+        SSLException sslException = new SSLException("unwrap exception");
+        _dummySslEngine.rejectNextEncodedPacket(sslException);
 
-        byte[] encodedBytes = "<-A-><-B-><-C->".getBytes();
-
-        int firstNumberConsumed = _transportWrapper.input(encodedBytes, 0, encodedBytes.length);
-        assertEquals(10, firstNumberConsumed);
-        assertEquals("a_b", _underlyingInput.getAcceptedInput());
+        _sslWrapper.getInputBuffer().put("<-A->".getBytes());
+        TransportResult result = _sslWrapper.processInput();
+        assertEquals(TransportResult.Status.ERROR, result.getStatus());
+        assertSame(sslException, result.getException().getCause());
+        assertEquals("", _underlyingInput.getAcceptedInput());
 
-        // Send the remaining encoded data
-        int remainingOffset = firstNumberConsumed;
-        int remainingSize = encodedBytes.length - firstNumberConsumed;
-        int secondNumberConsumed = _transportWrapper.input(encodedBytes, remainingOffset, remainingSize);
-        assertEquals(0, secondNumberConsumed);
-        assertEquals("a_b", _underlyingInput.getAcceptedInput());
+        _expectedException.expect(TransportException.class);
+        _sslWrapper.getInputBuffer();
     }
 
-    public void testUnderlyingInputPartiallyAcceptsLeftovers()
+    @Test
+    public void testUnderlyingInputReturnsErrorResult_returnsErrorResultAndRefusesFurtherInput() throws Exception
     {
-        byte[] encodedBytes = "<A><B><C>".getBytes();
+        String underlyingErrorDescription = "dummy underlying error";
+        TransportResult underlyingErrorResult = TransportResultFactory.error(underlyingErrorDescription);
+        _underlyingInput.rejectNextInput(underlyingErrorResult);
 
-        _underlyingInput.setAcceptLimit(0);
+        _sslWrapper.getInputBuffer().put("<-A->".getBytes());
 
-        int firstNumberConsumed = _transportWrapper.input(encodedBytes, 0, encodedBytes.length);
-        assertEquals(3, firstNumberConsumed);
-        assertEquals("", _underlyingInput.getAcceptedInput());
+        TransportResult result = _sslWrapper.processInput();
 
-        // Set underlying input to accept *part* of the decoded leftovers, then try to send the remaining encoded data
-        _underlyingInput.setAcceptLimit(1);
-        int offsetAfterFirstAttempt = firstNumberConsumed;
-        int sizeAfterFirstAttempt = encodedBytes.length - firstNumberConsumed;
-        int secondNumberConsumed = _transportWrapper.input(encodedBytes, offsetAfterFirstAttempt, sizeAfterFirstAttempt);
-        assertEquals(0, secondNumberConsumed);
-        assertEquals("a", _underlyingInput.getAcceptedInput());
-
-        // Remove the limit and send the remaining data.
-        _underlyingInput.removeAcceptLimit();
-        // offset and size unchanged because second attempt consumed no bytes
-        int thirdNumberConsumed = _transportWrapper.input(encodedBytes, offsetAfterFirstAttempt, sizeAfterFirstAttempt);
-        assertEquals(6, thirdNumberConsumed);
-        assertEquals("a_b_c_", _underlyingInput.getAcceptedInput());
+        assertEquals(TransportResult.Status.ERROR, result.getStatus());
+        assertEquals(underlyingErrorDescription, result.getErrorDescription());
     }
 
     @Test
-    public void testOutputEncodesOnePacket()
+    public void testGetOutputBufferIsReadOnly()
     {
-        byte[] encodedBytes = createFilledBuffer(10);
-        String expectedOutputProduced = "<-A->";
-
-        _underlyingOutput.setOutput("a_");
-
-        int numberProduced = _transportWrapper.output(encodedBytes, 0, encodedBytes.length);
-        assertEquals(expectedOutputProduced.length(), numberProduced);
-        assertArrayUntouchedExcept(expectedOutputProduced, encodedBytes);
+        _underlyingOutput.setOutput("");
+        assertTrue(_sslWrapper.getOutputBuffer().isReadOnly());
     }
 
     @Test
-    public void testOutputEncodesOnePacketUsingNonZeroOffset()
+    public void testOutputEncodesOnePacket()
     {
-        byte[] encodedBytes = createFilledBuffer(10);
-        String expectedOutputProduced = "<-A->";
-
         _underlyingOutput.setOutput("a_");
 
-        int numberProduced = _transportWrapper.output(encodedBytes, 1, 5);
-        assertEquals(expectedOutputProduced.length(), numberProduced);
-        assertArrayUntouchedExcept(expectedOutputProduced, encodedBytes, 1);
+        ByteBuffer outputBuffer = _sslWrapper.getOutputBuffer();
+
+        assertByteBufferContentEquals("<-A->".getBytes(), outputBuffer);
     }
 
     @Test
-    public void testOutputUsingSmallBuffers()
+    public void testOutputEncodesMultiplePackets()
     {
-        String expectedOutputProducedFirstAttempt = "<-";
-        String expectedOutputProducedSecondAttempt = "A";
-        String expectedOutputProducedThirdAttempt = "->";
-
-        String expectedOutputProducedLastAttempt = "<-B->";
+        _underlyingOutput.setOutput("a_b_c_");
 
-        _underlyingOutput.setOutput("a_");
-        {
-            byte[] encodedBytesForFirstAttempt = createFilledBuffer(2);
-            int numberProducedFirstAttempt = _transportWrapper.output(encodedBytesForFirstAttempt, 0, encodedBytesForFirstAttempt.length);
-            assertEquals(expectedOutputProducedFirstAttempt.length(), numberProducedFirstAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedFirstAttempt, encodedBytesForFirstAttempt);
-        }
-
-        {
-            byte[] encodedBytesForSecondAttempt = createFilledBuffer(1);
-            int numberProducedSecondAttempt = _transportWrapper.output(encodedBytesForSecondAttempt, 0, encodedBytesForSecondAttempt.length);
-            assertEquals(expectedOutputProducedSecondAttempt.length(), numberProducedSecondAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedSecondAttempt, encodedBytesForSecondAttempt);
-        }
-
-        {
-            byte[] encodedBytesForThirdAttempt = createFilledBuffer(2);
-            int numberProducedThirdAttempt = _transportWrapper.output(encodedBytesForThirdAttempt, 0, encodedBytesForThirdAttempt.length);
-            assertEquals(expectedOutputProducedThirdAttempt.length(), numberProducedThirdAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedThirdAttempt, encodedBytesForThirdAttempt);
-        }
-
-        _underlyingOutput.setOutput("b_");
-        {
-            byte[] encodedBytesForLastAttempt = createFilledBuffer(10);
-            int numberProducedLastAttempt = _transportWrapper.output(encodedBytesForLastAttempt, 0, encodedBytesForLastAttempt.length);
-            assertEquals(expectedOutputProducedLastAttempt.length(), numberProducedLastAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedLastAttempt, encodedBytesForLastAttempt);
-        }
+        assertEquals("<-A-><-B-><-C->", getAllBytesFromTransport());
     }
 
     @Test
-    public void testOutputUsingSmallBuffersAndDecodingMoreBytesAlongTheWay()
+    public void testOutputEncodesMultiplePacketsOfVaryingSize()
     {
-        String expectedOutputProducedFirstAttempt = "<-";
-        String expectedOutputProducedSecondAttempt = "A-";
-
-        String expectedOutputProducedThirdAttempt = "><-B->";
-        String expectedOutputProducedLastAttempt = "<-C->";
+        _underlyingOutput.setOutput("z_a_b_");
 
+        assertEquals("<><-A-><-B->", getAllBytesFromTransport());
+    }
 
-        _underlyingOutput.setOutput("a_");
-        {
-            byte[] encodedBytesForFirstAttempt = createFilledBuffer(2);
-            int numberProducedFirstAttempt = _transportWrapper.output(encodedBytesForFirstAttempt, 0, encodedBytesForFirstAttempt.length);
-            assertEquals(expectedOutputProducedFirstAttempt.length(), numberProducedFirstAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedFirstAttempt, encodedBytesForFirstAttempt);
-        }
+    @Test
+    public void testClientConsumesEncodedOutputInMultipleChunks()
+    {
+        _underlyingOutput.setOutput("a_b_");
 
         {
-            byte[] encodedBytesForSecondAttempt = createFilledBuffer(2);
-            int numberProducedSecondAttempt = _transportWrapper.output(encodedBytesForSecondAttempt, 0, encodedBytesForSecondAttempt.length);
-            assertEquals(expectedOutputProducedSecondAttempt.length(), numberProducedSecondAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedSecondAttempt, encodedBytesForSecondAttempt);
+            ByteBuffer buffer = _sslWrapper.getOutputBuffer();
+            String output = pourBufferToString(buffer, 2);
+            assertEquals("<-", output);
+            _sslWrapper.outputConsumed();
         }
 
         {
-            // now get some output into a roomy buffer, which should get the left-over ">" plus some new stuff
-            _underlyingOutput.setOutput("b_");
-            byte[] encodedBytesForThirdAttempt = createFilledBuffer(10);
-            int numberProducedThirdAttempt = _transportWrapper.output(encodedBytesForThirdAttempt, 0, encodedBytesForThirdAttempt.length);
-            assertEquals(expectedOutputProducedThirdAttempt.length(), numberProducedThirdAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedThirdAttempt, encodedBytesForThirdAttempt);
+            ByteBuffer buffer = _sslWrapper.getOutputBuffer();
+            String output = pourBufferToString(buffer, 3);
+            assertEquals("A->", output);
+            _sslWrapper.outputConsumed();
         }
 
-        _underlyingOutput.setOutput("c_");
-        {
-            byte[] encodedBytesForLastAttempt = createFilledBuffer(10);
-            int numberProducedLastAttempt = _transportWrapper.output(encodedBytesForLastAttempt, 0, encodedBytesForLastAttempt.length);
-            assertEquals(expectedOutputProducedLastAttempt.length(), numberProducedLastAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedLastAttempt, encodedBytesForLastAttempt);
-        }
+        assertEquals("<-B->", getAllBytesFromTransport());
     }
 
     @Test
-    public void testOutputEncodesMultiplePackets()
+    public void testNoOutputToEncode()
     {
-        byte[] encodedBytes = createFilledBuffer(17);
-        String expectedOutputProduced = "<-A-><-B-><-C-><>";
-
-        _underlyingOutput.setOutput("a_b_c_z_");
+        _underlyingOutput.setOutput("");
 
-        int numberProduced = _transportWrapper.output(encodedBytes, 0, encodedBytes.length);
-        assertEquals(expectedOutputProduced.length(), numberProduced);
-        assertArrayUntouchedExcept(expectedOutputProduced, encodedBytes);
+        assertFalse(_sslWrapper.getOutputBuffer().hasRemaining());
     }
 
-    @Test
-    public void testOutputWritesShortEncodedPacketIntoBufferThatIsLessThanMaximumPacketSize()
+    private void putBytesIntoTransport(String encodedBytes)
     {
-        int bufferLength = _dummySslEngine.getPacketBufferSize() - 1;
-        assertTrue(bufferLength > CapitalisingDummySslEngine.SHORT_ENCODED_CHUNK_SIZE);
-
-        _underlyingOutput.setOutput("z_");
-
+        ByteBuffer byteBuffer = ByteBuffer.wrap(encodedBytes.getBytes());
+        while(byteBuffer.hasRemaining())
         {
-            byte[] encodedBytes = createFilledBuffer(bufferLength); // smaller than a packet but large enough to receive encoded bytes for z_
-            String expectedOutputProduced = "<>";
-            int numberProduced = _transportWrapper.output(encodedBytes, 0, encodedBytes.length);
-            assertEquals(expectedOutputProduced.length(), numberProduced);
-            assertArrayUntouchedExcept(expectedOutputProduced, encodedBytes);
+            int numberPoured = pour(byteBuffer, _sslWrapper.getInputBuffer());
+            assertTrue("We should be able to pour some bytes into the input buffer",
+                    numberPoured > 0);
+            _sslWrapper.processInput().checkIsOk();
         }
     }
 
-    @Test
-    public void testOutputEncodesMultiplePacketsWithInitialBufferTooSmallForAllOfSecondPacket()
+    private String getAllBytesFromTransport()
     {
-        String expectedOutputProducedFirstAttempt = "<-A-><";
-        String expectedOutputProducedSecondAttempt = "><-C->";
-
-        _underlyingOutput.setOutput("a_z_c_");
-
-        {
-            byte[] encodedBytesFirstAttempt = createFilledBuffer(6);
-            int numberProducedFirstAttempt = _transportWrapper.output(encodedBytesFirstAttempt, 0, encodedBytesFirstAttempt.length);
-            assertEquals(expectedOutputProducedFirstAttempt.length(), numberProducedFirstAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedFirstAttempt, encodedBytesFirstAttempt);
-        }
+        StringBuilder readBytes = new StringBuilder();
+        boolean continueLooping;
+        do
         {
-            byte[] encodedBytesSecondAttempt = createFilledBuffer(6);
-            int numberProducedSecondAttempt = _transportWrapper.output(encodedBytesSecondAttempt, 0, encodedBytesSecondAttempt.length);
-            assertEquals(expectedOutputProducedSecondAttempt.length(), numberProducedSecondAttempt);
-            assertArrayUntouchedExcept(expectedOutputProducedSecondAttempt, encodedBytesSecondAttempt);
-        }
-    }
+            ByteBuffer buffer = _sslWrapper.getOutputBuffer();
+            continueLooping = buffer.hasRemaining();
 
-    @Test
-    public void testNoOutputToEncode()
-    {
-        byte[] encodedBytes = createFilledBuffer(10);
-        String expectedOutputProduced = "";
+            readBytes.append(pourBufferToString(buffer));
 
-        _underlyingOutput.setOutput("");
+            _sslWrapper.outputConsumed();
+        }
+        while(continueLooping);
 
-        int numberProduced = _transportWrapper.output(encodedBytes, 0, encodedBytes.length);
-        assertEquals(expectedOutputProduced.length(), numberProduced);
-        assertArrayUntouchedExcept(expectedOutputProduced, encodedBytes);
+        return readBytes.toString();
     }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message