activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Clean up a bit, remove commented out code from other transports.
Date Wed, 28 May 2014 19:27:50 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b5c6c1eae -> f2653e693


Clean up a bit, remove commented out code from other transports.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f2653e69
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f2653e69
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f2653e69

Branch: refs/heads/trunk
Commit: f2653e69362cef9de49e6b0a5a248c7ad0602b6b
Parents: b5c6c1e
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed May 28 15:27:34 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed May 28 15:27:34 2014 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         | 16 ++--
 .../transport/amqp/AMQPSslTransportFactory.java | 16 ----
 .../transport/amqp/ActiveMQJMSVendor.java       | 22 ++---
 .../activemq/transport/amqp/AmqpHeader.java     | 29 ++++---
 .../transport/amqp/AmqpNioSslTransport.java     | 12 +--
 .../amqp/AmqpNioSslTransportFactory.java        |  4 +-
 .../transport/amqp/AmqpNioTransport.java        | 28 ++++---
 .../transport/amqp/AmqpNioTransportFactory.java | 39 ++++-----
 .../transport/amqp/AmqpNioTransportHelper.java  | 50 ++++++-----
 .../transport/amqp/AmqpProtocolConverter.java   | 88 +++++++++++---------
 .../transport/amqp/AmqpProtocolException.java   |  1 -
 .../activemq/transport/amqp/AmqpSupport.java    | 12 ++-
 .../activemq/transport/amqp/AmqpTransport.java  |  7 +-
 .../transport/amqp/AmqpTransportFactory.java    | 28 ++-----
 .../transport/amqp/AmqpTransportFilter.java     | 33 ++++----
 .../transport/amqp/AmqpWireFormatFactory.java   |  2 +
 .../transport/amqp/ResponseHandler.java         |  3 +-
 17 files changed, 189 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index 8a03d09..bf0c655 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.command.Command;
