activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5591
Date Tue, 24 Mar 2015 22:09:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/master e33b3f593 -> 3306467a6


http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
new file mode 100644
index 0000000..068a170
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps the AMQP Session and provides the services needed to manage the remote
+ * peer requests for link establishment.
+ */
+public class AmqpSession implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
+
+    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId,
AmqpSender>();
+
+    private final AmqpConnection connection;
+    private final Session protonSession;
+    private final SessionId sessionId;
+
+    private long nextProducerId = 0;
+    private long nextConsumerId = 0;
+
+    /**
+     * Create new AmqpSession instance whose parent is the given AmqpConnection.
+     *
+     * @param connection
+     *        the parent connection for this session.
+     * @param sessionId
+     *        the ActiveMQ SessionId that is used to identify this session.
+     * @param session
+     *        the AMQP Session that this class manages.
+     */
+    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
+        this.connection = connection;
+        this.sessionId = sessionId;
+        this.protonSession = session;
+    }
+
+    @Override
+    public void open() {
+        LOG.trace("Session {} opened", getSessionId());
+
+        getEndpoint().setContext(this);
+        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
+        getEndpoint().open();
+
+        connection.sendToActiveMQ(new SessionInfo(getSessionId()));
+    }
+
+    @Override
+    public void close() {
+        LOG.trace("Session {} closed", getSessionId());
+
+        getEndpoint().setContext(null);
+        getEndpoint().close();
+        getEndpoint().free();
+
+        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
+    }
+
+    /**
+     * Commits all pending work for all resources managed under this session.
+     *
+     * @throws Exception if an error occurs while attempting to commit work.
+     */
+    public void commit() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.commit();
+        }
+    }
+
+    /**
+     * Rolls back any pending work being down under this session.
+     *
+     * @throws Exception if an error occurs while attempting to roll back work.
+     */
+    public void rollback() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.rollback();
+        }
+    }
+
+    /**
+     * Used to direct all Session managed Senders to push any queued Messages
+     * out to the remote peer.
+     *
+     * @throws Exception if an error occurs while flushing the messages.
+     */
+    public void flushPendingMessages() throws Exception {
+        for (AmqpSender consumer : consumers.values()) {
+            consumer.pumpOutbound();
+        }
+    }
+
+    public void createCoordinator(final Receiver protonReceiver) throws Exception {
+        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
+        txCoordinator.flow(connection.getConfiguredReceiverCredit());
+        txCoordinator.open();
+    }
+
+    public void createReceiver(final Receiver protonReceiver) throws Exception {
+        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
+
+        ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
+        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
+
+        try {
+            Target target = (Target) remoteTarget;
+            ActiveMQDestination destination = null;
+            String targetNodeName = target.getAddress();
+
+            if (target.getDynamic()) {
+                destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
+                Target actualTarget = new Target();
+                actualTarget.setAddress(destination.getQualifiedName());
+                actualTarget.setDynamic(true);
+                protonReceiver.setTarget(actualTarget);
+                receiver.addCloseAction(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
+                    }
+                });
+            } else if (targetNodeName != null && !targetNodeName.isEmpty()) {
+                destination = createDestination(remoteTarget);
+            }
+
+            receiver.setDestination(destination);
+            connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    if (response.isException()) {
+                        ErrorCondition error = null;
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
+                        } else {
+                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
+                        }
+
+                        receiver.close(error);
+                    } else {
+                        receiver.flow(connection.getConfiguredReceiverCredit());
+                        receiver.open();
+                    }
+                    pumpProtonToSocket();
+                }
+            });
+
+        } catch (AmqpProtocolException exception) {
+            receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()),
exception.getMessage()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void createSender(final Sender protonSender) throws Exception {
+        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)
protonSender.getRemoteSource();
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
+        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
+
+        try {
+            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
+            protonSender.setContext(sender);
+
+            boolean noLocal = false;
+            String selector = null;
+
+            if (source != null) {
+                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(),
JMS_SELECTOR_FILTER_IDS);
+                if (filter != null) {
+                    selector = filter.getValue().getDescribed().toString();
+                    // Validate the Selector.
+                    try {
+                        SelectorParser.parse(selector);
+                    } catch (InvalidSelectorException e) {
+                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
+                        return;
+                    }
+
+                    supportedFilters.put(filter.getKey(), filter.getValue());
+                }
+
+                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
+                if (filter != null) {
+                    noLocal = true;
+                    supportedFilters.put(filter.getKey(), filter.getValue());
+                }
+            }
+
+            ActiveMQDestination destination;
+            if (source == null) {
+                // Attempt to recover previous subscription
+                destination = connection.lookupSubscription(protonSender.getName());
+
+                if (destination != null) {
+                    source = new org.apache.qpid.proton.amqp.messaging.Source();
+                    source.setAddress(destination.getQualifiedName());
+                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
+                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+                    source.setDistributionMode(COPY);
+                } else {
+                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription
link: " + protonSender.getName()));
+                    return;
+                }
+            } else if (source.getDynamic()) {
+                // lets create a temp dest.
+                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
+                source = new org.apache.qpid.proton.amqp.messaging.Source();
+                source.setAddress(destination.getQualifiedName());
+                source.setDynamic(true);
+                sender.addCloseAction(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
+                    }
+                });
+            } else {
+                destination = createDestination(source);
+            }
+
+            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
+            protonSender.setSource(source);
+
+            int senderCredit = protonSender.getRemoteCredit();
+
+            consumerInfo.setSelector(selector);
+            consumerInfo.setNoRangeAcks(true);
+            consumerInfo.setDestination(destination);
+            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
+            consumerInfo.setDispatchAsync(true);
+            consumerInfo.setNoLocal(noLocal);
+
+            if (source.getDistributionMode() == COPY && destination.isQueue()) {
+                consumerInfo.setBrowser(true);
+            }
+
+            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
+                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) &&
destination.isTopic()) {
+                consumerInfo.setSubscriptionName(protonSender.getName());
+            }
+
+            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    if (response.isException()) {
+                        ErrorCondition error = null;
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        if (exception instanceof SecurityException) {
+                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
+                        } else if (exception instanceof InvalidSelectorException) {
+                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
+                        } else {
+                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
+                        }
+
+                        sender.close(error);
+                    } else {
+                        sender.open();
+                    }
+                    pumpProtonToSocket();
+                }
+            });
+
+        } catch (AmqpProtocolException e) {
+            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
+        }
+    }
+
+    /**
+     * Send all pending work out to the remote peer.
+     */
+    public void pumpProtonToSocket() {
+        connection.pumpProtonToSocket();
+    }
+
+    public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+        consumers.put(consumerId, sender);
+        connection.regosterSender(consumerId, sender);
+    }
+
+    public void unregisterSender(ConsumerId consumerId) {
+        consumers.remove(consumerId);
+        connection.unregosterSender(consumerId);
+    }
+
+    //----- Configuration accessors ------------------------------------------//
+
+    public AmqpConnection getConnection() {
+        return connection;
+    }
+
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public Session getEndpoint() {
+        return protonSession;
+    }
+
+    //----- Internal Implementation ------------------------------------------//
+
+    protected ConsumerId getNextConsumerId() {
+        return new ConsumerId(sessionId, nextConsumerId++);
+    }
+
+    protected ProducerId getNextProducerId() {
+        return new ProducerId(sessionId, nextProducerId++);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..14fa3ad
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
+import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
+import org.apache.activemq.transport.amqp.ResponseHandler;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the AMQP Transaction Coordinator support to manage local
+ * transactions between an AMQP client and the broker.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+    private long nextTransactionId;
+
+    /**
+     * Creates a new Transaction coordinator used to manage AMQP transactions.
+     *
+     * @param session
+     *        the AmqpSession under which the coordinator was created.
+     * @param receiver
+     *        the AMQP receiver link endpoint for this coordinator.
+     */
+    public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
+        super(session, endpoint);
+    }
+
+    @Override
+    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws
Exception {
+        Message message = Proton.message();
+        int offset = deliveryBytes.offset;
+        int len = deliveryBytes.length;
+
+        while (len > 0) {
+            final int decoded = message.decode(deliveryBytes.data, offset, len);
+            assert decoded > 0 : "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+
+        final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
+        ConnectionId connectionId = session.getConnection().getConnectionId();
+        final Object action = ((AmqpValue) message.getBody()).getValue();
+
+        LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
+        if (action instanceof Declare) {
+            Declare declare = (Declare) action;
+            if (declare.getGlobalId() != null) {
+                throw new Exception("don't know how to handle a declare /w a set GlobalId");
+            }
+
+            long txid = getNextTransactionId();
+            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId,
txid), TransactionInfo.BEGIN);
+            sendToActiveMQ(txinfo, null);
+            LOG.trace("started transaction {}", txid);
+
+            Declared declared = new Declared();
+            declared.setTxnId(new Binary(toBytes(txid)));
+            delivery.disposition(declared);
+            delivery.settle();
+        } else if (action instanceof Discharge) {
+            Discharge discharge = (Discharge) action;
+            long txid = toLong(discharge.getTxnId());
+
+            final byte operation;
+            if (discharge.getFail()) {
+                LOG.trace("rollback transaction {}", txid);
+                operation = TransactionInfo.ROLLBACK;
+            } else {
+                LOG.trace("commit transaction {}", txid);
+                operation = TransactionInfo.COMMIT_ONE_PHASE;
+            }
+
+            if (operation == TransactionInfo.ROLLBACK) {
+                session.rollback();
+            } else {
+                session.commit();
+            }
+
+            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId,
txid), operation);
+            sendToActiveMQ(txinfo, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
+                    if (response.isException()) {
+                        ExceptionResponse er = (ExceptionResponse) response;
+                        Rejected rejected = new Rejected();
+                        rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
+                        delivery.disposition(rejected);
+                    } else {
+                        delivery.disposition(Accepted.getInstance());
+                    }
+                    LOG.debug("TX: {} settling {}", operation, action);
+                    delivery.settle();
+                    session.pumpProtonToSocket();
+                }
+            });
+
+            if (operation == TransactionInfo.ROLLBACK) {
+                session.flushPendingMessages();
+            }
+
+        } else {
+            throw new Exception("Expected coordinator message type: " + action.getClass());
+        }
+    }
+
+    private long getNextTransactionId() {
+        return ++nextTransactionId;
+    }
+
+    @Override
+    public ActiveMQDestination getDestination() {
+        return null;
+    }
+
+    @Override
+    public void setDestination(ActiveMQDestination destination) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
new file mode 100644
index 0000000..a1e3d84
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransferTagGenerator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * Utility class that can generate and if enabled pool the binary tag values
+ * used to identify transfers over an AMQP link.
+ */
+public final class AmqpTransferTagGenerator {
+
+    public static final int DEFAULT_TAG_POOL_SIZE = 1024;
+
+    private long nextTagId;
+    private int maxPoolSize = DEFAULT_TAG_POOL_SIZE;
+
+    private final Set<byte[]> tagPool;
+
+    public AmqpTransferTagGenerator() {
+        this(false);
+    }
+
+    public AmqpTransferTagGenerator(boolean pool) {
+        if (pool) {
+            this.tagPool = new LinkedHashSet<byte[]>();
+        } else {
+            this.tagPool = null;
+        }
+    }
+
+    /**
+     * Retrieves the next available tag.
+     *
+     * @return a new or unused tag depending on the pool option.
+     */
+    public byte[] getNextTag() {
+        byte[] rc;
+        if (tagPool != null && !tagPool.isEmpty()) {
+            final Iterator<byte[]> iterator = tagPool.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                // This should never happen since we control the input.
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * When used as a pooled cache of tags the unused tags should always be returned once
+     * the transfer has been settled.
+     *
+     * @param data
+     *        a previously borrowed tag that is no longer in use.
+     */
+    public void returnTag(byte[] data) {
+        if (tagPool != null && tagPool.size() < maxPoolSize) {
+            tagPool.add(data);
+        }
+    }
+
+    /**
+     * Gets the current max pool size value.
+     *
+     * @return the current max tag pool size.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    /**
+     * Sets the max tag pool size.  If the size is smaller than the current number
+     * of pooled tags the pool will drain over time until it matches the max.
+     *
+     * @param maxPoolSize
+     *        the maximum number of tags to hold in the pool.
+     */
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
index 9c47006..30c79bf 100644
--- a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
@@ -5,13 +5,13 @@
 ## The ASF licenses this file to You under the Apache License, Version 2.0
 ## (the "License"); you may not use this file except in compliance with
 ## the License.  You may obtain a copy of the License at
-## 
+##
 ## http://www.apache.org/licenses/LICENSE-2.0
-## 
+##
 ## Unless required by applicable law or agreed to in writing, software
 ## distributed under the License is distributed on an "AS IS" BASIS,
 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory
+class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index cf4fa95..4f1c861 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -46,6 +46,7 @@ import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -111,7 +112,7 @@ public class AmqpTestSupport {
         SSLContext.setDefault(ctx);
 
         // Setup SSL context...
-        final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+        final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
         File keystore = new File(classesDir, "../../src/test/resources/keystore");
         final SpringSslContext sslContext = new SpringSslContext();
         sslContext.setKeyStore(keystore.getCanonicalPath());

http://git-wip-us.apache.org/repos/asf/activemq/blob/3306467a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
index dfbbc0b..2df5141 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.interop;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
 import static org.junit.Assert.assertEquals;
@@ -194,7 +194,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
         // Set the dynamic node lifetime-policy
         Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
-        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
         source.setDynamicNodeProperties(dynamicNodeProperties);
 
         // Set the capability to indicate the node type being created
@@ -216,7 +216,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
 
         // Set the dynamic node lifetime-policy
         Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
-        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
         target.setDynamicNodeProperties(dynamicNodeProperties);
 
         // Set the capability to indicate the node type being created


Mime
View raw message