activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Adding hooks so that in the future we can more easily support handling older versions of the AMQP protocol.
Date Wed, 18 Sep 2013 19:27:00 GMT
Updated Branches:
  refs/heads/trunk 97bbd2dfe -> 8d5b9a558


Adding hooks so that in the future we can more easily support handling older versions of the
AMQP protocol.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8d5b9a55
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8d5b9a55
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8d5b9a55

Branch: refs/heads/trunk
Commit: 8d5b9a55876ebaae2f05e037a947c2ee7a9f49f6
Parents: 97bbd2d
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Tue Sep 17 15:31:53 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Wed Sep 18 15:26:07 2013 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         | 105 +++++++++++++++++++
 .../transport/amqp/AmqpNioTransport.java        |  21 ++++
 .../transport/amqp/AmqpProtocolConverter.java   |  31 +++---
 .../activemq/transport/amqp/AmqpTransport.java  |   4 +-
 .../transport/amqp/AmqpTransportFilter.java     |  30 ++++--
 .../transport/amqp/IAmqpProtocolConverter.java  |  36 +++++++
 .../transport/amqp/ResponseHandler.java         |   2 +-
 7 files changed, 200 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
new file mode 100644
index 0000000..8a03d09
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Command;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Used to assign the best implementation of a AmqpProtocolConverter to the
+ * AmqpTransport based on the AmqpHeader that the client sends us.
+ */
+public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
+
+    final private AmqpTransport transport;
+
+    interface Discriminator {
+        boolean matches(AmqpHeader header);
+        IAmqpProtocolConverter create(AmqpTransport transport);
+    }
+
+    static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
+    static {
+        DISCRIMINATORS.add(new Discriminator(){
+
+            @Override
+            public IAmqpProtocolConverter create(AmqpTransport transport) {
+                return new AmqpProtocolConverter(transport);
+            }
+
+            @Override
+            public boolean matches(AmqpHeader header) {
+                switch( header.getProtocolId() ) {
+                    case 0:
+                    case 3:
+                        if( header.getMajor() == 1 && header.getMinor()==0 &&
header.getRevision()==0 )
+                            return true;
+                }
+                return false;
+            }
+        });
+
+    }
+
+    static final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
+
+    public AMQPProtocolDiscriminator(AmqpTransport transport) {
+        this.transport = transport;
+    }
+
+    @Override
+    public void onAMQPData(Object command) throws Exception {
+        if (command.getClass() == AmqpHeader.class) {
+            AmqpHeader header = (AmqpHeader) command;
+
+            Discriminator match = null;
+            for (Discriminator discriminator : DISCRIMINATORS) {
+                if( discriminator.matches(header) ) {
+                    match = discriminator;
+                }
+            }
+            // Lets use first in the list if none are a good match.
+            if( match == null ) {
+                match = DISCRIMINATORS.get(0);
+            }
+            IAmqpProtocolConverter next = match.create(transport);
+            transport.setProtocolConverter(next);
+            for (Command send : pendingCommands) {
+                next.onActiveMQCommand(send);
+            }
+            pendingCommands.clear();
+            next.onAMQPData(command);
+        } else {
+            throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public void onAMQPException(IOException error) {
+    }
+
+    @Override
+    public void onActiveMQCommand(Command command) throws Exception {
+        pendingCommands.add(command);
+    }
+
+    @Override
+    public void updateTracer() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index d4ebaff..665bf88 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,6 +29,7 @@ import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
+import org.apache.activemq.transport.nio.NIOInputStream;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
@@ -35,6 +37,7 @@ import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
 
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for
using AMQP over NIO
@@ -80,6 +83,8 @@ public class AmqpNioTransport extends TcpTransport {
         this.buffOut = outPutStream;
     }
 
+    boolean magicRead = false;
+
     private void serviceRead() {
         try {
 
@@ -100,9 +105,25 @@ public class AmqpNioTransport extends TcpTransport {
                 receiveCounter += readSize;
 
                 inputBuffer.flip();
+
+                if( !magicRead ) {
+                    if( inputBuffer.remaining()>= 8 ) {
+                        magicRead = true;
+                        Buffer magic = new Buffer(8);
+                        for (int i = 0; i < 8; i++) {
+                            magic.data[i] = inputBuffer.get();
+                        }
+                        doConsume(new AmqpHeader(magic));
+                    } else {
+                        inputBuffer.flip();
+                        continue;
+                    }
+                }
+
                 doConsume(AmqpSupport.toBuffer(inputBuffer));
                 // clear the buffer
                 inputBuffer.clear();
+
             }
         } catch (IOException e) {
             onException(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index b27c76c..9d2e079 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -25,11 +25,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.InvalidSelectorException;
 
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -86,7 +84,6 @@ import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
@@ -104,7 +101,7 @@ import org.fusesource.hawtbuf.ByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class AmqpProtocolConverter {
+class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
     public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
@@ -122,18 +119,17 @@ class AmqpProtocolConverter {
 
     int prefetch = 100;
 
-    ReentrantLock lock = new ReentrantLock();
     EngineFactory engineFactory = new EngineFactoryImpl();
     Transport protonTransport = engineFactory.createTransport();
     Connection protonConnection = engineFactory.createConnection();
 
-    public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
+    public AmqpProtocolConverter(AmqpTransport transport) {
         this.amqpTransport = transport;
         this.protonTransport.bind(this.protonConnection);
         updateTracer();
     }
 
-    void updateTracer() {
+    public void updateTracer() {
         if (amqpTransport.isTrace()) {
             ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
                 @Override
@@ -188,6 +184,7 @@ class AmqpProtocolConverter {
     /**
      * Convert a AMQP command
      */
+    @Override
     public void onAMQPData(Object command) throws Exception {
         Buffer frame;
         if (command.getClass() == AmqpHeader.class) {
@@ -313,7 +310,7 @@ class AmqpProtocolConverter {
             closing = true;
             sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
                 @Override
-                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                     protonConnection.close();
                     if (!closedSocket) {
                         pumpProtonToSocket();
@@ -323,6 +320,7 @@ class AmqpProtocolConverter {
         }
     }
 
+    @Override
     public void onAMQPException(IOException error) {
         closedSocket = true;
         if (!closing) {
@@ -335,6 +333,7 @@ class AmqpProtocolConverter {
         }
     }
 
+    @Override
     public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {
             Response response = (Response) command;
@@ -405,7 +404,7 @@ class AmqpProtocolConverter {
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
             @Override
-            public void onResponse(AmqpProtocolConverter converter, Response response) throws
IOException {
+            public void onResponse(IAmqpProtocolConverter converter, Response response) throws
IOException {
                 protonConnection.open();
                 pumpProtonToSocket();
 
@@ -554,7 +553,7 @@ class AmqpProtocolConverter {
             message.onSend();
             sendToActiveMQ(message, new ResponseHandler() {
                 @Override
-                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                     if (!delivery.remotelySettled()) {
                         if (response.isException()) {
                             ExceptionResponse er = (ExceptionResponse) response;
@@ -649,7 +648,7 @@ class AmqpProtocolConverter {
                 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId,
txid), operation);
                 sendToActiveMQ(txinfo, new ResponseHandler() {
                     @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                         if (response.isException()) {
                             ExceptionResponse er = (ExceptionResponse) response;
                             Rejected rejected = new Rejected();
@@ -706,7 +705,7 @@ class AmqpProtocolConverter {
                 producerInfo.setDestination(dest);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
                     @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                         if (response.isException()) {
                             receiver.setTarget(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
@@ -920,7 +919,7 @@ class AmqpProtocolConverter {
 
                 sendToActiveMQ(ack, new ResponseHandler() {
                     @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                         if (response.isException()) {
                             if (response.isException()) {
                                 Throwable exception = ((ExceptionResponse) response).getException();
@@ -1020,7 +1019,7 @@ class AmqpProtocolConverter {
 
                 sendToActiveMQ(pendingTxAck, new ResponseHandler() {
                     @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                         if (response.isException()) {
                             if (response.isException()) {
                                 Throwable exception = ((ExceptionResponse) response).getException();
@@ -1100,7 +1099,7 @@ class AmqpProtocolConverter {
                 consumerContext.closed = true;
                 sendToActiveMQ(rsi, new ResponseHandler() {
                     @Override
-                    public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                         if (response.isException()) {
                             sender.setSource(null);
                             Throwable exception = ((ExceptionResponse) response).getException();
@@ -1153,7 +1152,7 @@ class AmqpProtocolConverter {
 
             sendToActiveMQ(consumerInfo, new ResponseHandler() {
                 @Override
-                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
                     if (response.isException()) {
                         sender.setSource(null);
                         Throwable exception = ((ExceptionResponse) response).getException();

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
index ba2a6ac..02e0c10 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.transport.amqp;
 
 import org.apache.activemq.command.Command;
-import org.fusesource.hawtbuf.Buffer;
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
@@ -49,4 +48,7 @@ public interface AmqpTransport {
 
     public boolean isTrace();
 
+    public IAmqpProtocolConverter getProtocolConverter();
+    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 8fe9e3e..fe58cc6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * The AMQPTransportFilter normally sits on top of a TcpTransport that has been
@@ -41,17 +42,17 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
     static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName()
+ ".BYTES");
     static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName()
+ ".FRAMES");
-    private final AmqpProtocolConverter protocolConverter;
+    private IAmqpProtocolConverter protocolConverter;
 //    private AmqpInactivityMonitor monitor;
     private AmqpWireFormat wireFormat;
 
     private boolean trace;
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
+    private ReentrantLock lock = new ReentrantLock();
 
     public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext)
{
         super(next);
-        this.protocolConverter = new AmqpProtocolConverter(this, brokerContext);
-
+        this.protocolConverter = new AMQPProtocolDiscriminator(this);
         if (wireFormat instanceof AmqpWireFormat) {
             this.wireFormat = (AmqpWireFormat) wireFormat;
         }
@@ -60,11 +61,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
-            protocolConverter.lock.lock();
+            lock.lock();
             try {
                 protocolConverter.onActiveMQCommand(command);
             } finally {
-                protocolConverter.lock.unlock();
+                lock.unlock();
             }
         } catch (Exception e) {
             throw IOExceptionSupport.create(e);
@@ -73,11 +74,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
 
     @Override
     public void onException(IOException error) {
-        protocolConverter.lock.lock();
+        lock.lock();
         try {
             protocolConverter.onAMQPException(error);
         } finally {
-            protocolConverter.lock.unlock();
+            lock.unlock();
         }
     }
 
@@ -90,11 +91,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
             if (trace) {
                 TRACE_BYTES.trace("Received: \n{}", command);
             }
-            protocolConverter.lock.lock();
+            lock.lock();
             try {
                 protocolConverter.onAMQPData(command);
             } finally {
-                protocolConverter.lock.unlock();
+                lock.unlock();
             }
         } catch (IOException e) {
             handleException(e);
@@ -104,7 +105,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     }
 
     public void sendToActiveMQ(Command command) {
-        assert protocolConverter.lock.isHeldByCurrentThread();
+        assert lock.isHeldByCurrentThread();
         TransportListener l = transportListener;
         if (l != null) {
             l.onCommand(command);
@@ -112,7 +113,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     }
 
     public void sendToAmqp(Object command) throws IOException {
-        assert protocolConverter.lock.isHeldByCurrentThread();
+        assert lock.isHeldByCurrentThread();
         if (trace) {
             TRACE_BYTES.trace("Sending: \n{}", command);
         }
@@ -167,4 +168,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setTransformer(String transformer) {
         this.transformer = transformer;
     }
+    public IAmqpProtocolConverter getProtocolConverter() {
+        return protocolConverter;
+    }
+
+    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+        this.protocolConverter = protocolConverter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
new file mode 100644
index 0000000..51d4da8
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.activemq.command.Command;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ */
+public interface IAmqpProtocolConverter {
+
+    void onAMQPData(Object command) throws Exception;
+
+    void onAMQPException(IOException error);
+
+    void onActiveMQCommand(Command command) throws Exception;
+
+    void updateTracer();
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8d5b9a55/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
index bf1bdae..0693a70 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
@@ -25,5 +25,5 @@ import java.io.IOException;
  * Interface used by the AMQPProtocolConverter for callbacks.
  */
 interface ResponseHandler {
-    void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
+    void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException;
 }


Mime
View raw message