-
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.activemq.command.Command;
+
 /**
  * Used to assign the best implementation of a AmqpProtocolConverter to the
  * AmqpTransport based on the AmqpHeader that the client sends us.
@@ -31,12 +31,13 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
     interface Discriminator {
         boolean matches(AmqpHeader header);
+
         IAmqpProtocolConverter create(AmqpTransport transport);
     }
 
     static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
     static {
-        DISCRIMINATORS.add(new Discriminator(){
+        DISCRIMINATORS.add(new Discriminator() {
 
             @Override
             public IAmqpProtocolConverter create(AmqpTransport transport) {
@@ -45,11 +46,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
             @Override
             public boolean matches(AmqpHeader header) {
-                switch( header.getProtocolId() ) {
+                switch (header.getProtocolId()) {
                     case 0:
                     case 3:
-                        if( header.getMajor() == 1 && header.getMinor()==0 && header.getRevision()==0 )
+                        if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
                             return true;
+                        }
                 }
                 return false;
             }
@@ -70,12 +72,12 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
             Discriminator match = null;
             for (Discriminator discriminator : DISCRIMINATORS) {
-                if( discriminator.matches(header) ) {
+                if (discriminator.matches(header)) {
                     match = discriminator;
                 }
             }
             // Lets use first in the list if none are a good match.
-            if( match == null ) {
+            if (match == null) {
                 match = DISCRIMINATORS.get(0);
             }
             IAmqpProtocolConverter next = match.create(transport);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
index 0612fd9..4c036a9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -58,11 +58,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
             transport = ((MutexTransport) transport).getNext();
         }
 
-        // MutexTransport mutex = transport.narrow(MutexTransport.class);
-        // if (mutex != null) {
-        // mutex.setSyncOnCommand(true);
-        // }
-
         return transport;
     }
 
@@ -71,17 +66,6 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
         this.brokerContext = brokerService.getBrokerContext();
     }
 
-    // protected Transport createInactivityMonitor(Transport transport,
-    // WireFormat format) {
-    // AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport,
-    // format);
-    //
-    // AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-    // filter.setInactivityMonitor(monitor);
-    //
-    // return monitor;
-    // }
-
     @Override
     protected boolean isUseInactivityMonitor(Transport transport) {
         return false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
index bb81ca4..8a6137c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
@@ -42,13 +42,13 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.qpid.proton.jms.JMSVendor;
 
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class ActiveMQJMSVendor extends JMSVendor {
 
     final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
 
-    private ActiveMQJMSVendor() {}
+    private ActiveMQJMSVendor() {
+    }
 
     @Override
     public BytesMessage createBytesMessage() {
@@ -87,16 +87,16 @@ public class ActiveMQJMSVendor extends JMSVendor {
 
     @Override
     public <T extends Destination> T createDestination(String name, Class<T> kind) {
-        if( kind == Queue.class ) {
+        if (kind == Queue.class) {
             return kind.cast(new ActiveMQQueue(name));
         }
-        if( kind == Topic.class ) {
+        if (kind == Topic.class) {
             return kind.cast(new ActiveMQTopic(name));
         }
-        if( kind == TemporaryQueue.class ) {
+        if (kind == TemporaryQueue.class) {
             return kind.cast(new ActiveMQTempQueue(name));
         }
-        if( kind == TemporaryTopic.class ) {
+        if (kind == TemporaryTopic.class) {
             return kind.cast(new ActiveMQTempTopic(name));
         }
         return kind.cast(ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE));
@@ -104,26 +104,26 @@ public class ActiveMQJMSVendor extends JMSVendor {
 
     @Override
     public void setJMSXUserID(Message msg, String value) {
-        ((ActiveMQMessage)msg).setUserID(value);
+        ((ActiveMQMessage) msg).setUserID(value);
     }
 
     @Override
     public void setJMSXGroupID(Message msg, String value) {
-        ((ActiveMQMessage)msg).setGroupID(value);
+        ((ActiveMQMessage) msg).setGroupID(value);
     }
 
     @Override
     public void setJMSXGroupSequence(Message msg, int value) {
-        ((ActiveMQMessage)msg).setGroupSequence(value);
+        ((ActiveMQMessage) msg).setGroupSequence(value);
     }
 
     @Override
     public void setJMSXDeliveryCount(Message msg, long value) {
-        ((ActiveMQMessage)msg).setRedeliveryCounter((int) value);
+        ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
     }
 
     @Override
     public String toAddress(Destination dest) {
-        return ((ActiveMQDestination)dest).getQualifiedName();
+        return ((ActiveMQDestination) dest).getQualifiedName();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
index 07d0222..aaf5944 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpHeader.java
@@ -19,65 +19,64 @@ package org.apache.activemq.transport.amqp;
 import org.fusesource.hawtbuf.Buffer;
 
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class AmqpHeader {
 
-    static final Buffer PREFIX = new Buffer(new byte[]{
-      'A', 'M', 'Q', 'P'
-    });
+    static final Buffer PREFIX = new Buffer(new byte[] { 'A', 'M', 'Q', 'P' });
 
     private Buffer buffer;
 
-    public AmqpHeader(){
-        this(new Buffer(new byte[]{
-          'A', 'M', 'Q', 'P', 0, 1, 0, 0
-        }));
+    public AmqpHeader() {
+        this(new Buffer(new byte[] { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }));
     }
 
-    public AmqpHeader(Buffer buffer){
+    public AmqpHeader(Buffer buffer) {
         setBuffer(buffer);
     }
 
     public int getProtocolId() {
         return buffer.get(4) & 0xFF;
     }
+
     public void setProtocolId(int value) {
-        buffer.data[buffer.offset+4] = (byte) value;
+        buffer.data[buffer.offset + 4] = (byte) value;
     }
 
     public int getMajor() {
         return buffer.get(5) & 0xFF;
     }
+
     public void setMajor(int value) {
-        buffer.data[buffer.offset+5] = (byte) value;
+        buffer.data[buffer.offset + 5] = (byte) value;
     }
 
     public int getMinor() {
         return buffer.get(6) & 0xFF;
     }
+
     public void setMinor(int value) {
-        buffer.data[buffer.offset+6] = (byte) value;
+        buffer.data[buffer.offset + 6] = (byte) value;
     }
 
     public int getRevision() {
         return buffer.get(7) & 0xFF;
     }
+
     public void setRevision(int value) {
-        buffer.data[buffer.offset+7] = (byte) value;
+        buffer.data[buffer.offset + 7] = (byte) value;
     }
 
     public Buffer getBuffer() {
         return buffer;
     }
+
     public void setBuffer(Buffer value) {
-        if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+        if (!value.startsWith(PREFIX) || value.length() != 8) {
             throw new IllegalArgumentException("Not an AMQP header buffer");
         }
         buffer = value.buffer();
     }
 
-
     @Override
     public String toString() {
         return buffer.toString();

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index c569f05..4eb0e6f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -16,18 +16,20 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.SocketFactory;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+
 public class AmqpNioSslTransport extends NIOSSLTransport {
-    private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
+
+    private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
 
     public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
index 2306cc5..5e2fa06 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
@@ -35,11 +35,12 @@ import org.apache.activemq.wireformat.WireFormat;
 
 public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
 
-    SSLContext context;
+    protected SSLContext context;
 
     @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new TcpTransportServer(this, location, serverSocketFactory) {
+            @Override
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 AmqpNioSslTransport transport = new AmqpNioSslTransport(format, socket);
                 if (context != null) {
@@ -71,5 +72,4 @@ public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
         }
         return super.doBind(location);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index ee2694c..ff58404 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,17 +16,6 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.transport.nio.NIOOutputStream;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.SocketFactory;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -37,14 +26,28 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
  */
 public class AmqpNioTransport extends TcpTransport {
+
     private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
+
     private SocketChannel channel;
     private SelectorSelection selection;
-    private AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
+    private final AmqpNioTransportHelper amqpNioTransportHelper = new AmqpNioTransportHelper(this);
 
     private ByteBuffer inputBuffer;
 
@@ -71,6 +74,7 @@ public class AmqpNioTransport extends TcpTransport {
 
             @Override
             public void onError(SelectorSelection selection, Throwable error) {
+                LOG.trace("Error detected: {}", error.getMessage());
                 if (error instanceof IOException) {
                     onException((IOException) error);
                 } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
index fde7a16..c67d3b6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -16,6 +16,17 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -27,16 +38,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A <a href="http://amqp.org/">AMQP</a> over NIO transport factory
  */
@@ -44,18 +45,22 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
 
     private BrokerContext brokerContext = null;
 
+    @Override
     protected String getDefaultWireFormatType() {
         return "amqp";
     }
 
+    @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new TcpTransportServer(this, location, serverSocketFactory) {
+            @Override
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 return new AmqpNioTransport(format, socket);
             }
         };
     }
 
+    @Override
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
         return new AmqpNioTransport(wf, socketFactory, location, localLocation);
     }
