activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591
Date Tue, 24 Mar 2015 22:09:43 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 5dfdf75..c65145a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -25,7 +25,6 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -41,18 +40,17 @@ import org.slf4j.LoggerFactory;
 public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);
     static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");
-    static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
-    private IAmqpProtocolConverter protocolConverter;
+    public static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
+    private AmqpProtocolConverter protocolConverter;
     private AmqpWireFormat wireFormat;
     private AmqpInactivityMonitor monitor;
 
     private boolean trace;
-    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
     private final ReentrantLock lock = new ReentrantLock();
 
     public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
         super(next);
-        this.protocolConverter = new AMQPProtocolDiscriminator(this, brokerService);
+        this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
         if (wireFormat instanceof AmqpWireFormat) {
             this.wireFormat = (AmqpWireFormat) wireFormat;
         }
@@ -170,20 +168,20 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
 
     @Override
     public String getTransformer() {
-        return transformer;
+        return wireFormat.getTransformer();
     }
 
     public void setTransformer(String transformer) {
-        this.transformer = transformer;
+        wireFormat.setTransformer(transformer);
     }
 
     @Override
-    public IAmqpProtocolConverter getProtocolConverter() {
+    public AmqpProtocolConverter getProtocolConverter() {
         return protocolConverter;
     }
 
     @Override
-    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
 
@@ -195,7 +193,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     }
 
     public void setProducerCredit(int producerCredit) {
-        protocolConverter.setProducerCredit(producerCredit);
+        wireFormat.setProducerCredit(producerCredit);
+    }
+
+    public int getProducerCredit() {
+        return wireFormat.getProducerCredit();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index dc0e3d5..3734cc5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -37,6 +38,7 @@ public class AmqpWireFormat implements WireFormat {
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
     public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
     public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
+    public static final int DEFAULT_PRODUCER_CREDIT = 1000;
 
     private static final int SASL_PROTOCOL = 3;
 
@@ -44,6 +46,8 @@ public class AmqpWireFormat implements WireFormat {
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
     private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    private int producerCredit = DEFAULT_PRODUCER_CREDIT;
+    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
 
     private boolean magicRead = false;
     private ResetListener resetListener;
@@ -207,4 +211,20 @@ public class AmqpWireFormat implements WireFormat {
     public void setConnectAttemptTimeout(long connectAttemptTimeout) {
         this.connectAttemptTimeout = connectAttemptTimeout;
     }
+
+    public void setProducerCredit(int producerCredit) {
+        this.producerCredit = producerCredit;
+    }
+
+    public int getProducerCredit() {
+        return producerCredit;
+    }
+
+    public String getTransformer() {
+        return transformer;
+    }
+
+    public void setTransformer(String transformer) {
+        this.transformer = transformer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
index 75856da..f4de950 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java
@@ -20,7 +20,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
 /**
- * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * Creates the default AMQP WireFormat object used to configure the protocol support.
  */
 public class AmqpWireFormatFactory implements WireFormatFactory {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
deleted file mode 100644
index 8296ef2..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import java.io.IOException;
-
-import org.apache.activemq.command.Command;
-
-/**
- */
-public interface IAmqpProtocolConverter {
-
-    void onAMQPData(Object command) throws Exception;
-
-    void onAMQPException(IOException error);
-
-    void onActiveMQCommand(Command command) throws Exception;
-
-    void updateTracer();
-
-    void setProducerCredit(int producerCredit);
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
index 392ed77..901fd69 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ResponseHandler.java
@@ -21,8 +21,21 @@ import java.io.IOException;
 import org.apache.activemq.command.Response;
 
 /**
- * Interface used by the AMQPProtocolConverter for callbacks.
+ * Interface used by the AmqpProtocolConverter for callbacks from the broker.
  */
-interface ResponseHandler {
-    void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException;
+public interface ResponseHandler {
+
+    /**
+     * Called when the Broker has handled a previously issued request and
+     * has a response ready.
+     *
+     * @param converter
+     *        the protocol converter that is awaiting the response.
+     * @param response
+     *        the response from the broker.
+     *
+     * @throws IOException if an error occurs while processing the response.
+     */
+    void onResponse(AmqpProtocolConverter converter, Response response) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
new file mode 100644
index 0000000..d4fe301
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractLink.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Abstract AmqpLink implementation that provides basic Link services.
+ */
+public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLink {
+
+    protected final AmqpSession session;
+    protected final LINK_TYPE endpoint;
+
+    protected boolean closed;
+    protected boolean opened;
+    protected List<Runnable> closeActions = new ArrayList<Runnable>();
+
+    /**
+     * Creates a new AmqpLink type.
+     *
+     * @param session
+     *        the AmqpSession that serves as the parent of this Link.
+     * @param endpoint
+     *        the link endpoint this object represents.
+     */
+    public AmqpAbstractLink(AmqpSession session, LINK_TYPE endpoint) {
+        this.session = session;
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void open() {
+        if (!opened) {
+            getEndpoint().setContext(this);
+            getEndpoint().open();
+
+            opened = true;
+        }
+    }
+
+    @Override
+    public void detach() {
+        if (!closed) {
+            if (getEndpoint() != null) {
+                getEndpoint().setContext(null);
+                getEndpoint().detach();
+                getEndpoint().free();
+            }
+        }
+    }
+
+    @Override
+    public void close(ErrorCondition error) {
+        if (!closed) {
+
+            if (getEndpoint() != null) {
+                if (getEndpoint() instanceof Sender) {
+                    getEndpoint().setSource(null);
+                } else {
+                    getEndpoint().setTarget(null);
+                }
+                getEndpoint().setCondition(error);
+            }
+
+            close();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!closed) {
+
+            if (getEndpoint() != null) {
+                getEndpoint().setContext(null);
+                getEndpoint().close();
+                getEndpoint().free();
+            }
+
+            for (Runnable action : closeActions) {
+                action.run();
+            }
+
+            closeActions.clear();
+            opened = false;
+            closed = true;
+        }
+    }
+
+    /**
+     * @return true if this link has already been opened.
+     */
+    public boolean isOpened() {
+        return opened;
+    }
+
+    /**
+     * @return true if this link has already been closed.
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the Proton Link type this link represents.
+     */
+    public LINK_TYPE getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * @return the parent AmqpSession for this Link instance.
+     */
+    public AmqpSession getSession() {
+        return session;
+    }
+
+    @Override
+    public void addCloseAction(Runnable action) {
+        closeActions.add(action);
+    }
+
+    /**
+     * Shortcut method to hand off an ActiveMQ Command to the broker without
+     * assigning a ResponseHandler (a null handler is passed along).
+     *
+     * @param command
+     *        the Command object to send to the Broker.
+     */
+    protected void sendToActiveMQ(Command command) {
+        session.getConnection().sendToActiveMQ(command, null);
+    }
+
+    /**
+     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
+     * a ResponseHandler to deal with any reply from the broker.
+     *
+     * @param command
+     *        the Command object to send to the Broker.
+     * @param handler
+     *        the ResponseHandler that will handle the Broker's response.
+     */
+    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+        session.getConnection().sendToActiveMQ(command, handler);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
new file mode 100644
index 0000000..7436a78
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base that provides common services for AMQP Receiver types.
+ */
+public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractReceiver.class);
+
+    protected ByteArrayOutputStream current = new ByteArrayOutputStream();
+    protected final byte[] recvBuffer = new byte[1024 * 8];
+
+    /**
+     * Handle creation of a new AMQP Receiver instance.
+     *
+     * @param session
+     *        the AmqpSession that serves as the parent of this Link.
+     * @param endpoint
+     *        the Receiver endpoint being managed by this class.
+     */
+    public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
+        super(session, endpoint);
+    }
+
+    @Override
+    public void detach() {
+    }
+
+    @Override
+    public void flow() throws Exception {
+    }
+
+    /**
+     * Provide the receiver endpoint with the given amount of credits.
+     *
+     * @param credits
+     *        the credit value to pass on to the wrapped Receiver.
+     */
+    public void flow(int credits) {
+        getEndpoint().flow(credits);
+    }
+
+    @Override
+    public void commit() throws Exception {
+    }
+
+    @Override
+    public void rollback() throws Exception {
+    }
+
+    @Override
+    public void delivery(Delivery delivery) throws Exception {
+
+        if (!delivery.isReadable()) {
+            LOG.debug("Delivery was not readable!");
+            return;
+        }
+
+        if (current == null) {
+            current = new ByteArrayOutputStream();
+        }
+
+        int count;
+        while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
+            current.write(recvBuffer, 0, count);
+        }
+
+        // Expecting more deliveries..
+        if (count == 0) {
+            return;
+        }
+
+        try {
+            processDelivery(delivery, current.toBuffer());
+        } finally {
+            getEndpoint().advance();
+            current = null;
+        }
+    }
+
+    protected abstract void processDelivery(Delivery delivery, Buffer deliveryBytes) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
new file mode 100644
index 0000000..a902315
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.InvalidClientIDException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.security.AuthenticationBroker;
+import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.transport.amqp.AmqpHeader;
+import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.transport.amqp.AmqpTransport;
+import org.apache.activemq.transport.amqp.AmqpTransportFilter;
+import org.apache.activemq.transport.amqp.AmqpWireFormat;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the mechanics of managing a single remote peer connection.
+ */
+public class AmqpConnection implements AmqpProtocolConverter {
+
+    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+    private static final int CHANNEL_MAX = 32767;
+
+    private final Transport protonTransport = Proton.transport();
+    private final Connection protonConnection = Proton.connection();
+    private final Collector eventCollector = new CollectorImpl();
+
+    private final AmqpTransport amqpTransport;
+    private final AmqpWireFormat amqpWireFormat;
+    private final BrokerService brokerService;
+    private AuthenticationBroker authenticator;
+    private Sasl sasl;
+
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+    private final AtomicInteger lastCommandId = new AtomicInteger();
+    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+    private final ConnectionInfo connectionInfo = new ConnectionInfo();
+    private long nextSessionId = 0;
+    private long nextTempDestinationId = 0;
+    private boolean closing = false;
+    private boolean closedSocket = false;
+
+    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
+
+    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
+        this.amqpTransport = transport;
+        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
+        if (monitor != null) {
+            monitor.setProtocolConverter(this);
+        }
+        this.amqpWireFormat = transport.getWireFormat();
+        this.brokerService = brokerService;
+
+        // the configured maxFrameSize on the URI.
+        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
+        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
+            this.protonTransport.setMaxFrameSize(maxFrameSize);
+        }
+
+        this.protonTransport.bind(this.protonConnection);
+        this.protonTransport.setChannelMax(CHANNEL_MAX);
+
+        this.protonConnection.collect(eventCollector);
+
+        updateTracer();
+    }
+
+    /**
+     * Load and return a <code>Symbol[]</code> that contains the connection capabilities
+     * offered to new connections.
+     *
+     * @return the capabilities that are offered to new clients on connect.
+     */
+    protected Symbol[] getConnectionCapabilitiesOffered() {
+        return new Symbol[]{ ANONYMOUS_RELAY };
+    }
+
+    /**
+     * Load and return a {@code Map<Symbol, Object>} that contains the properties
+     * that this connection supplies to incoming connections.
+     *
+     * @return the properties that are offered to the incoming connection.
+     */
+    protected Map<Symbol, Object> getConnetionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(QUEUE_PREFIX, "queue://");
+        properties.put(TOPIC_PREFIX, "topic://");
+
+        return properties;
+    }
+
+    /**
+     * Load and return a {@code Map<Symbol, Object>} that contains the properties
+     * that this connection supplies to incoming connections when the open has failed
+     * and the remote should expect a close to follow.
+     *
+     * @return the properties that are offered to the incoming connection.
+     */
+    protected Map<Symbol, Object> getFailedConnetionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(CONNECTION_OPEN_FAILED, true);
+
+        return properties;
+    }
+
+    @Override
+    public void updateTracer() {
+        if (amqpTransport.isTrace()) {
+            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
+                @Override
+                public void receivedFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
+                }
+
+                @Override
+                public void sentFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
+                }
+            });
+        }
+    }
+
+    //----- Connection Properties Accessors ----------------------------------//
+
+    /**
+     * @return the amount of credit assigned to AMQP receiver links created from
+     *         sender links on the remote peer.
+     */
+    public int getConfiguredReceiverCredit() {
+        return amqpWireFormat.getProducerCredit();
+    }
+
+    /**
+     * @return the transformer type that was configured for this AMQP transport.
+     */
+    public String getConfiguredTransformer() {
+        return amqpWireFormat.getTransformer();
+    }
+
+    /**
+     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the Client ID used to create the connection with ActiveMQ
+     */
+    public String getClientId() {
+        return connectionInfo.getClientId();
+    }
+
+    //----- Proton Event handling and IO support -----------------------------//
+
+    void pumpProtonToSocket() {
+        try {
+            boolean done = false;
+            while (!done) {
+                ByteBuffer toWrite = protonTransport.getOutputBuffer();
+                if (toWrite != null && toWrite.hasRemaining()) {
+                    LOG.trace("Sending {} bytes out", toWrite.limit());
+                    amqpTransport.sendToAmqp(toWrite);
+                    protonTransport.outputConsumed();
+                } else {
+                    done = true;
+                }
+            }
+        } catch (IOException e) {
+            amqpTransport.onException(e);
+        }
+    }
+
+    @Override
+    public void onAMQPData(Object command) throws Exception {
+        Buffer frame;
+        if (command.getClass() == AmqpHeader.class) {
+            AmqpHeader header = (AmqpHeader) command;
+            // An unsupported header is answered with the minimally supported one, then handleException runs.
+            if (amqpWireFormat.isHeaderValid(header)) {
+                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
+            } else {
+                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
+                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
+                amqpTransport.sendToAmqp(reply.getBuffer());
+                handleException(new AmqpProtocolException(
+                    "Connection from client using unsupported AMQP attempted", true));
+            }
+            // NOTE(review): an invalid header still falls through to the switch and onFrame below; confirm intended.
+            switch (header.getProtocolId()) {
+                case 0:
+                    break; // nothing to do..
+                case 3: // Client will be using SASL for auth..
+                    sasl = protonTransport.sasl();
+                    sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
+                    sasl.server();
+                    break;
+                default:
+            }
+            frame = header.getBuffer();
+        } else {
+            frame = (Buffer) command;
+        }
+        // Both the header's bytes and ordinary frame buffers are handed to onFrame.
+        onFrame(frame);
+    }
+
+    public void onFrame(Buffer frame) throws Exception {
+        while (frame.length > 0) {
+            try {
+                int count = protonTransport.input(frame.data, frame.offset, frame.length);
+                frame.moveHead(count);
+            } catch (Throwable e) {
+                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
+                return;
+            }
+
+            try {
+                if (sasl != null) {
+                    // Lets try to complete the sasl handshake.
+                    if (sasl.getRemoteMechanisms().length > 0) {
+                        if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
+                            byte[] data = new byte[sasl.pending()];
+                            sasl.recv(data, 0, data.length);
+                            Buffer[] parts = new Buffer(data).split((byte) 0);
+                            if (parts.length > 0) {
+                                connectionInfo.setUserName(parts[0].utf8().toString());
+                            }
+                            if (parts.length > 1) {
+                                connectionInfo.setPassword(parts[1].utf8().toString());
+                            }
+
+                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                            } else {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+                            }
+
+                            amqpTransport.getWireFormat().resetMagicRead();
+                            sasl = null;
+                            LOG.debug("SASL [PLAIN] Handshake complete.");
+                        } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
+                            if (tryAuthenticate(connectionInfo, amqpTransport.getPeerCertificates())) {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+                            } else {
+                                sasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+                            }
+                            amqpTransport.getWireFormat().resetMagicRead();
+                            sasl = null;
+                            LOG.debug("SASL [ANONYMOUS] Handshake complete.");
+                        }
+                    }
+                }
+
+                Event event = null;
+                while ((event = eventCollector.peek()) != null) {
+                    if (amqpTransport.isTrace()) {
+                        LOG.trace("Processing event: {}", event.getType());
+                    }
+                    switch (event.getType()) {
+                        case CONNECTION_REMOTE_OPEN:
+                            processConnectionOpen(event.getConnection());
+                            break;
+                        case CONNECTION_REMOTE_CLOSE:
+                            processConnectionClose(event.getConnection());
+                            break;
+                        case SESSION_REMOTE_OPEN:
+                            processSessionOpen(event.getSession());
+                            break;
+                        case SESSION_REMOTE_CLOSE:
+                            processSessionClose(event.getSession());
+                            break;
+                        case LINK_REMOTE_OPEN:
+                            processLinkOpen(event.getLink());
+                            break;
+                        case LINK_REMOTE_DETACH:
+                            processLinkDetach(event.getLink());
+                            break;
+                        case LINK_REMOTE_CLOSE:
+                            processLinkClose(event.getLink());
+                            break;
+                        case LINK_FLOW:
+                            processLinkFlow(event.getLink());
+                            break;
+                        case DELIVERY:
+                            processDelivery(event.getDelivery());
+                            break;
+                        default:
+                            break;
+                    }
+
+                    eventCollector.pop();
+                }
+
+            } catch (Throwable e) {
+                handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
+            }
+
+            pumpProtonToSocket();
+        }
+    }
+
+    protected void processConnectionOpen(Connection connection) throws Exception {
+
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setConnectionId(connectionId);
+
+        configureInactivityMonitor();
+
+        String clientId = protonConnection.getRemoteContainer();
+        if (clientId != null && !clientId.isEmpty()) {
+            connectionInfo.setClientId(clientId);
+        }
+
+        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
+
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                Throwable exception = null;
+                try {
+                    if (response.isException()) {
+                        protonConnection.setProperties(getFailedConnetionProperties());
+                        protonConnection.open();
+
+                        exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
+                        } else if (exception instanceof InvalidClientIDException) {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()));
+                        } else {
+                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
+                        }
+
+                        protonConnection.close();
+                    } else {
+                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+                        protonConnection.setProperties(getConnetionProperties());
+                        protonConnection.open();
+                    }
+                } finally {
+                    pumpProtonToSocket();
+
+                    if (response.isException()) {
+                        amqpTransport.onException(IOExceptionSupport.create(exception));
+                    }
+                }
+            }
+        });
+    }
+
+    protected void processConnectionClose(Connection connection) throws Exception {
+        if (!closing) {
+            closing = true;
+            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    protonConnection.close();
+                    protonConnection.free();
+
+                    if (!closedSocket) {
+                        pumpProtonToSocket();
+                    }
+                }
+            });
+
+            sendToActiveMQ(new ShutdownInfo(), null);
+        }
+    }
+
+    protected void processSessionOpen(Session protonSession) throws Exception {
+        new AmqpSession(this, getNextSessionId(), protonSession).open();
+    }
+
+    protected void processSessionClose(Session protonSession) throws Exception {
+        if (protonSession.getContext() != null) {
+            ((AmqpResource) protonSession.getContext()).close();
+        } else {
+            protonSession.close();
+            protonSession.free();
+        }
+    }
+
+    protected void processLinkOpen(Link link) throws Exception {
+        link.setSource(link.getRemoteSource());
+        link.setTarget(link.getRemoteTarget());
+
+        AmqpSession session = (AmqpSession) link.getSession().getContext();
+        if (link instanceof Receiver) {
+            if (link.getRemoteTarget() instanceof Coordinator) {
+                session.createCoordinator((Receiver) link);
+            } else {
+                session.createReceiver((Receiver) link);
+            }
+        } else {
+            session.createSender((Sender) link);
+        }
+    }
+
+    protected void processLinkDetach(Link link) throws Exception {
+        Object context = link.getContext();
+        // A link bound to an AmqpLink delegates the detach; any other link is detached and freed here.
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).detach();
+        } else {
+            link.detach();
+            link.free();
+        }
+    }
+
+    /** Closes the AmqpLink bound to the given link, or closes and frees a link that has none. */
+    protected void processLinkClose(Link link) throws Exception {
+        Object context = link.getContext();
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).close();
+        } else {
+            link.close();
+            link.free();
+        }
+    }
+
+    protected void processLinkFlow(Link link) throws Exception {
+        Object context = link.getContext();
+        if (context instanceof AmqpLink) {
+            ((AmqpLink) context).flow();
+        }
+    }
+
+    protected void processDelivery(Delivery delivery) throws Exception {
+        if (!delivery.isPartial()) {
+            Object context = delivery.getLink().getContext();
+            if (context instanceof AmqpLink) {
+                AmqpLink amqpLink = (AmqpLink) context;
+                amqpLink.delivery(delivery);
+            }
+        }
+    }
+
+    //----- Event entry points for ActiveMQ commands and errors --------------//
+
+    @Override
+    public void onAMQPException(IOException error) {
+        closedSocket = true; // the connection-close response handler skips pumping output once this is set
+        if (!closing) {
+            amqpTransport.sendToActiveMQ(error); // not closing yet: forward the error via the transport
+        } else {
+            try {
+                amqpTransport.stop();
+            } catch (Exception ignore) { // deliberately ignored: the connection is already closing
+            }
+        }
+    }
+
+    @Override
+    public void onActiveMQCommand(Command command) throws Exception {
+        if (command.isResponse()) {
+            Response response = (Response) command;
+            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+            if (rh != null) {
+                rh.onResponse(this, response);
+            } else {
+                // Pass down any unexpected errors. Should this close the connection?
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    handleException(exception);
+                }
+            }
+        } else if (command.isMessageDispatch()) {
+            MessageDispatch dispatch = (MessageDispatch) command;
+            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
+            if (sender != null) {
+                // End of Queue Browse will have no Message object.
+                if (dispatch.getMessage() != null) {
+                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
+                } else {
+                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
+                }
+                sender.onMessageDispatch(dispatch);
+                if (dispatch.getMessage() != null) {
+                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
+                }
+            }
+        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+            // Pass down any unexpected async errors. Should this close the connection?
+            Throwable exception = ((ConnectionError) command).getException();
+            handleException(exception);
+        } else if (command.isBrokerInfo()) {
+            // ignore
+        } else {
+            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
+        }
+    }
+
+    //----- Utility methods for connection resources to use ------------------//
+
+    void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+        subscriptionsByConsumerId.put(consumerId, sender);
+    }
+
+    void unregosterSender(ConsumerId consumerId) {
+        subscriptionsByConsumerId.remove(consumerId);
+    }
+
+    ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
+        ActiveMQDestination result = null;
+        RegionBroker regionBroker;
+
+        try {
+            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+        } catch (Exception e) {
+            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
+        }
+
+        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
+        if (subscription != null) {
+            result = subscription.getActiveMQDestination();
+        }
+
+        return result;
+    }
+
+    /**
+     * Creates a temporary topic or queue for a dynamic link and announces it to the broker.
+     * If the broker answers with an exception the link's source is cleared and the link is closed.
+     */
+    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
+        final ActiveMQDestination tempDestination;
+        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
+            tempDestination = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
+        } else {
+            if (!contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
+                LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
+            }
+            tempDestination = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        }
+
+        DestinationInfo request = new DestinationInfo();
+        request.setConnectionId(connectionId);
+        request.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        request.setDestination(tempDestination);
+
+        sendToActiveMQ(request, new ResponseHandler() {
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                if (!response.isException()) {
+                    return;
+                }
+                // The broker refused the destination: record why on the link, then close and free it.
+                link.setSource(null);
+                Throwable failure = ((ExceptionResponse) response).getException();
+                Symbol reason = failure instanceof SecurityException ? AmqpError.UNAUTHORIZED_ACCESS : AmqpError.INTERNAL_ERROR;
+                link.setCondition(new ErrorCondition(reason, failure.getMessage()));
+                link.close();
+                link.free();
+            }
+        });
+
+        return tempDestination;
+    }
+
+    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionId);
+        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        info.setDestination(destination);
+
+        sendToActiveMQ(info, new ResponseHandler() {
+
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
+                }
+            }
+        });
+    }
+
+    void sendToActiveMQ(Command command) {
+        sendToActiveMQ(command, null);
+    }
+
+    void sendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(lastCommandId.incrementAndGet());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        amqpTransport.sendToActiveMQ(command);
+    }
+
+    void handleException(Throwable exception) {
+        exception.printStackTrace();
+        LOG.debug("Exception detail", exception);
+        try {
+            amqpTransport.stop();
+        } catch (Throwable e) {
+            LOG.error("Failed to stop AMQP Transport ", e);
+        }
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private SessionId getNextSessionId() {
+        return new SessionId(connectionId, nextSessionId++);
+    }
+
+    private void configureInactivityMonitor() {
+        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+        if (monitor == null) {
+            return;
+        }
+
+        monitor.stopConnectChecker();
+    }
+
+    /**
+     * @return true only if the authenticator yields a non-null result; any failure is logged and treated as a rejection.
+     */
+    private boolean tryAuthenticate(ConnectionInfo info, X509Certificate[] peerCertificates) {
+        try {
+            return getAuthenticator().authenticate(info.getUserName(), info.getPassword(), peerCertificates) != null;
+        } catch (Throwable error) {
+            LOG.debug("Authentication attempt failed: {}", error.getMessage());
+            return false;
+        }
+    }
+
+    /** Lazily resolves the broker's AuthenticationBroker, falling back to DefaultAuthenticationBroker. */
+    private AuthenticationBroker getAuthenticator() {
+        if (authenticator == null) {
+            try {
+                authenticator = (AuthenticationBroker) brokerService.getBroker().getAdaptor(AuthenticationBroker.class);
+            } catch (Exception e) {
+                LOG.debug("Failed to lookup AuthenticationBroker from Broker, will use a default Noop version.", e);
+            }
+            // Still null after the lookup (for example because it threw): use the default implementation.
+            if (authenticator == null) {
+                authenticator = new DefaultAuthenticationBroker();
+            }
+        }
+        return authenticator;
+    }
+
+    /** Fallback authenticator that performs no credential check; static because it needs no outer state. */
+    private static class DefaultAuthenticationBroker implements AuthenticationBroker {
+        @Override
+        public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
+            // Password and certificates are not inspected; a context is returned for whatever user name was given.
+            return new SecurityContext(username) {
+                @Override
+                public Set<Principal> getPrincipals() {
+                    return null;
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
new file mode 100644
index 0000000..d245769
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Interface used to define the operations needed to implement an AMQP
+ * Link based endpoint, i.e. Sender, Receiver or Coordinator.
+ */
+public interface AmqpLink extends AmqpResource {
+
+    /**
+     * Close the Link with an error indicating the reason for the close.
+     *
+     * @param error
+     *        the error that prompted the close.
+     */
+    void close(ErrorCondition error);
+
+    /**
+     * Request from the remote peer to detach this resource.
+     */
+    void detach();
+
+    /**
+     * Handles an incoming flow control event.
+     *
+     * @throws Exception if an error occurs during the flow processing.
+     */
+    void flow() throws Exception;
+
+    /**
+     * Called when a new Delivery arrives for the given Link.
+     *
+     * @param delivery
+     *        the newly arrived delivery on this link.
+     *
+     * @throws Exception if an error occurs while processing the new Delivery.
+     */
+    void delivery(Delivery delivery) throws Exception;
+
+    /**
+     * Handle work necessary on commit of transacted resources associated with
+     * this Link instance.
+     *
+     * @throws Exception if an error occurs while performing the commit.
+     */
+    void commit() throws Exception;
+
+    /**
+     * Handle work necessary on rollback of transacted resources associated with
+     * this Link instance.
+     *
+     * @throws Exception if an error occurs while performing the rollback.
+     */
+    void rollback() throws Exception;
+
+    /**
+     * @return the ActiveMQDestination that this link is servicing.
+     */
+    ActiveMQDestination getDestination();
+
+    /**
+     * Sets the ActiveMQDestination that this link will be servicing.
+     *
+     * @param destination
+     *        the ActiveMQDestination that this link services.
+     */
+    void setDestination(ActiveMQDestination destination);
+
+    /**
+     * Adds a new Runnable that is called on close of this link.
+     *
+     * @param action
+     *        a Runnable that will be executed when the link closes or detaches.
+     */
+    void addCloseAction(Runnable action);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
new file mode 100644
index 0000000..9ab7ebe
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
+import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
+import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
+import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer
+ * which holds the corresponding Sender which transfers messages across the
+ * link.  The AmqpReceiver handles all incoming deliveries by converting them
+ * or wrapping them into an ActiveMQ message object and forwarding that message
+ * on to the appropriate ActiveMQ Destination.
+ */
+public class AmqpReceiver extends AmqpAbstractReceiver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
+
+    private final ProducerInfo producerInfo;
+    private final int configuredCredit;
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+
+    private InboundTransformer inboundTransformer;
+
+    /**
+     * Create a new instance of an AmqpReceiver
+     *
+     * @param session
+     *        the Session that is the parent of this AmqpReceiver instance.
+     * @param endpoint
+     *        the AMQP receiver endpoint that the class manages.
+     * @param producerInfo
+     *        the ProducerInfo instance that contains this sender's configuration.
+     */
+    public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) {
+        super(session, endpoint);
+
+        this.producerInfo = producerInfo;
+        this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
+    }
+
+    @Override
+    public void close() {
+        if (!isClosed() && isOpened()) {
+            sendToActiveMQ(new RemoveInfo(getProducerId()));
+        }
+
+        super.close();
+    }
+
+    //----- Configuration accessors ------------------------------------------//
+
+    /**
+     * @return the ActiveMQ ProducerId used to register this Receiver on the Broker.
+     */
+    public ProducerId getProducerId() {
+        return producerInfo.getProducerId();
+    }
+
+    @Override
+    public ActiveMQDestination getDestination() {
+        return producerInfo.getDestination();
+    }
+
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+        producerInfo.setDestination(destination);
+    }
+
+    /**
+     * If the Sender that initiated this Receiver endpoint did not define an address
+     * then it is using anonymous mode and messages are to be routed to the address
+     * that is defined in the AMQP message 'To' field.
+     *
+     * @return true if this Receiver should operate in anonymous mode.
+     */
+    public boolean isAnonymous() {
+        return producerInfo.getDestination() == null;
+    }
+
+    /**
+     * Returns the amount of receiver credit that has been configured for this AMQP
+     * transport.  If no value was configured on the TransportConnector URI then a
+     * sensible default is used.
+     *
+     * @return the configured receiver credit to grant.
+     */
+    public int getConfiguredReceiverCredit() {
+        return configuredCredit;
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    protected InboundTransformer getInboundTransformer() { // built lazily on first use, then cached
+        if (inboundTransformer == null) {
+            String transformer = session.getConnection().getConfiguredTransformer();
+            if (InboundTransformer.TRANSFORMER_JMS.equals(transformer)) { // constant-first equals tolerates a null name
+                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (InboundTransformer.TRANSFORMER_NATIVE.equals(transformer)) {
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else if (InboundTransformer.TRANSFORMER_RAW.equals(transformer)) {
+                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            } else { // unknown or null name: warn and fall back to the native transformer
+                LOG.warn("Unknown transformer type {} using native one instead", transformer);
+                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+            }
+        }
+        return inboundTransformer;
+    }
+
+    /**
+     * Converts a fully received delivery into an ActiveMQ message and forwards it to
+     * the broker.  Nothing is done when this link reports itself as closed.
+     *
+     * The message is assigned a destination (taken from the message itself for an
+     * anonymous link, otherwise from this link), this link's producer id, a freshly
+     * generated MessageId and, when the remote state is transactional, a local
+     * transaction id.  Deliveries that the remote peer has not settled are answered
+     * from the broker's response; pre-settled deliveries are forwarded without a
+     * response handler.
+     *
+     * @param delivery
+     *        the proton Delivery being processed.
+     * @param deliveryBytes
+     *        the encoded message bytes read for the delivery.
+     *
+     * @throws Exception if the message cannot be transformed or forwarded.
+     */
+    @Override
+    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
+        if (!isClosed()) {
+            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
+            final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+            // NOTE(review): 'current' is declared outside this view; presumably it holds
+            // the in-progress delivery data and is reset here for the next one - confirm.
+            current = null;
+
+            if (isAnonymous()) {
+                // An anonymous link has no fixed target, so the message itself must carry
+                // an ActiveMQ destination; otherwise the delivery is rejected and dropped.
+                Destination toDestination = message.getJMSDestination();
+                if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
+                    Rejected rejected = new Rejected();
+                    ErrorCondition condition = new ErrorCondition();
+                    condition.setCondition(Symbol.valueOf("failed"));
+                    condition.setDescription("Missing to field for message sent to an anonymous producer");
+                    rejected.setError(condition);
+                    delivery.disposition(rejected);
+                    return;
+                }
+            } else {
+                message.setJMSDestination(getDestination());
+            }
+
+            message.setProducerId(getProducerId());
+
+            // Always override the AMQP client's MessageId with our own.  Preserve
+            // the original in the TextView property for later Ack.
+            MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId());
+
+            MessageId amqpMessageId = message.getMessageId();
+            if (amqpMessageId != null) {
+                if (amqpMessageId.getTextView() != null) {
+                    messageId.setTextView(amqpMessageId.getTextView());
+                } else {
+                    messageId.setTextView(amqpMessageId.toString());
+                }
+            }
+
+            message.setMessageId(messageId);
+
+            LOG.trace("Inbound Message:{} from Producer:{}",
+                      message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId());
+
+            // A transactional remote state ties the message to a local transaction built
+            // from this connection's id and the numeric form of the AMQP transaction id.
+            final DeliveryState remoteState = delivery.getRemoteState();
+            if (remoteState != null && remoteState instanceof TransactionalState) {
+                TransactionalState s = (TransactionalState) remoteState;
+                long txid = toLong(s.getTxnId());
+                message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid));
+            }
+
+            message.onSend();
+            if (!delivery.remotelySettled()) {
+                // The remote peer is waiting for an outcome, so the disposition is only
+                // set once the broker has answered.
+                sendToActiveMQ(message, new ResponseHandler() {
+
+                    @Override
+                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                        if (response.isException()) {
+                            // Broker refused the message: report the failure text as a rejection.
+                            ExceptionResponse er = (ExceptionResponse) response;
+                            Rejected rejected = new Rejected();
+                            ErrorCondition condition = new ErrorCondition();
+                            condition.setCondition(Symbol.valueOf("failed"));
+                            condition.setDescription(er.getException().getMessage());
+                            rejected.setError(condition);
+                            delivery.disposition(rejected);
+                        } else {
+
+                            // Once remaining credit is at or below 20% of the configured
+                            // receiver credit, grant the difference back to the producer.
+                            if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
+                                LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
+                                getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
+                            }
+
+                            // Accept the delivery, wrapping the outcome in a transactional
+                            // state carrying the same transaction id when one was supplied.
+                            if (remoteState != null && remoteState instanceof TransactionalState) {
+                                TransactionalState txAccepted = new TransactionalState();
+                                txAccepted.setOutcome(Accepted.getInstance());
+                                txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
+
+                                delivery.disposition(txAccepted);
+                            } else {
+                                delivery.disposition(Accepted.getInstance());
+                            }
+
+                            delivery.settle();
+                        }
+
+                        session.pumpProtonToSocket();
+                    }
+                });
+            } else {
+                // Pre-settled delivery: no outcome is reported, only the credit top-up
+                // (same 20% threshold as above) is performed before forwarding.
+                if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
+                    LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
+                    getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
+                    session.pumpProtonToSocket();
+                }
+                sendToActiveMQ(message);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
new file mode 100644
index 0000000..6ee68dc
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpResource.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+/**
+ * Root interface for all endpoint objects.
+ *
+ * It declares the two lifecycle requests, open and close, that the remote peer
+ * can make of a resource.
+ */
+public interface AmqpResource {
+
+    /**
+     * Request from the remote peer to open this resource.
+     */
+    void open();
+
+    /**
+     * Request from the remote peer to close this resource.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
new file mode 100644
index 0000000..75b6758
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
+import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.OutboundTransformer;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
+ * which holds the corresponding Receiver which receives messages transferred
+ * across the link from the Broker.
+ *
+ * An AmqpSender is in turn a message consumer subscribed to some destination
+ * on the broker.  As messages are dispatched to this sender they are sent on
+ * to the remote Receiver end of the link.
+ */
+public class AmqpSender extends AmqpAbstractLink<Sender> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
+
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+
+    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
+    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
+    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
+    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
+
+    private final ConsumerInfo consumerInfo;
+    private final boolean presettle;
+
+    private boolean closed;
+    private boolean endOfBrowse;
+    private int currentCredit;
+    private long lastDeliveredSequenceId;
+
+    private Buffer currentBuffer;
+    private Delivery currentDelivery;
+
+    /**
+     * Creates a new AmqpSender instance that manages the given Sender
+     *
+     * @param session
+     *        the AmqpSession object that is the parent of this instance.
+     * @param endpoint
+     *        the AMQP Sender instance that this class manages.
+     * @param consumerInfo
+     *        the ConsumerInfo instance that holds configuration for this sender.
+     */
+    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
+        super(session, endpoint);
+
+        this.currentCredit = endpoint.getRemoteCredit();
+        this.consumerInfo = consumerInfo;
+        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+    }
+
+    /**
+     * Registers this sender with the parent session under its consumer id, unless
+     * the link is already closed, and then runs the base class open logic.
+     */
+    @Override
+    public void open() {
+        // Use isClosed() like the other lifecycle methods of this class.  The private
+        // 'closed' field is never assigned, so testing it could never prevent registration.
+        if (!isClosed()) {
+            session.regosterSender(getConsumerId(), this);
+        }
+
+        super.open();
+    }
+
+    /**
+     * Handles a detach of the link: when the link is open and not yet closed the
+     * broker side consumer is removed and this sender is unregistered from the
+     * session, then the base class detach logic runs.
+     */
+    @Override
+    public void detach() {
+        final boolean linkActive = !isClosed() && isOpened();
+
+        if (linkActive) {
+            // Tell the broker which message was delivered last when removing the consumer.
+            final RemoveInfo removal = new RemoveInfo(getConsumerId());
+            removal.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+            sendToActiveMQ(removal, null);
+
+            session.unregisterSender(getConsumerId());
+        }
+
+        super.detach();
+    }
+
+    /**
+     * Handles a close of the link.  When the link is open and not yet closed the
+     * broker side consumer is removed, a durable subscription is additionally
+     * deleted, and this sender is unregistered from the session.  The base class
+     * close logic always runs afterwards.
+     */
+    @Override
+    public void close() {
+        if (!isClosed() && isOpened()) {
+            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
+            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+            sendToActiveMQ(removeCommand, null);
+
+            if (consumerInfo.isDurable()) {
+                // Closing (as opposed to detaching) a durable link also removes the
+                // subscription, which is named after the link.
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(session.getConnection().getConnectionId());
+                rsi.setSubscriptionName(getEndpoint().getName());
+                rsi.setClientId(session.getConnection().getClientId());
+
+                sendToActiveMQ(rsi, null);
+            }
+
+            // Unregister regardless of durability, matching detach(); previously only
+            // durable senders were removed from the session on close.
+            session.unregisterSender(getConsumerId());
+        }
+
+        super.close();
+    }
+
+    /**
+     * Reacts to a flow update on the link: when the endpoint's credit differs from
+     * the last recorded value the new value (never below zero) is pushed to the
+     * broker as the consumer's prefetch, after which a drain check is performed.
+     *
+     * @throws Exception if the prefetch update cannot be sent to the broker.
+     */
+    @Override
+    public void flow() throws Exception {
+        final int reportedCredit = getEndpoint().getCredit();
+
+        if (reportedCredit != currentCredit) {
+            currentCredit = Math.max(reportedCredit, 0);
+
+            final ConsumerControl prefetchUpdate = new ConsumerControl();
+            prefetchUpdate.setConsumerId(getConsumerId());
+            prefetchUpdate.setDestination(getDestination());
+            prefetchUpdate.setPrefetch(currentCredit);
+            sendToActiveMQ(prefetchUpdate, null);
+        }
+
+        drainCheck();
+    }
+
+    /**
+     * Handles a disposition update from the remote peer for a delivery that was
+     * sent on this link, then tries to write more outbound messages.
+     *
+     * Transactional states are only acted on when their outcome is Accepted.  For
+     * non transactional states: Accepted acks the message to the broker; Rejected
+     * and Released re-queue the message (Rejected also bumps the redelivery
+     * counter); Modified re-queues it, or poison-acks it when the peer flagged the
+     * message as undeliverable here.
+     *
+     * @param delivery
+     *        the Delivery whose remote state changed; its context is the
+     *        MessageDispatch that produced it.
+     *
+     * @throws Exception if settling the delivery or pumping outbound data fails.
+     */
+    @Override
+    public void delivery(Delivery delivery) throws Exception {
+        MessageDispatch md = (MessageDispatch) delivery.getContext();
+        DeliveryState state = delivery.getRemoteState();
+
+        if (state instanceof TransactionalState) {
+            TransactionalState txState = (TransactionalState) state;
+            LOG.trace("onDelivery: TX delivery state = {}", state);
+            // instanceof is false for null, so a missing outcome is ignored as before.
+            if (txState.getOutcome() instanceof Accepted) {
+                if (!delivery.remotelySettled()) {
+                    TransactionalState txAccepted = new TransactionalState();
+                    txAccepted.setOutcome(Accepted.getInstance());
+                    txAccepted.setTxnId(txState.getTxnId());
+
+                    delivery.disposition(txAccepted);
+                }
+                settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
+            }
+        } else if (state instanceof Accepted) {
+            LOG.trace("onDelivery: accepted state = {}", state);
+            if (!delivery.remotelySettled()) {
+                // Use the shared instance, as the transactional branch above does.
+                delivery.disposition(Accepted.getInstance());
+            }
+            settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+        } else if (state instanceof Rejected) {
+            // re-deliver /w incremented delivery counter.
+            md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+            LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+            settle(delivery, -1);
+        } else if (state instanceof Released) {
+            LOG.trace("onDelivery: Released state = {}", state);
+            // re-deliver && don't increment the counter.
+            settle(delivery, -1);
+        } else if (state instanceof Modified) {
+            Modified modified = (Modified) state;
+            if (modified.getDeliveryFailed()) {
+                // increment delivery counter..
+                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+            }
+            LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+            byte ackType = -1;
+            Boolean undeliverableHere = modified.getUndeliverableHere();
+            if (undeliverableHere != null && undeliverableHere) {
+                // receiver does not want the message..
+                // perhaps we should DLQ it?
+                ackType = MessageAck.POSION_ACK_TYPE;
+            }
+            settle(delivery, ackType);
+        }
+
+        pumpOutbound();
+    }
+
+    /**
+     * Sends an individual acknowledgement to the broker for every message that
+     * was delivered under a transaction on this link, then clears that list.
+     * A failed acknowledgement closes the link endpoint.
+     *
+     * @throws Exception if handing an acknowledgement to the broker fails.
+     */
+    @Override
+    public void commit() throws Exception {
+        if (dispatchedInTx.isEmpty()) {
+            return;
+        }
+
+        for (MessageDispatch md : dispatchedInTx) {
+            MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+            pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
+            pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+
+            LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
+
+            sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        // Report through the class logger instead of printing to stderr.
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        LOG.warn("Commit Ack for consumer {} failed, closing link", getConsumerId(), exception);
+                        getEndpoint().close();
+                    }
+                    session.pumpProtonToSocket();
+                }
+            });
+        }
+
+        dispatchedInTx.clear();
+    }
+
+    /**
+     * Puts every message that was delivered under a transaction back at the head
+     * of the outbound queue, with its redelivery counter incremented and its
+     * transaction id cleared, then empties the list of transacted deliveries.
+     *
+     * @throws Exception declared by the overridden method.
+     */
+    @Override
+    public void rollback() throws Exception {
+        // Same monitor as onMessageDispatch so new dispatches cannot interleave.
+        synchronized (outbound) {
+
+            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
+
+            for (MessageDispatch pending : dispatchedInTx) {
+                final int redeliveries = pending.getRedeliveryCounter() + 1;
+                pending.setRedeliveryCounter(redeliveries);
+                pending.getMessage().setTransactionId(null);
+                outbound.addFirst(pending);
+            }
+
+            dispatchedInTx.clear();
+        }
+    }
+
+    /**
+     * Entry point for a message dispatched by ActiveMQ to the subscription that
+     * backs this Sender.  The dispatch is queued and an attempt is made to write
+     * it to the link right away; dispatches are ignored once the link is closed.
+     *
+     * @param dispatch
+     *        the MessageDispatch to queue and send across the link.
+     *
+     * @throws Exception if an error occurs while encoding the message for send.
+     */
+    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
+        if (isClosed()) {
+            return;
+        }
+
+        // Guard the queue so a transaction rollback re-queueing messages cannot
+        // interleave with this append.
+        synchronized (outbound) {
+            outbound.addLast(dispatch);
+        }
+
+        pumpOutbound();
+        session.pumpProtonToSocket();
+    }
+
+    /**
+     * @return a short description of this sender containing its consumer id.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("AmqpSender {");
+        text.append(getConsumerId());
+        text.append("}");
+        return text.toString();
+    }
+
+    //----- Property getters and setters -------------------------------------//
+
+    /**
+     * @return the ConsumerId held by the ConsumerInfo this sender was created with.
+     */
+    public ConsumerId getConsumerId() {
+        return consumerInfo.getConsumerId();
+    }
+
+    /**
+     * @return the destination currently stored in this sender's ConsumerInfo.
+     */
+    @Override
+    public ActiveMQDestination getDestination() {
+        return consumerInfo.getDestination();
+    }
+
+    /**
+     * Stores the given destination in this sender's ConsumerInfo.
+     *
+     * @param destination
+     *        the destination to record for this sender.
+     */
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+        consumerInfo.setDestination(destination);
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    /**
+     * Writes queued outbound messages to the link until the endpoint stops
+     * accepting data, the queue is empty, or the link is closed.
+     *
+     * Each pass first finishes writing the message currently being sent, then
+     * takes the next dispatch from the outbound queue, encodes it and opens a new
+     * delivery for it.  A dispatch without a message marks the end of a browse.
+     *
+     * @throws Exception if writing to the endpoint or settling a pre-settled
+     *         delivery fails.
+     */
+    public void pumpOutbound() throws Exception {
+        // isClosed() is what the rest of this class checks; the private 'closed'
+        // field is never assigned and so could never terminate this loop.
+        while (!isClosed()) {
+            while (currentBuffer != null) {
+                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+                if (sent > 0) {
+                    currentBuffer.moveHead(sent);
+                    if (currentBuffer.length == 0) {
+                        // Whole message written: pre-settled links ack it immediately,
+                        // otherwise the delivery is advanced and awaits a disposition.
+                        if (presettle) {
+                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+                        } else {
+                            getEndpoint().advance();
+                        }
+                        currentBuffer = null;
+                        currentDelivery = null;
+                    }
+                } else {
+                    // Endpoint accepted nothing; resume on the next pump.
+                    return;
+                }
+            }
+
+            if (outbound.isEmpty()) {
+                return;
+            }
+
+            final MessageDispatch md = outbound.removeFirst();
+            try {
+
+                ActiveMQMessage temp = null;
+                if (md.getMessage() != null) {
+
+                    // Topics can dispatch the same Message to more than one consumer
+                    // so we must copy to prevent concurrent read / write to the same
+                    // message object.
+                    if (md.getDestination().isTopic()) {
+                        synchronized (md.getMessage()) {
+                            temp = (ActiveMQMessage) md.getMessage().copy();
+                        }
+                    } else {
+                        temp = (ActiveMQMessage) md.getMessage();
+                    }
+
+                    // Default the message format property to 0 when it is absent.
+                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
+                    }
+                }
+
+                final ActiveMQMessage jms = temp;
+                if (jms == null) {
+                    // It's the end of browse signal.
+                    endOfBrowse = true;
+                    drainCheck();
+                } else {
+                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
+                    jms.setReadOnlyBody(true);
+                    final EncodedMessage amqp = outboundTransformer.transform(jms);
+                    if (amqp != null && amqp.getLength() > 0) {
+                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+                        // Pre-settled deliveries need no tag since no disposition is expected.
+                        if (presettle) {
+                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                        } else {
+                            final byte[] tag = tagCache.getNextTag();
+                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
+                        }
+                        currentDelivery.setContext(md);
+                    } else {
+                        // TODO: message could not be generated what now?
+                    }
+                }
+            } catch (Exception e) {
+                // Pass the exception itself so the stack trace is not lost.
+                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Settles a delivery and either re-queues its message or acknowledges it to
+     * the broker.
+     *
+     * @param delivery
+     *        the Delivery to settle; its context is the MessageDispatch it carries.
+     * @param ackType
+     *        the MessageAck type to send to the broker, or -1 to settle locally
+     *        and re-dispatch the message without acknowledging it.
+     *
+     * @throws Exception if re-dispatching or sending the acknowledgement fails.
+     */
+    private void settle(final Delivery delivery, final int ackType) throws Exception {
+        // A non-empty tag is handed back for reuse once the peer has settled.
+        byte[] tag = delivery.getTag();
+        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
+            tagCache.returnTag(tag);
+        }
+
+        if (ackType == -1) {
+            // we are going to settle, but redeliver.. we won't yet ack to ActiveMQ
+            delivery.settle();
+            onMessageDispatch((MessageDispatch) delivery.getContext());
+        } else {
+            MessageDispatch md = (MessageDispatch) delivery.getContext();
+            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
+            MessageAck ack = new MessageAck();
+            ack.setConsumerId(getConsumerId());
+            ack.setFirstMessageId(md.getMessage().getMessageId());
+            ack.setLastMessageId(md.getMessage().getMessageId());
+            ack.setMessageCount(1);
+            ack.setAckType((byte) ackType);
+            ack.setDestination(md.getDestination());
+
+            DeliveryState remoteState = delivery.getRemoteState();
+            // instanceof already yields false for null, no separate null check needed.
+            if (remoteState instanceof TransactionalState) {
+                TransactionalState s = (TransactionalState) remoteState;
+                long txid = toLong(s.getTxnId());
+                LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid);
+                ack.setTransactionId(localTxId);
+
+                // Store the message sent in this TX we might need to
+                // re-send on rollback
+                md.getMessage().setTransactionId(localTxId);
+                dispatchedInTx.addFirst(md);
+            }
+
+            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
+
+            sendToActiveMQ(ack, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        // Report through the class logger instead of printing to stderr.
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        LOG.warn("Ack for consumer {} failed, closing link", getConsumerId(), exception);
+                        getEndpoint().close();
+                    } else {
+                        // Only settle once the broker has accepted the acknowledgement.
+                        delivery.settle();
+                    }
+                    session.pumpProtonToSocket();
+                }
+            });
+        }
+    }
+
+    private void drainCheck() {
+        // If we are a browser.. lets not say we are drained until
+        // we hit the end of browse message.
+        if (consumerInfo.isBrowser() && !endOfBrowse) {
+            return;
+        }
+
+        if (outbound.isEmpty()) {
+            getEndpoint().drained();
+        }
+    }
+}


Mime
View raw message