Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4EC4611D9D for ; Tue, 23 Sep 2014 18:20:26 +0000 (UTC) Received: (qmail 75512 invoked by uid 500); 23 Sep 2014 18:20:26 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 75460 invoked by uid 500); 23 Sep 2014 18:20:26 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 74872 invoked by uid 99); 23 Sep 2014 18:20:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Sep 2014 18:20:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 982C4937BAB; Tue, 23 Sep 2014 18:20:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@qpid.apache.org Date: Tue, 23 Sep 2014 18:20:40 -0000 Message-Id: In-Reply-To: <7d0bb9de60ea4f74b14c83776f4b02de@git.apache.org> References: <7d0bb9de60ea4f74b14c83776f4b02de@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/27] Initial drop of donated AMQP Client Code. 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java new file mode 100644 index 0000000..234caaa --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; + +/** + * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage + * type. 
+ */ +public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { + + private final Message message; + + /** + * Create a new delegate that uses Java serialization to store the message content. + * + * @param message + * the AMQP message instance where the object is to be stored / read. + */ + public AmqpTypedObjectDelegate(Message message) { + this.message = message; + } + + @Override + public Serializable getObject() throws IOException, ClassNotFoundException { + // TODO: this should actually return a snapshot of the object, so we + // need to save the bytes so we can return an equal/unmodified object later + + Section body = message.getBody(); + if (body == null) { + return null; + } else if (body instanceof AmqpValue) { + // TODO: This is assuming the object can be immediately returned, and is + // Serializable. We will actually have to ensure elements are + // Serializable and e.g convert the Uint/Ubyte etc wrappers. + return (Serializable) ((AmqpValue) body).getValue(); + } else if (body instanceof Data) { + // TODO: return as byte[]? ByteBuffer? + throw new UnsupportedOperationException("Data support still to be added"); + } else if (body instanceof AmqpSequence) { + // TODO: return as list? + throw new UnsupportedOperationException("AmqpSequence support still to be added"); + } else { + throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName()); + } + } + + @Override + public void setObject(Serializable value) throws IOException { + if (value == null) { + // TODO: verify whether not sending a body is OK, send some form of + // null (AmqpValue containing null) instead if it isn't? + message.setBody(null); + } else if (isSupportedAmqpValueObjectType(value)) { + // TODO: This is a temporary hack, we actually need to take a snapshot of the object + // at this point in time, not simply set the object itself into the Proton message. 
+ // We will need to encode it now, first to save the snapshot to send, and also to + // verify up front that we can actually send it later. + + // Even if we do that we would currently then need to decode it later to set the + // body to send, unless we augment Proton to allow setting the bytes directly. + // We will always need to decode bytes to return a snapshot from getObject(). We + // will need to save the bytes somehow to support that on received messages. + message.setBody(new AmqpValue(value)); + } else { + // TODO: Data and AmqpSequence? + throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName()); + } + + // TODO: ensure content type is not set (assuming we aren't using data sections)? + } + + private boolean isSupportedAmqpValueObjectType(Serializable serializable) { + // TODO: augment supported types to encode as an AmqpValue? + return serializable instanceof Map || + serializable instanceof List || + serializable.getClass().isArray(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java new file mode 100644 index 0000000..2f4db2c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -0,0 +1,857 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsSslContext; +import org.apache.qpid.jms.message.JmsDefaultMessageFactory; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.DefaultProviderListener; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.provider.ProviderFuture; +import 
org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Provider Facade that provides services for detecting dropped Provider connections + * and attempting to reconnect to a different remote peer. Upon establishment of a new + * connection the FailoverProvider will initiate state recovery of the active JMS + * framework resources. + */ +public class FailoverProvider extends DefaultProviderListener implements Provider { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverProvider.class); + + private static final int UNLIMITED = -1; + + private ProviderListener listener; + private Provider provider; + private final FailoverUriPool uris; + + private final ExecutorService serializer; + private final ScheduledExecutorService connectionHub; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean failed = new AtomicBoolean(); + private final AtomicLong requestId = new AtomicLong(); + private final Map requests = new LinkedHashMap(); + private final DefaultProviderListener closedListener = new DefaultProviderListener(); + private final JmsSslContext sslContext; + private final JmsMessageFactory defaultMessageFactory = new JmsDefaultMessageFactory(); + + // Current state of connection / reconnection + private boolean firstConnection = true; + private long reconnectAttempts; + private long reconnectDelay = TimeUnit.SECONDS.toMillis(5); + private IOException failureCause; + private URI connectedURI; + + // Timeout values configured via JmsConnectionInfo + private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT; + private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; + private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; + private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; + + //
Configuration values. + private long initialReconnectDelay = 0L; + private long maxReconnectDelay = TimeUnit.SECONDS.toMillis(30); + private boolean useExponentialBackOff = true; + private double backOffMultiplier = 2d; + private int maxReconnectAttempts = UNLIMITED; + private int startupMaxReconnectAttempts = UNLIMITED; + private int warnAfterReconnectAttempts = 10; + + public FailoverProvider(Map nestedOptions) { + this(null, nestedOptions); + } + + public FailoverProvider(URI[] uris) { + this(uris, null); + } + + public FailoverProvider(URI[] uris, Map nestedOptions) { + this.uris = new FailoverUriPool(uris, nestedOptions); + this.sslContext = JmsSslContext.getCurrentSslContext(); + + this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName("FailoverProvider: serialization thread"); + return serial; + } + }); + + // All Connection attempts happen in this schedulers thread. Once a connection + // is established it will hand the open connection back to the serializer thread + // for state recovery. 
+ this.connectionHub = Executors.newScheduledThreadPool(1, new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName("FailoverProvider: connect thread"); + return serial; + } + }); + } + + @Override + public void connect() throws IOException { + checkClosed(); + LOG.debug("Performing initial connection attempt"); + triggerReconnectionAttempt(); + } + + @Override + public void start() throws IOException, IllegalStateException { + checkClosed(); + + if (listener == null) { + throw new IllegalStateException("No ProviderListener registered."); + } + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + final ProviderFuture request = new ProviderFuture(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + IOException error = failureCause != null ? failureCause : new IOException("Connection closed"); + List pending = new ArrayList(requests.values()); + for (FailoverRequest request : pending) { + request.onFailure(error); + } + + if (provider != null) { + provider.close(); + } + } catch (Exception e) { + LOG.debug("Caught exception while closing connection"); + } finally { + + if (connectionHub != null) { + connectionHub.shutdown(); + } + + if (serializer != null) { + serializer.shutdown(); + } + + request.onSuccess(); + } + } + }); + + try { + if (this.closeTimeout >= 0) { + request.sync(); + } else { + request.sync(closeTimeout, TimeUnit.MILLISECONDS); + } + } catch (IOException e) { + LOG.warn("Error caught while closing Provider: ", e.getMessage()); + } + } + } + + @Override + public void create(final JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + if (resource instanceof JmsConnectionInfo) { + 
JmsConnectionInfo connectionInfo = (JmsConnectionInfo) resource; + connectTimeout = connectionInfo.getConnectTimeout(); + closeTimeout = connectionInfo.getCloseTimeout(); + sendTimeout = connectionInfo.getSendTimeout(); + requestTimeout = connectionInfo.getRequestTimeout(); + } + + provider.create(resource, this); + } + }; + + serializer.execute(pending); + } + + @Override + public void start(final JmsResource resource, final AsyncResult request) throws IOException, JMSException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.start(resource, this); + } + }; + + serializer.execute(pending); + } + + @Override + public void destroy(final JmsResource resourceId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws IOException, JMSException, UnsupportedOperationException { + provider.destroy(resourceId, this); + } + + @Override + public boolean succeedsWhenOffline() { + // Allow this to succeed, acks would be stale. 
+ return true; + } + }; + + serializer.execute(pending); + } + + @Override + public void send(final JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.send(envelope, this); + } + }; + + serializer.execute(pending); + } + + @Override + public void acknowledge(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.acknowledge(sessionId, this); + } + + @Override + public boolean succeedsWhenOffline() { + // Allow this to succeed, acks would be stale. + return true; + } + }; + + serializer.execute(pending); + } + + @Override + public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.acknowledge(envelope, ackType, this); + } + + @Override + public boolean succeedsWhenOffline() { + // Allow this to succeed, acks would be stale. 
+ return true; + } + }; + + serializer.execute(pending); + } + + @Override + public void commit(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.commit(sessionId, this); + } + + @Override + public boolean failureWhenOffline() { + return true; + } + }; + + serializer.execute(pending); + } + + @Override + public void rollback(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.rollback(sessionId, this); + } + + @Override + public boolean failureWhenOffline() { + return true; + } + }; + + serializer.execute(pending); + } + + + @Override + public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.recover(sessionId, this); + } + + @Override + public boolean succeedsWhenOffline() { + return true; + } + }; + + serializer.execute(pending); + } + + @Override + public void unsubscribe(final String subscription, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.unsubscribe(subscription, this); + } + }; + + serializer.execute(pending); + } + + @Override + public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException, UnsupportedOperationException { + checkClosed(); + final FailoverRequest pending = new 
FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.pull(consumerId, timeout, this); + } + }; + + serializer.execute(pending); + } + + @Override + public JmsMessageFactory getMessageFactory() { + final AtomicReference result = + new AtomicReference(defaultMessageFactory); + + serializer.execute(new Runnable() { + + @Override + public void run() { + if (provider != null) { + result.set(provider.getMessageFactory()); + } + } + }); + + return result.get(); + } + + //--------------- Connection Error and Recovery methods ------------------// + + /** + * This method is always called from within the FailoverProvider's serialization thread. + * + * When a failure is encountered either from an outgoing request or from an error fired + * from the underlying Provider instance this method is called to determine if a reconnect + * is allowed and if so a new reconnect cycle is triggered on the connection thread. + * + * @param cause + */ + private void handleProviderFailure(final IOException cause) { + LOG.debug("handling Provider failure: {}", cause.getMessage()); + LOG.trace("stack", cause); + + this.provider.setProviderListener(closedListener); + URI failedURI = this.provider.getRemoteURI(); + try { + this.provider.close(); + } catch (Throwable error) { + LOG.trace("Caught exception while closing failed provider: {}", error.getMessage()); + } + this.provider = null; + + if (reconnectAllowed()) { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onConnectionInterrupted(failedURI); + } + triggerReconnectionAttempt(); + } else { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onConnectionFailure(cause); + } + } + } + + /** + * Called from the reconnection thread. This method enqueues a new task that + * will attempt to recover connection state, once successful normal operations + * will resume. 
If an error occurs while attempting to recover the JMS framework + * state then a reconnect cycle is again triggered on the connection thread. + * + * @param provider + * The newly connected Provider instance that will become active. + */ + private void initializeNewConnection(final Provider provider) { + this.serializer.execute(new Runnable() { + @Override + public void run() { + try { + if (firstConnection) { + firstConnection = false; + FailoverProvider.this.provider = provider; + provider.setProviderListener(FailoverProvider.this); + + List pending = new ArrayList(requests.values()); + for (FailoverRequest request : pending) { + request.run(); + } + } else { + LOG.debug("Signalling connection recovery: {}", provider); + FailoverProvider.this.provider = provider; + provider.setProviderListener(FailoverProvider.this); + + // Stage 1: Recover all JMS Framework resources + listener.onConnectionRecovery(provider); + + // Stage 2: Restart consumers, send pull commands, etc. + listener.onConnectionRecovered(provider); + + // Stage 3: Let the client know that connection has been restored. + listener.onConnectionRestored(provider.getRemoteURI()); + + // Stage 4: Send pending actions. + List pending = new ArrayList(requests.values()); + for (FailoverRequest request : pending) { + request.run(); + } + + reconnectDelay = initialReconnectDelay; + reconnectAttempts = 0; + connectedURI = provider.getRemoteURI(); + uris.connected(); + } + } catch (Throwable error) { + handleProviderFailure(IOExceptionSupport.create(error)); + } + } + }); + } + + /** + * Called when the Provider was either first created or when a connection failure has + * been detected. A reconnection attempt is immediately executed on the connection + * thread. If a new Provider is able to be created and connected then a recovery task + * is scheduled on the main serializer thread.
If the connect attempt fails another + * attempt is scheduled based on the configured delay settings until a max attempts + * limit is hit, if one is set. + */ + private void triggerReconnectionAttempt() { + if (closed.get() || failed.get()) { + return; + } + + connectionHub.execute(new Runnable() { + @Override + public void run() { + if (provider != null || closed.get() || failed.get()) { + return; + } + + reconnectAttempts++; + Throwable failure = null; + URI target = uris.getNext(); + if (target != null) { + try { + LOG.debug("Attempting connection to: {}", target); + JmsSslContext.setCurrentSslContext(sslContext); + Provider provider = ProviderFactory.createAsync(target); + initializeNewConnection(provider); + return; + } catch (Throwable e) { + LOG.info("Connection attempt to: {} failed.", target); + failure = e; + } + } + + int reconnectLimit = reconnectAttemptLimit(); + + if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) { + LOG.error("Failed to connect after: " + reconnectAttempts + " attempt(s)"); + failed.set(true); + failureCause = IOExceptionSupport.create(failure); + if (listener != null) { + listener.onConnectionFailure(failureCause); + }; + + return; + } + + int warnInterval = getWarnAfterReconnectAttempts(); + if (warnInterval > 0 && (reconnectAttempts % warnInterval) == 0) { + LOG.warn("Failed to connect after: {} attempt(s) continuing to retry.", reconnectAttempts); + } + + long delay = nextReconnectDelay(); + connectionHub.schedule(this, delay, TimeUnit.MILLISECONDS); + } + }); + } + + private boolean reconnectAllowed() { + return reconnectAttemptLimit() != 0; + } + + private int reconnectAttemptLimit() { + int maxReconnectValue = this.maxReconnectAttempts; + if (firstConnection && this.startupMaxReconnectAttempts != UNLIMITED) { + maxReconnectValue = this.startupMaxReconnectAttempts; + } + return maxReconnectValue; + } + + private long nextReconnectDelay() { + if (useExponentialBackOff) { + // Exponential increment of 
reconnect delay. + reconnectDelay *= backOffMultiplier; + if (reconnectDelay > maxReconnectDelay) { + reconnectDelay = maxReconnectDelay; + } + } + + return reconnectDelay; + } + + protected void checkClosed() throws IOException { + if (closed.get()) { + throw new IOException("The Provider is already closed"); + } + } + + //--------------- DefaultProviderListener overrides ----------------------// + + @Override + public void onMessage(final JmsInboundMessageDispatch envelope) { + if (closed.get() || failed.get()) { + return; + } + serializer.execute(new Runnable() { + @Override + public void run() { + if (!closed.get()) { + listener.onMessage(envelope); + } + } + }); + } + + @Override + public void onConnectionFailure(final IOException ex) { + if (closed.get() || failed.get()) { + return; + } + serializer.execute(new Runnable() { + @Override + public void run() { + if (!closed.get() && !failed.get()) { + LOG.debug("Failover: the provider reports failure: {}", ex.getMessage()); + handleProviderFailure(ex); + } + } + }); + } + + //--------------- URI update and rebalance methods -----------------------// + + public void add(final URI uri) { + serializer.execute(new Runnable() { + @Override + public void run() { + uris.add(uri); + } + }); + } + + public void remove(final URI uri) { + serializer.execute(new Runnable() { + @Override + public void run() { + uris.remove(uri); + } + }); + } + + //--------------- Property Getters and Setters ---------------------------// + + @Override + public URI getRemoteURI() { + Provider provider = this.provider; + if (provider != null) { + return provider.getRemoteURI(); + } + return null; + } + + @Override + public void setProviderListener(ProviderListener listener) { + this.listener = listener; + } + + @Override + public ProviderListener getProviderListener() { + return listener; + } + + public boolean isRandomize() { + return uris.isRandomize(); + } + + public void setRandomize(boolean value) { + this.uris.setRandomize(value); + } + 
+ public long getInitialReconnectDealy() { + return initialReconnectDelay; + } + + public void setInitialReconnectDealy(long initialReconnectDealy) { + this.initialReconnectDelay = initialReconnectDealy; + } + + public long getMaxReconnectDelay() { + return maxReconnectDelay; + } + + public void setMaxReconnectDelay(long maxReconnectDelay) { + this.maxReconnectDelay = maxReconnectDelay; + } + + public int getMaxReconnectAttempts() { + return maxReconnectAttempts; + } + + public void setMaxReconnectAttempts(int maxReconnectAttempts) { + this.maxReconnectAttempts = maxReconnectAttempts; + } + + public int getStartupMaxReconnectAttempts() { + return startupMaxReconnectAttempts; + } + + public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { + this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; + } + + /** + * Gets the current setting controlling how many Connect / Reconnect attempts must occur + * before a warn message is logged. A value of {@code <= 0} indicates that there will be + * no warn message logged regardless of how many reconnect attempts occur. + * + * @return the current number of connection attempts before warn logging is triggered. + */ + public int getWarnAfterReconnectAttempts() { + return warnAfterReconnectAttempts; + } + + /** + * Sets the number of Connect / Reconnect attempts that must occur before a warn message + * is logged indicating that the transport is not connected. This can be useful when the + * client is running inside some container or service as it gives an indication of some + * problem with the client connection that might not otherwise be visible. To disable the + * log messages this value should be set to a value {@code attempts <= 0} + * + * @param warnAfterReconnectAttempts + * The number of failed connection attempts that must happen before a warning is logged.
+ */ + public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { + this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; + } + + public double getReconnectDelayExponent() { + return backOffMultiplier; + } + + public void setReconnectDelayExponent(double reconnectDelayExponent) { + this.backOffMultiplier = reconnectDelayExponent; + } + + public boolean isUseExponentialBackOff() { + return useExponentialBackOff; + } + + public void setUseExponentialBackOff(boolean useExponentialBackOff) { + this.useExponentialBackOff = useExponentialBackOff; + } + + public long getConnectTimeout() { + return this.connectTimeout; + } + + public long getCloseTimeout() { + return this.closeTimeout; + } + + public long getSendTimeout() { + return this.sendTimeout; + } + + public long getRequestTimeout() { + return this.requestTimeout; + } + + @Override + public String toString() { + return "FailoverProvider: " + + (connectedURI == null ? "unconnected" : connectedURI.toString()); + } + + //--------------- FailoverProvider Asynchronous Request --------------------// + + /** + * For all requests that are dispatched from the FailoverProvider to a connected + * Provider instance an instance of FailoverRequest is used to handle errors that + * occur during processing of that request and trigger a reconnect. 
+ * + * @param + */ + protected abstract class FailoverRequest extends ProviderFuture implements Runnable { + + private final long id = requestId.incrementAndGet(); + + public FailoverRequest(AsyncResult watcher) { + super(watcher); + } + + @Override + public void run() { + requests.put(id, this); + if (provider == null) { + if (failureWhenOffline()) { + requests.remove(id); + watcher.onFailure(new IOException("Provider disconnected")); + } else if (succeedsWhenOffline()) { + onSuccess(); + } + } else { + try { + LOG.debug("Executing Failover Task: {}", this); + doTask(); + } catch (UnsupportedOperationException e) { + requests.remove(id); + watcher.onFailure(e); + } catch (Exception e) { + // TODO Should we let JMSException through? + LOG.debug("Caught exception while executing task: {}", e.getMessage()); + triggerReconnectionAttempt(); + } + } + } + + @Override + public void onFailure(final Throwable result) { + if (closed.get() || failed.get()) { + requests.remove(id); + super.onFailure(result); + } else { + LOG.debug("Request received error: {}", result.getMessage()); + serializer.execute(new Runnable() { + @Override + public void run() { + handleProviderFailure(IOExceptionSupport.create(result)); + } + }); + } + } + + @Override + public void onSuccess() { + requests.remove(id); + super.onSuccess(); + } + + /** + * Called to execute the specific task that was requested. + * + * @throws Exception if an error occurs during task execution. + */ + public abstract void doTask() throws Exception; + + /** + * Should the request just succeed when the Provider is not connected. + * + * @return true if the request is marked as successful when not connected. + */ + public boolean succeedsWhenOffline() { + return false; + } + + /** + * When the transport is not connected should this request automatically fail. + * + * @return true if the task should fail when the Provider is not connected. 
+ */ + public boolean failureWhenOffline() { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java new file mode 100644 index 0000000..8c63869 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.jms.util.URISupport; +import org.apache.qpid.jms.util.URISupport.CompositeData; + +/** + * Factory for creating instances of the Failover Provider type. 
+ */ +public class FailoverProviderFactory extends ProviderFactory { + + @Override + public Provider createAsyncProvider(URI remoteURI) throws Exception { + CompositeData composite = URISupport.parseComposite(remoteURI); + Map options = composite.getParameters(); + Map nested = PropertyUtil.filterProperties(options, "nested."); + + FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested); + if (!PropertyUtil.setProperties(provider, options)) { + String msg = "" + + " Not all options could be set on the Failover provider." + + " Check the options are spelled correctly." + + " Given parameters=[" + options + "]." + + " This Provider cannot be started."; + throw new IllegalArgumentException(msg); + } + + return provider; + } + + @Override + public String getName() { + return "Failover"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java new file mode 100644 index 0000000..bb2d2b3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.qpid.jms.util.URISupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the list of available failover URIs that are used to connect + * and recover a connection. + */ +public class FailoverUriPool { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverUriPool.class); + + private final LinkedList uris; + private final Map nestedOptions; + private boolean randomize; + + public FailoverUriPool() { + this.uris = new LinkedList(); + this.nestedOptions = Collections.emptyMap(); + } + + public FailoverUriPool(URI[] uris, Map nestedOptions) { + this.uris = new LinkedList(); + if (nestedOptions != null) { + this.nestedOptions = nestedOptions; + } else { + this.nestedOptions = Collections.emptyMap(); + } + + if (uris != null) { + for (URI uri : uris) { + this.add(uri); + } + } + } + + /** + * Returns the next URI in the pool of URIs. The URI will be shifted to the + * end of the list and not be attempted again until the full list has been + * returned once. + * + * @return the next URI that should be used for a connection attempt. 
+ */ + public URI getNext() { + URI next = null; + if (!uris.isEmpty()) { + next = uris.removeFirst(); + uris.addLast(next); + } + + return next; + } + + /** + * Reports that the Failover Provider connected to the last URI returned from + * this pool. If the Pool is set to randomize this will result in the Pool of + * URIs being shuffled in preparation for the next connect cycle. + */ + public void connected() { + if (isRandomize()) { + Collections.shuffle(uris); + } + } + + /** + * @return true if this pool returns the URI values in random order. + */ + public boolean isRandomize() { + return randomize; + } + + /** + * Sets whether the URIs that are returned by this pool are returned in random + * order or not. If false the URIs are returned in FIFO order. + * + * @param randomize + * true to have the URIs returned in a random order. + */ + public void setRandomize(boolean randomize) { + this.randomize = randomize; + if (randomize) { + Collections.shuffle(uris); + } + } + + /** + * Adds a new URI to the pool if not already contained within. The URI will have + * any nest options that have been configured added to its existing set of options. + * + * @param uri + * The new URI to add to the pool. + */ + public void add(URI uri) { + if (!contains(uri)) { + if (!nestedOptions.isEmpty()) { + try { + URISupport.applyParameters(uri, nestedOptions); + } catch (URISyntaxException e) { + LOG.debug("Failed to add nested options to uri: {}", uri); + } + } + + this.uris.add(uri); + } + } + + /** + * Remove a URI from the pool if present, otherwise has no effect. + * + * @param uri + * The URI to attempt to remove from the pool. + */ + public void remove(URI uri) { + this.uris.remove(uri); + } + + /** + * Returns the currently set value for nested options which will be added to each + * URI that is returned from the pool. + * + * @return the Map instance containing the nest options which can be empty if none set. 
+ */ + public Map getNestedOptions() { + return nestedOptions; + } + + private boolean contains(URI newURI) { + boolean result = false; + for (URI uri : uris) { + if (compareURIs(newURI, uri)) { + result = true; + break; + } + } + + return result; + } + + private boolean compareURIs(final URI first, final URI second) { + boolean result = false; + if (first == null || second == null) { + return result; + } + + if (first.getPort() == second.getPort()) { + InetAddress firstAddr = null; + InetAddress secondAddr = null; + try { + firstAddr = InetAddress.getByName(first.getHost()); + secondAddr = InetAddress.getByName(second.getHost()); + + if (firstAddr.equals(secondAddr)) { + result = true; + } + } catch(IOException e) { + if (firstAddr == null) { + LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); + } else { + LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); + } + + if (first.getHost().equalsIgnoreCase(second.getHost())) { + result = true; + } + } + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java new file mode 100644 index 0000000..d1972cd --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AbstractMechanism.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for SASL Authentication Mechanism that implements the basic + * methods of a Mechanism class. + */ +public abstract class AbstractMechanism implements Mechanism { + + protected static final byte[] EMPTY = new byte[0]; + + private String username; + private String password; + private Map properties = new HashMap(); + + @Override + public int compareTo(Mechanism other) { + + if (getPriority() < other.getPriority()) { + return -1; + } else if (getPriority() > other.getPriority()) { + return 1; + } + + return 0; + } + + @Override + public void setUsername(String value) { + this.username = value; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public void setPassword(String value) { + this.password = value; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public void setProperties(Map properties) { + this.properties = properties; + } + + @Override + public Map getProperties() { + return this.properties; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java new file mode 100644 index 0000000..903a5fe --- /dev/null +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanism.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Implements the Anonymous SASL authentication mechanism. 
+ */ +public class AnonymousMechanism extends AbstractMechanism { + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } + + @Override + public int getPriority() { + return PRIORITY.LOWEST.getValue(); + } + + @Override + public String getName() { + return "ANONYMOUS"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java new file mode 100644 index 0000000..28cd1f1 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/AnonymousMechanismFactory.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Create the Anonymous SASL Authentication Mechanism types. 
+ */ +public class AnonymousMechanismFactory implements MechanismFactory { + + @Override + public Mechanism createMechanism() { + return new AnonymousMechanism(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java new file mode 100644 index 0000000..4b896ce --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5Mechanism.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +import java.io.UnsupportedEncodingException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.security.sasl.SaslException; + +/** + * Implements the SASL PLAIN authentication Mechanism. + * + * User name and Password values are sent without being encrypted. 
+ */ +public class CramMD5Mechanism extends AbstractMechanism { + + private static final String ASCII = "ASCII"; + private static final String HMACMD5 = "HMACMD5"; + private boolean _sentResponse; + + @Override + public int getPriority() { + return PRIORITY.HIGH.getValue(); + } + + @Override + public String getName() { + return "CRAM-MD5"; + } + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) throws SaslException { + if (!_sentResponse && challenge != null && challenge.length != 0) { + try { + SecretKeySpec key = new SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5); + Mac mac = Mac.getInstance(HMACMD5); + mac.init(key); + + byte[] bytes = mac.doFinal(challenge); + + StringBuffer hash = new StringBuffer(getUsername()); + hash.append(' '); + for (int i = 0; i < bytes.length; i++) { + String hex = Integer.toHexString(0xFF & bytes[i]); + if (hex.length() == 1) { + hash.append('0'); + } + hash.append(hex); + } + + _sentResponse = true; + return hash.toString().getBytes(ASCII); + } catch (UnsupportedEncodingException e) { + throw new SaslException("Unable to utilise required encoding", e); + } catch (InvalidKeyException e) { + throw new SaslException("Unable to utilise key", e); + } catch (NoSuchAlgorithmException e) { + throw new SaslException("Unable to utilise required algorithm", e); + } + } else { + return EMPTY; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java new file mode 100644 index 0000000..0ce5736 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/CramMD5MechanismFactory.java @@ -0,0 
+1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Create the Plain SASL Authentication Mechanism types. + */ +public class CramMD5MechanismFactory implements MechanismFactory { + + @Override + public Mechanism createMechanism() { + return new CramMD5Mechanism(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java new file mode 100644 index 0000000..4fbfcd5 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/Mechanism.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Interface for all SASL authentication mechanism implementations.
 *
 * Mechanisms are comparable with each other so that a set of candidates can be
 * sorted by preference.
 */
public interface Mechanism extends Comparable<Mechanism> {

    /**
     * Relative priority values used to arrange the found SASL
     * mechanisms in a preferred order where the level of security
     * generally defines the preference.
     */
    public enum PRIORITY {
        LOWEST(0),
        LOW(1),
        MEDIUM(2),
        HIGH(3),
        HIGHEST(4);

        private final int value;

        private PRIORITY(int value) {
            this.value = value;
        }

        /**
         * @return the numeric value of this priority level.
         */
        public int getValue() {
            return value;
        }
    }

    /**
     * @return the relative priority of this SASL mechanism.
     */
    int getPriority();

    /**
     * @return the well known name of this SASL mechanism.
     */
    String getName();

    /**
     * @return the response buffer used to answer the initial SASL cycle.
     * @throws SaslException if an error occurs computing the response.
     */
    byte[] getInitialResponse() throws SaslException;

    /**
     * Create a response based on a given challenge from the remote peer.
     *
     * @param challenge
     *        the challenge that this Mechanism should respond to.
     *
     * @return the response that answers the given challenge.
     * @throws SaslException if an error occurs computing the response.
     */
    byte[] getChallengeResponse(byte[] challenge) throws SaslException;

    /**
     * Sets the user name value for this Mechanism. The Mechanism can ignore this
     * value if it does not utilize user name in its authentication processing.
     *
     * @param value
     *        The user name given.
     */
    void setUsername(String value);

    /**
     * Returns the configured user name value for this Mechanism.
     *
     * @return the currently set user name value for this Mechanism.
     */
    String getUsername();

    /**
     * Sets the password value for this Mechanism. The Mechanism can ignore this
     * value if it does not utilize a password in its authentication processing.
     *
     * @param value
     *        The password given.
     */
    void setPassword(String value);

    /**
     * Returns the configured password value for this Mechanism.
     *
     * @return the currently set password value for this Mechanism.
     */
    String getPassword();

    /**
     * Sets any additional Mechanism specific properties using a Map.
     *
     * NOTE(review): SOURCE shows a raw Map here; its type arguments appear to have
     * been lost in extraction - confirm against the upstream source.
     *
     * @param options
     *        the map of additional properties that this Mechanism should utilize.
     */
    void setProperties(Map options);

    /**
     * The currently set Properties for this Mechanism.
     *
     * @return the current set of configuration Properties for this Mechanism.
     */
    Map getProperties();

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Interface for all SASL authentication mechanism implementations. + */ +public interface MechanismFactory { + + /** + * Creates an instance of the authentication mechanism implementation. + * + * @return a new Mechanism instance. + */ + Mechanism createMechanism(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java new file mode 100644 index 0000000..b305e98 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanism.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Implements the SASL PLAIN authentication Mechanism. + * + * User name and Password values are sent without being encrypted. + */ +public class PlainMechanism extends AbstractMechanism { + + @Override + public int getPriority() { + return PRIORITY.MEDIUM.getValue(); + } + + @Override + public String getName() { + return "PLAIN"; + } + + @Override + public byte[] getInitialResponse() { + + String username = getUsername(); + String password = getPassword(); + + if (username == null) { + username = ""; + } + + if (password == null) { + password = ""; + } + + byte[] usernameBytes = username.getBytes(); + byte[] passwordBytes = password.getBytes(); + byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2]; + System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); + System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); + return data; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java new file mode 100644 index 0000000..3cdd205 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/PlainMechanismFactory.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +/** + * Create the Plain SASL Authentication Mechanism types. + */ +public class PlainMechanismFactory implements MechanismFactory { + + @Override + public Mechanism createMechanism() { + return new PlainMechanism(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java new file mode 100644 index 0000000..5bc0a94 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/sasl/SaslMechanismFinder.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.sasl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.jms.util.FactoryFinder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used to find a SASL Mechanism that most closely matches the preferred set + * of Mechanisms supported by the remote peer. + * + * The Matching mechanism is chosen by first find all instances of SASL + * mechanism types that are supported on the remote peer, and then making a + * final selection based on the Mechanism in the found set that has the + * highest priority value. + */ +public class SaslMechanismFinder { + + private static final Logger LOG = LoggerFactory.getLogger(SaslMechanismFinder.class); + + private static final FactoryFinder MECHANISM_FACTORY_FINDER = + new FactoryFinder(MechanismFactory.class, + "META-INF/services/" + SaslMechanismFinder.class.getPackage().getName().replace(".", "/") + "/"); + + /** + * Attempts to find a matching Mechanism implementation given a list of supported + * mechanisms from a remote peer. Can return null if no matching Mechanisms are + * found. + * + * @param remoteMechanisms + * list of mechanism names that are supported by the remote peer. + * + * @return the best matching Mechanism for the supported remote set. 
+ */ + public static Mechanism findMatchingMechanism(String...remoteMechanisms) { + + Mechanism match = null; + List found = new ArrayList(); + + for (String remoteMechanism : remoteMechanisms) { + try { + MechanismFactory factory = findMechanismFactory(remoteMechanism); + found.add(factory.createMechanism()); + } catch (IOException e) { + LOG.warn("Caught exception while searching for SASL mechanisms: {}", e.getMessage()); + } + } + + if (!found.isEmpty()) { + // Sorts by priority using Mechanism comparison and return the last value in + // list which is the Mechanism deemed to be the highest priority match. + Collections.sort(found); + match = found.get(found.size() - 1); + } + + LOG.info("Best match for SASL auth was: {}", match); + + return match; + } + + /** + * Searches for a MechanismFactory by using the scheme from the given name. + * + * The search first checks the local cache of mechanism factories before moving on + * to search in the classpath. + * + * @param name + * The name of the authentication mechanism to search for.. + * + * @return a mechanism factory instance matching the URI's scheme. + * + * @throws IOException if an error occurs while locating the factory. 
+ */ + protected static MechanismFactory findMechanismFactory(String name) throws IOException { + if (name == null || name.isEmpty()) { + throw new IOException("No Mechanism name specified: [" + name + "]"); + } + + MechanismFactory factory = null; + try { + factory = MECHANISM_FACTORY_FINDER.newInstance(name); + } catch (Throwable e) { + throw new IOException("Mechanism scheme NOT recognized: [" + name + "]", e); + } + + return factory; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java new file mode 100644 index 0000000..ae96ed7 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.SocketFactory; + +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.jms.util.InetAddressUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.buffer.Buffer; + +/** + * + */ +public class RawTcpTransport implements Transport, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(RawTcpTransport.class); + + private TransportListener listener; + private final URI remoteLocation; + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicReference connectionError = new AtomicReference(); + + private final Socket socket; + private DataOutputStream dataOut; + private DataInputStream dataIn; + private Thread runner; + + private boolean closeAsync = true; + private int socketBufferSize = 64 * 1024; + private int soTimeout = 0; + private int soLinger = Integer.MIN_VALUE; + private Boolean keepAlive; + private Boolean tcpNoDelay = true; + private boolean useLocalHost = false; + private int ioBufferSize = 8 * 1024; + + /** + * Create a new instance of the transport. + * + * @param listener + * The TransportListener that will receive data from this Transport instance. 
+ * @param remoteLocation + * The remote location where this transport should connection to. + */ + public RawTcpTransport(TransportListener listener, URI remoteLocation) { + this.listener = listener; + this.remoteLocation = remoteLocation; + + Socket temp = null; + try { + temp = createSocketFactory().createSocket(); + } catch (IOException e) { + connectionError.set(e); + } + + this.socket = temp; + } + + @Override + public void connect() throws IOException { + if (connectionError.get() != null) { + throw IOExceptionSupport.create(connectionError.get()); + } + + if (socket == null) { + throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); + } + + InetSocketAddress remoteAddress = null; + + if (remoteLocation != null) { + String host = resolveHostName(remoteLocation.getHost()); + remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); + } + + socket.connect(remoteAddress); + + connected.set(true); + + initialiseSocket(socket); + initializeStreams(); + + runner = new Thread(null, this, "QpidJMS RawTcpTransport: " + toString()); + runner.setDaemon(false); + runner.start(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (socket == null) { + return; + } + + // Closing the streams flush the sockets before closing.. if the socket + // is hung.. then this hangs the close so we support an asynchronous close + // by default which will timeout if the close doesn't happen after a delay. + if (closeAsync) { + final CountDownLatch latch = new CountDownLatch(1); + + final ExecutorService closer = Executors.newSingleThreadExecutor(); + closer.execute(new Runnable() { + @Override + public void run() { + LOG.trace("Closing socket {}", socket); + try { + socket.close(); + LOG.debug("Closed socket {}", socket); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception closing socket " + socket + ". 
This exception will be ignored.", e); + } + } finally { + latch.countDown(); + } + } + }); + + try { + latch.await(1,TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + closer.shutdownNow(); + } + } else { + LOG.trace("Closing socket {}", socket); + try { + socket.close(); + LOG.debug("Closed socket {}", socket); + } catch (IOException e) { + LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e); + } + } + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + checkConnected(); + LOG.info("RawTcpTransport sending packet of size: {}", output.remaining()); + if (dataOut instanceof OutputStream) { + WritableByteChannel channel = Channels.newChannel(dataOut); + channel.write(output); + } else { + while (output.hasRemaining()) { + dataOut.writeByte(output.get()); + } + } + dataOut.flush(); + } + + @Override + public void send(org.fusesource.hawtbuf.Buffer output) throws IOException { + checkConnected(); + send(output.toByteBuffer()); + } + + @Override + public boolean isConnected() { + return this.connected.get(); + } + + @Override + public TransportListener getTransportListener() { + return this.listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + if (listener == null) { + throw new IllegalArgumentException("Listener cannot be set to null"); + } + + this.listener = listener; + } + + public int getSocketBufferSize() { + return socketBufferSize; + } + + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(Boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void 
setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(Boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public boolean isUseLocalHost() { + return useLocalHost; + } + + public void setUseLocalHost(boolean useLocalHost) { + this.useLocalHost = useLocalHost; + } + + public int getIoBufferSize() { + return ioBufferSize; + } + + public void setIoBufferSize(int ioBufferSize) { + this.ioBufferSize = ioBufferSize; + } + + public boolean isCloseAsync() { + return closeAsync; + } + + public void setCloseAsync(boolean closeAsync) { + this.closeAsync = closeAsync; + } + + //---------- Transport internal implementation ---------------------------// + + @Override + public void run() { + LOG.trace("TCP consumer thread for " + this + " starting"); + try { + while (isConnected()) { + doRun(); + } + } catch (IOException e) { + connectionError.set(e); + onException(e); + } catch (Throwable e) { + IOException ioe = new IOException("Unexpected error occured: " + e); + connectionError.set(ioe); + ioe.initCause(e); + onException(ioe); + } + } + + protected void doRun() throws IOException { + int size = dataIn.available(); + if (size <= 0) { + try { + TimeUnit.NANOSECONDS.sleep(1); + } catch (InterruptedException e) { + } + return; + } + + byte[] buffer = new byte[size]; + dataIn.readFully(buffer); + Buffer incoming = new Buffer(buffer); + listener.onData(incoming); + } + + /** + * Passes any IO exceptions into the transport listener + */ + public void onException(IOException e) { + if (listener != null) { + try { + listener.onTransportError(e); + } catch (RuntimeException e2) { + LOG.debug("Unexpected runtime exception: " + e2, e2); + } + } + } + + protected SocketFactory createSocketFactory() throws IOException { + return SocketFactory.getDefault(); + } + + protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { + try { + 
sock.setReceiveBufferSize(socketBufferSize); + sock.setSendBufferSize(socketBufferSize); + } catch (SocketException se) { + LOG.warn("Cannot set socket buffer size = {}", socketBufferSize); + LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); + } + + sock.setSoTimeout(soTimeout); + + if (keepAlive != null) { + sock.setKeepAlive(keepAlive.booleanValue()); + } + + if (soLinger > -1) { + sock.setSoLinger(true, soLinger); + } else if (soLinger == -1) { + sock.setSoLinger(false, 0); + } + + if (tcpNoDelay != null) { + sock.setTcpNoDelay(tcpNoDelay.booleanValue()); + } + } + + protected void initializeStreams() throws IOException { + try { + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); + this.dataIn = new DataInputStream(buffIn); + TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); + this.dataOut = new DataOutputStream(outputStream); + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + + protected String resolveHostName(String host) throws UnknownHostException { + if (isUseLocalHost()) { + String localName = InetAddressUtil.getLocalHostName(); + if (localName != null && localName.equals(host)) { + return "localhost"; + } + } + return host; + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java new file mode 100644 index 0000000..49d250c --- /dev/null +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +import java.io.IOException; +import java.net.URI; + +import org.apache.qpid.jms.JmsSslContext; +import org.vertx.java.core.net.NetClient; + +/** + * Provides SSL configuration to the Vert.x NetClient object used by the underling + * TCP based Transport. + */ +public class SslTransport extends TcpTransport { + + private final JmsSslContext context; + + /** + * Create an instance of the SSL transport + * + * @param listener + * The TransportListener that will handle events from this Transport instance. + * @param remoteLocation + * The location that is being connected to. + * @param JmsSslContext + * The JMS Framework SslContext to use for this SSL connection. 
+ */ + public SslTransport(TransportListener listener, URI remoteLocation, JmsSslContext context) { + super(listener, remoteLocation); + + this.context = context; + } + + @Override + protected void configureNetClient(NetClient client) throws IOException { + super.configureNetClient(client); + + client.setSSL(true); + client.setKeyStorePath(context.getKeyStoreLocation()); + client.setKeyStorePassword(context.getKeyStorePassword()); + client.setTrustStorePath(context.getTrustStoreLocation()); + client.setTrustStorePassword(context.getTrustStorePassword()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org