@@ -70,14 +75,10 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
             transport = ((MutexTransport)transport).getNext();
         }
 
-//        MutexTransport mutex = transport.narrow(MutexTransport.class);
-//        if (mutex != null) {
-//            mutex.setSyncOnCommand(true);
-//        }
-
         return transport;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new AmqpTransportFilter(transport, format, brokerContext);
@@ -85,17 +86,11 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
         return super.compositeConfigure(transport, format, options);
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerContext = brokerService.getBrokerContext();
     }
 
-//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
-//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-//        filter.setInactivityMonitor(monitor);
-//        return monitor;
-//    }
-
     @Override
     protected boolean isUseInactivityMonitor(Transport transport) {
         return false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
index 076fff4..021c289 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportHelper.java
@@ -16,24 +16,25 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.transport.TransportSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.activemq.transport.TransportSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AmqpNioTransportHelper {
-    private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
+
+    private final DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[] { 'A', 'M', 'Q', 'P' }));
     private final Integer AMQP_HEADER_VALUE;
     private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransportHelper.class);
     protected int nextFrameSize = -1;
     protected ByteBuffer currentBuffer;
     private boolean magicConsumed = false;
-    private TransportSupport transportSupport;
+    private final TransportSupport transportSupport;
 
     public AmqpNioTransportHelper(TransportSupport transportSupport) throws IOException {
         AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
@@ -41,10 +42,11 @@ public class AmqpNioTransportHelper {
     }
 
     protected void processCommand(ByteBuffer plain) throws Exception {
-        // Are we waiting for the next Command or building on the current one?  The frame size is in the first 4 bytes.
+        // Are we waiting for the next Command or building on the current one?
+        // The frame size is in the first 4 bytes.
         if (nextFrameSize == -1) {
-            // We can get small packets that don't give us enough for the frame size
-            // so allocate enough for the initial size value and
+            // We can get small packets that don't give us enough for the frame
+            // size so allocate enough for the initial size value and
             if (plain.remaining() < 4) {
                 if (currentBuffer == null) {
                     currentBuffer = ByteBuffer.allocate(4);
@@ -63,10 +65,11 @@ public class AmqpNioTransportHelper {
                     nextFrameSize = currentBuffer.getInt();
                 }
             } else {
-                // Either we are completing a previous read of the next frame size or its
-                // fully contained in plain already.
+                // Either we are completing a previous read of the next frame
+                // size or its fully contained in plain already.
                 if (currentBuffer != null) {
-                    // Finish the frame size integer read and get from the current buffer.
+                    // Finish the frame size integer read and get from the
+                    // current buffer.
                     while (currentBuffer.hasRemaining()) {
                         currentBuffer.put(plain.get());
                     }
@@ -79,8 +82,8 @@ public class AmqpNioTransportHelper {
             }
         }
 
-        // There are three possibilities when we get here.  We could have a partial frame,
-        // a full frame, or more than 1 frame
+        // There are three possibilities when we get here. We could have a
+        // partial frame, a full frame, or more than 1 frame
         while (true) {
             // handle headers, which start with 'A','M','Q','P' rather than size
             if (nextFrameSize == AMQP_HEADER_VALUE) {
@@ -91,8 +94,10 @@ public class AmqpNioTransportHelper {
             }
             validateFrameSize(nextFrameSize);
 
-            // now we have the data, let's reallocate and try to fill it,  (currentBuffer.putInt() is called      TODO update
-            // because we need to put back the 4 bytes we read to determine the size)
+            // now we have the data, let's reallocate and try to fill it,
+            // (currentBuffer.putInt() is called TODO update
+            // because we need to put back the 4 bytes we read to determine the
+            // size)
             if (currentBuffer == null || (currentBuffer.limit() == 4)) {
                 currentBuffer = ByteBuffer.allocate(nextFrameSize);
                 currentBuffer.putInt(nextFrameSize);
@@ -106,8 +111,9 @@ public class AmqpNioTransportHelper {
                 currentBuffer.put(fill);
             }
 
-            // Either we have enough data for a new command or we have to wait for some more.  If hasRemaining is true,
-            // we have not filled the buffer yet, i.e. we haven't received the full frame.
+            // Either we have enough data for a new command or we have to wait for some more.
+            // If hasRemaining is true, we have not filled the buffer yet, i.e. we haven't
+            // received the full frame.
             if (currentBuffer.hasRemaining()) {
                 return;
             } else {
@@ -137,8 +143,7 @@ public class AmqpNioTransportHelper {
 
     private void validateFrameSize(int frameSize) throws IOException {
         if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
-            throw new IOException("Frame size of " + nextFrameSize +
-                    "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
+            throw new IOException("Frame size of " + nextFrameSize + "larger than max allowed " + AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
         }
     }
 
@@ -152,7 +157,7 @@ public class AmqpNioTransportHelper {
             currentBuffer.put(plain.get());
         }
         currentBuffer.flip();
-        if (!magicConsumed) {   // The first case we see is special and has to be handled differently
+        if (!magicConsumed) { // The first case we see is special and has to be handled differently
             transportSupport.doConsume(new AmqpHeader(new Buffer(currentBuffer)));
             magicConsumed = true;
         } else {
@@ -172,5 +177,4 @@ public class AmqpNioTransportHelper {
 
         return nextFrameSize;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index f8e5686..ee84a25 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -110,36 +110,38 @@ import org.slf4j.LoggerFactory;
 
 class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
-    static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
+    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
-    static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
     private final AmqpTransport amqpTransport;
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
-    private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader =
-        new ProtonFactoryLoader<MessageFactory>(MessageFactory.class);
+    private static final ProtonFactoryLoader<MessageFactory> messageFactoryLoader = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class);
 
-    int prefetch = 100;
-
-    EngineFactory engineFactory = new EngineFactoryImpl();
-    Transport protonTransport = engineFactory.createTransport();
-    Connection protonConnection = engineFactory.createConnection();
-    MessageFactory messageFactory = messageFactoryLoader.loadFactory();
-    Collector eventCollector = new CollectorImpl();
+    protected int prefetch = 100;
+    protected EngineFactory engineFactory = new EngineFactoryImpl();
+    protected Transport protonTransport = engineFactory.createTransport();
+    protected Connection protonConnection = engineFactory.createConnection();
+    protected MessageFactory messageFactory = messageFactoryLoader.loadFactory();
+    protected Collector eventCollector = new CollectorImpl();
 
     public AmqpProtocolConverter(AmqpTransport transport) {
         this.amqpTransport = transport;
 
         int maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
 
-        // AMQ-4914 - Setting the max frame size to large stalls out the QPid client on sends or
-        //            consume due to no session credit.  Once fixed we should set this value using
-        //            the configured maxFrameSize on the URI.
-        //int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > Integer.MAX_VALUE ?
-        //    Integer.MAX_VALUE : (int) transport.getWireFormat().getMaxFrameSize();
+        // AMQ-4914 - Setting the max frame size to large stalls out the QPid
+        // client on sends or
+        // consume due to no session credit. Once fixed we should set this value
+        // using
+        // the configured maxFrameSize on the URI.
+        // int maxFrameSize = transport.getWireFormat().getMaxFrameSize() >
+        // Integer.MAX_VALUE ?
+        // Integer.MAX_VALUE : (int)
+        // transport.getWireFormat().getMaxFrameSize();
 
         this.protonTransport.setMaxFrameSize(maxFrameSize);
         this.protonTransport.bind(this.protonConnection);
@@ -245,7 +247,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             if (parts.length > 1) {
                                 connectionInfo.setPassword(parts[1].utf8().toString());
                             }
-                            // We can't really auth at this point since we don't know the client id yet.. :(
+                            // We can't really auth at this point since we don't
+                            // know the client id yet.. :(
                             sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
                             amqpTransport.getWireFormat().magicRead = false;
                             sasl = null;
@@ -371,7 +374,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             if (rh != null) {
                 rh.onResponse(this, response);
             } else {
-                // Pass down any unexpected errors. Should this close the connection?
+                // Pass down any unexpected errors. Should this close the
+                // connection?
                 if (response.isException()) {
                     Throwable exception = ((ExceptionResponse) response).getException();
                     handleException(exception);
@@ -393,7 +397,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 }
             }
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
-            // Pass down any unexpected async errors. Should this close the connection?
+            // Pass down any unexpected async errors. Should this close the
+            // connection?
             Throwable exception = ((ConnectionError) command).getException();
             handleException(exception);
         } else if (command.isBrokerInfo()) {
@@ -413,9 +418,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         abstract public void onDelivery(Delivery delivery) throws Exception;
 
-        public void onClose() throws Exception {}
+        public void onClose() throws Exception {
+        }
 
-        public void drainCheck() {}
+        public void drainCheck() {
+        }
 
         abstract void doCommit() throws Exception;
 
@@ -544,10 +551,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
 
         @Override
-        void doCommit() throws Exception {}
+        void doCommit() throws Exception {
+        }
 
         @Override
-        void doRollback() throws Exception {}
+        void doRollback() throws Exception {
+        }
 
         abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
     }
@@ -575,7 +584,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 }
                 message.setProducerId(producerId);
 
-                // Always override the AMQP client's MessageId with our own.  Preserve the
+                // Always override the AMQP client's MessageId with our own.
+                // Preserve the
                 // original in the TextView property for later Ack.
                 MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
 
@@ -599,8 +609,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     message.setTransactionId(new LocalTransactionId(connectionId, txid));
                 }
 
-                // Lets handle the case where the expiration was set, but the timestamp
-                // was not set by the client. Lets assign the timestamp now, and adjust the
+                // Lets handle the case where the expiration was set, but the
+                // timestamp
+                // was not set by the client. Lets assign the timestamp now, and
+                // adjust the
                 // expiration.
                 if (message.getExpiration() != 0) {
                     if (message.getTimestamp() == 0) {
@@ -624,8 +636,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                                 delivery.disposition(rejected);
                             } else {
                                 if (receiver.getCredit() <= (prefetch * .2)) {
-                                    LOG.trace("Sending more credit ({}) to producer: {}",
-                                              prefetch - receiver.getCredit(), producerId);
+                                    LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
                                     receiver.flow(prefetch - receiver.getCredit());
                                 }
 
@@ -638,8 +649,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     });
                 } else {
                     if (receiver.getCredit() <= (prefetch * .2)) {
-                        LOG.trace("Sending more credit ({}) to producer: {}",
-                                  prefetch - receiver.getCredit(), producerId);
+                        LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
                         receiver.flow(prefetch - receiver.getCredit());
                         pumpProtonToSocket();
                     }
@@ -942,8 +952,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     ActiveMQMessage temp = null;
                     if (md.getMessage() != null) {
 
-                        // Topics can dispatch the same Message to more than one consumer
-                        // so we must copy to prevent concurrent read / write to the same
+                        // Topics can dispatch the same Message to more than one
+                        // consumer
+                        // so we must copy to prevent concurrent read / write to
+                        // the same
                         // message object.
                         if (md.getDestination().isTopic()) {
                             synchronized (md.getMessage()) {
@@ -993,7 +1005,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             }
 
             if (ackType == -1) {
-                // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
+                // we are going to settle, but redeliver.. we we won't yet ack
+                // to ActiveMQ
                 delivery.settle();
                 onMessageDispatch((MessageDispatch) delivery.getContext());
             } else {
@@ -1013,7 +1026,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
                     ack.setTransactionId(localTxId);
 
-                    // Store the message sent in this TX we might need to re-send on rollback
+                    // Store the message sent in this TX we might need to
+                    // re-send on rollback
                     md.getMessage().setTransactionId(localTxId);
                     dispatchedInTx.addFirst(md);
                 }
@@ -1042,7 +1056,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         public void drainCheck() {
             // If we are a browser.. lets not say we are drained until
             // we hit the end of browse message.
-            if( info.isBrowser() && !endOfBrowse)
+            if (info.isBrowser() && !endOfBrowse)
                 return;
 
             if (outbound.isEmpty()) {
@@ -1058,11 +1072,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             if (state instanceof TransactionalState) {
                 TransactionalState txState = (TransactionalState) state;
                 if (txState.getOutcome() instanceof DeliveryState) {
-
                     LOG.trace("onDelivery: TX delivery state = {}", state);
-
                     state = (DeliveryState) txState.getOutcome();
-
                     if (state instanceof Accepted) {
                         if (!delivery.remotelySettled()) {
                             delivery.disposition(new Accepted());
@@ -1073,7 +1084,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             } else {
                 if (state instanceof Accepted) {
                     LOG.trace("onDelivery: accepted state = {}", state);
-
                     if (!delivery.remotelySettled()) {
                         delivery.disposition(new Accepted());
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
index 94cdf70..3c94778 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java
@@ -18,7 +18,6 @@ package org.apache.activemq.transport.amqp;
 
 import java.io.IOException;
 
-
 public class AmqpProtocolException extends IOException {
 
     private static final long serialVersionUID = -2869735532997332242L;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 4e992b7..e3680c5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -16,28 +16,26 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.fusesource.hawtbuf.Buffer;
-
 import java.nio.ByteBuffer;
 
+import org.fusesource.hawtbuf.Buffer;
+
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class AmqpSupport {
 
     static public Buffer toBuffer(ByteBuffer data) {
-        if( data == null ) {
+        if (data == null) {
             return null;
         }
         Buffer rc;
-        if( data.isDirect() ) {
+        if (data.isDirect()) {
             rc = new Buffer(data.remaining());
             data.get(rc.data);
         } else {
             rc = new Buffer(data);
-            data.position(data.position()+data.remaining());
+            data.position(data.position() + data.remaining());
         }
         return rc;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
index 02e0c10..58b776f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.command.Command;
-
 import java.io.IOException;
 import java.security.cert.X509Certificate;
 
+import org.apache.activemq.command.Command;
+
 /**
  * Basic interface that mediates between protocol converter and transport
  */
@@ -36,8 +36,6 @@ public interface AmqpTransport {
 
     public void onException(IOException error);
 
-//    public AmqpInactivityMonitor getInactivityMonitor();
-
     public AmqpWireFormat getWireFormat();
 
     public void stop() throws Exception;
@@ -49,6 +47,7 @@ public interface AmqpTransport {
     public boolean isTrace();
 
     public IAmqpProtocolConverter getProtocolConverter();
+
     public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
index 3cf72c9..e394c85 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -25,9 +28,6 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A <a href="http://amqp.org/">AMQP</a> transport factory
  */
@@ -35,10 +35,12 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
 
     private BrokerContext brokerContext = null;
 
+    @Override
     protected String getDefaultWireFormatType() {
         return "amqp";
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new AmqpTransportFilter(transport, format, brokerContext);
@@ -46,6 +48,7 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
         return super.compositeConfigure(transport, format, options);
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerContext = brokerService.getBrokerContext();
     }
@@ -56,26 +59,13 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
         transport = super.serverConfigure(transport, format, options);
 
         // strip off the mutex transport.
-        if( transport instanceof MutexTransport ) {
-            transport = ((MutexTransport)transport).getNext();
+        if (transport instanceof MutexTransport) {
+            transport = ((MutexTransport) transport).getNext();
         }
-//        MutexTransport mutex = transport.narrow(MutexTransport.class);
-//        if (mutex != null) {
-//            mutex.setSyncOnCommand(true);
-//        }
+
         return transport;
     }
 
-//    @Override
-//    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-//        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
-//
-//        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
-//        filter.setInactivityMonitor(monitor);
-//
-//        return monitor;
-//    }
-
     @Override
     protected boolean isUseInactivityMonitor(Transport transport) {
         return false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index fe58cc6..0f0badb 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -16,22 +16,22 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.qpid.proton.jms.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.qpid.proton.jms.InboundTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
  * configured with the AmqpWireFormat and is used to convert AMQP commands to
@@ -43,12 +43,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
     static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
     private IAmqpProtocolConverter protocolConverter;
-//    private AmqpInactivityMonitor monitor;
     private AmqpWireFormat wireFormat;
 
     private boolean trace;
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
-    private ReentrantLock lock = new ReentrantLock();
+    private final ReentrantLock lock = new ReentrantLock();
 
     public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
@@ -58,6 +57,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         }
     }
 
+    @Override
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
@@ -82,10 +82,12 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         }
     }
 
+    @Override
     public void sendToActiveMQ(IOException error) {
         super.onException(error);
     }
 
+    @Override
     public void onCommand(Object command) {
         try {
             if (trace) {
@@ -104,6 +106,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         }
     }
 
+    @Override
     public void sendToActiveMQ(Command command) {
         assert lock.isHeldByCurrentThread();
         TransportListener l = transportListener;
@@ -112,6 +115,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         }
     }
 
+    @Override
     public void sendToAmqp(Object command) throws IOException {
         assert lock.isHeldByCurrentThread();
         if (trace) {
@@ -123,6 +127,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         }
     }
 
+    @Override
     public X509Certificate[] getPeerCertificates() {
         if (next instanceof SslTransport) {
             X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
@@ -134,6 +139,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         return null;
     }
 
+    @Override
     public boolean isTrace() {
         return trace;
     }
@@ -143,15 +149,6 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         this.protocolConverter.updateTracer();
     }
 
-//    @Override
-//    public AmqpInactivityMonitor getInactivityMonitor() {
-//        return monitor;
-//    }
-//
-//    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
-//        this.monitor = monitor;
-//    }
-
     @Override
     public AmqpWireFormat getWireFormat() {
         return this.wireFormat;
@@ -161,6 +158,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         super.onException(e);
     }
 
+    @Override
     public String getTransformer() {
         return transformer;
     }
@@ -168,10 +166,13 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setTransformer(String transformer) {
         this.transformer = transformer;
     }
+
+    @Override
     public IAmqpProtocolConverter getProtocolConverter() {
         return protocolConverter;
     }
 
+    @Override
     public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index 11d40fc..75856da 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -23,6 +23,8 @@ import org.apache.activemq.wireformat.WireFormatFactory;
  * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
 public class AmqpWireFormatFactory implements WireFormatFactory {
+
+    @Override
     public WireFormat createWireFormat() {
         return new AmqpWireFormat();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f2653e69/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
index 0693a70..392ed77 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
@@ -16,10 +16,9 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.command.Response;
-
 import java.io.IOException;
 
+import org.apache.activemq.command.Response;
 
 /**
  * Interface used by the AMQPProtocolConverter for callbacks.


Mime
View raw message