Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 7944 invoked from network); 19 Sep 2006 22:28:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 19 Sep 2006 22:28:46 -0000 Received: (qmail 18073 invoked by uid 500); 19 Sep 2006 22:28:45 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 18049 invoked by uid 500); 19 Sep 2006 22:28:45 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Delivered-To: moderator for qpid-commits@incubator.apache.org Received: (qmail 92039 invoked by uid 99); 19 Sep 2006 22:09:04 -0000 X-ASF-Spam-Status: No, hits=-9.8 required=5.0 tests=ALL_TRUSTED,NO_REAL_NAME Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r447994 [18/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur... Date: Tue, 19 Sep 2006 22:07:25 -0000 To: qpid-commits@incubator.apache.org From: rhs@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060919220808.1B8A01A9856@eris.apache.org> X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,301 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.HashMap; +import java.net.URISyntaxException; +import java.net.URI; + +public class AMQBrokerDetails implements BrokerDetails +{ + private String _host; + private int _port; + private String _transport; + + private HashMap _options; + + public AMQBrokerDetails() + { + _options = new HashMap(); + } + + public AMQBrokerDetails(String url) throws URLSyntaxException + { + this(); + // URL should be of format tcp://host:port?option='value',option='value' + try + { + URI connection = new URI(url); + + String transport = connection.getScheme(); + + // Handles some defaults to minimise changes to existing broker URLS e.g. localhost + if (transport != null) + { + //todo this list of valid transports should be enumerated somewhere + if ((!(transport.equalsIgnoreCase("vm") || + transport.equalsIgnoreCase("tcp")))) + { + if (transport.equalsIgnoreCase("localhost")) + { + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' ) + { + //Then most likely we have a host:port value + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + URLHelper.parseError(0, transport.length(), "Unknown transport", url); + } + } + } + } + else + { + //Default the transport + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + + if (transport == null) + { + URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + + " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); + } + + setTransport(transport); + + String host = connection.getHost(); + + // Fix for Java 1.5 + if (host == null) + { + host = ""; + } + + setHost(host); + + int port = connection.getPort(); + + if (port == -1) + { + // Another fix for Java 1.5 URI handling + String auth = connection.getAuthority(); + + if (auth != null && auth.startsWith(":")) + { + setPort(Integer.parseInt(auth.substring(1))); + } + else + { + setPort(DEFAULT_PORT); + } + } + else + { + setPort(port); + } + + String queryString = connection.getQuery(); + + URLHelper.parseOptions(_options, queryString); + + //Fragment is #string (not used) + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + public AMQBrokerDetails(String host, int port, boolean useSSL) + { + _host = host; + _port = port; + + if (useSSL) + { + setOption(OPTIONS_SSL, "true"); + } + } + + public String getHost() + { + return _host; + } + + public void setHost(String _host) + { + this._host = _host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public String getTransport() + { + return _transport; + } + + public void setTransport(String _transport) + { + this._transport = _transport; + } + + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public long getTimeout() + { + if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT)) + { + try + { + return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT)); + } + catch (NumberFormatException nfe) + { + //Do nothing as we will use the default below. + } + } + + return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; + } + + public void setTimeout(long timeout) + { + setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout)); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_transport); + sb.append("://"); + + if (!(_transport.equalsIgnoreCase("vm"))) + { + sb.append(_host); + } + + sb.append(':'); + sb.append(_port); + + sb.append(printOptionsURL()); + + return sb.toString(); + } + + public boolean equals(Object o) + { + if (!(o instanceof BrokerDetails)) + { + return false; + } + + BrokerDetails bd = (BrokerDetails) o; + + return _host.equalsIgnoreCase(bd.getHost()) && + (_port == bd.getPort()) && + _transport.equalsIgnoreCase(bd.getTransport()) && + (useSSL() == bd.useSSL()); + + //todo do we need to compare all the options as well? + } + + private String printOptionsURL() + { + StringBuffer optionsURL = new StringBuffer(); + + optionsURL.append('?'); + + if (!(_options.isEmpty())) + { + + for (String key : _options.keySet()) + { + optionsURL.append(key); + + optionsURL.append("='"); + + optionsURL.append(_options.get(key)); + + optionsURL.append("'"); + + optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + } + + //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options + optionsURL.deleteCharAt(optionsURL.length() - 1); + + return optionsURL.toString(); + } + + public boolean useSSL() + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if (_options.containsKey(OPTIONS_SSL)) + { + return _options.get(OPTIONS_SSL).equalsIgnoreCase("true"); + } + + return false; + } + + public void useSSL(boolean ssl) + { + setOption(OPTIONS_SSL, Boolean.toString(ssl)); + } + + +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,927 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.*; +import org.apache.qpid.jms.*; +import org.apache.qpid.jms.Connection; + +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.Reference; +import javax.naming.NamingException; +import javax.naming.StringRefAddr; +import javax.naming.Referenceable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; + +public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable +{ + private static final Logger _logger = Logger.getLogger(AMQConnection.class); + + private AtomicInteger _idFactory = new AtomicInteger(0); + + /** + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. + * This must be held by any child objects of this connection such as the session, producers and consumers. + */ + private final Object _failoverMutex = new Object(); + + /** + * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels + * per session and we must prevent the client from opening too many. Zero means unlimited. + */ + private long _maximumChannelCount; + + /** + * The maximum size of frame supported by the server + */ + private long _maximumFrameSize; + + /** + * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped + * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate + * handler. + */ + private AMQProtocolHandler _protocolHandler; + + /** + * Maps from session id (Integer) to AMQSession instance + */ + private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + + private String _clientName; + + /** + * The user name to use for authentication + */ + private String _username; + + /** + * The password to use for authentication + */ + private String _password; + + /** + * The virtual path to connect to on the AMQ server + */ + private String _virtualHost; + + private ExceptionListener _exceptionListener; + + private ConnectionListener _connectionListener; + + private ConnectionURL _connectionURL; + + /** + * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for + * message publication. + */ + private boolean _started; + + /** + * Policy dictating how to failover + */ + private FailoverPolicy _failoverPolicy; + + /* + * _Connected should be refactored with a suitable wait object. + */ + private boolean _connected; + + /* + * The last error code that occured on the connection. Used to return the correct exception to the client + */ + private AMQException _lastAMQException = null; + + public AMQConnection(String broker, String username, String password, + String clientName, String virtualHost) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='" + broker + "'")); + } + + public AMQConnection(String host, int port, String username, String password, + String clientName, String virtualHost) throws AMQException, URLSyntaxException + { + this(host, port, false, username, password, clientName, virtualHost); + } + + public AMQConnection(String host, int port, boolean useSSL, String username, String password, + String clientName, String virtualHost) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(useSSL ? + ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + + "," + ConnectionURL.OPTIONS_SSL + "='true'" : + ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + + "," + ConnectionURL.OPTIONS_SSL + "='false'" + )); + } + + public AMQConnection(String connection) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection)); + } + + public AMQConnection(ConnectionURL connectionURL) throws AMQException + { + _logger.info("Connection:" + connectionURL); + + if (connectionURL == null) + { + throw new IllegalArgumentException("Connection must be specified"); + } + + _connectionURL = connectionURL; + + _clientName = connectionURL.getClientName(); + _username = connectionURL.getUsername(); + _password = connectionURL.getPassword(); + _virtualHost = connectionURL.getVirtualHost(); + + _failoverPolicy = new FailoverPolicy(connectionURL); + + _protocolHandler = new AMQProtocolHandler(this); + + // We are not currently connected + _connected = false; + + + Exception lastException = new Exception(); + lastException.initCause(new ConnectException()); + + while (lastException != null && lastException.getCause() instanceof ConnectException && _failoverPolicy.failoverAllowed()) + { + try + { + makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); + lastException = null; + } + catch (Exception e) + { + lastException = e; + + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + _logger.info(e); + _logger.info(e.getCause()); + } + } + + _logger.debug("Are we connected:" + _connected); + + // Then the Failover Thread will handle conneciton + if (_failoverPolicy.failoverAllowed()) + { + //TODO this needs to be redone so that we are not spinning. + // A suitable object should be set that is then waited on + // and only notified when a connection is made or when + // the AMQConnection gets closed. + while (!_connected && !_closed.get()) + { + try + { + _logger.debug("Sleeping."); + Thread.sleep(100); + } + catch (InterruptedException ie) + { + _logger.debug("Woken up."); + } + } + if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null) + { + if (_lastAMQException != null) + { + throw _lastAMQException; + } + } + } + else + { + String message = null; + + if (lastException != null) + { + if (lastException.getCause() != null) + { + message = lastException.getCause().getMessage(); + } + else + { + message = lastException.getMessage(); + } + } + + if (message == null || message.equals("")) + { + message = "Unable to Connect"; + } + + AMQException e = new AMQConnectionException(message); + + if (lastException != null) + { + if (lastException instanceof UnresolvedAddressException) + { + e = new AMQUnresolvedAddressException(message); + } + e.initCause(lastException); + } + + throw e; + } + } + + protected AMQConnection(String username, String password, String clientName, String virtualHost) + { + _clientName = clientName; + _username = username; + _password = password; + _virtualHost = virtualHost; + } + + private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + try + { + TransportConnection.getInstance().connect(_protocolHandler, brokerDetail); + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up + _protocolHandler.attainState(AMQState.CONNECTION_OPEN); + _failoverPolicy.attainedConnection(); + + //Again this should be changed to a suitable notify + _connected = true; + } + catch (AMQException e) + { + _lastAMQException = e; + throw e; + } + } + + public boolean attemptReconnection(String host, int port, boolean useSSL) + { + BrokerDetails bd = new AMQBrokerDetails(host, port, useSSL); + + _failoverPolicy.setBroker(bd); + + try + { + makeBrokerConnection(bd); + return true; + } + catch (Exception e) + { + _logger.info("Unable to connect to broker at " + bd); + attemptReconnection(); + } + return false; + } + + public boolean attemptReconnection() + { + while (_failoverPolicy.failoverAllowed()) + { + try + { + makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); + return true; + } + catch (Exception e) + { + if (!(e instanceof AMQException)) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + } + else + { + _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + } + } + } + + //connection unsuccessful + return false; + } + + /** + * Get the details of the currently active broker + * + * @return null if no broker is active (i.e. no successful connection has been made, or + * the BrokerDetail instance otherwise + */ + public BrokerDetails getActiveBrokerDetails() + { + return _failoverPolicy.getCurrentBrokerDetails(); + } + + public boolean failoverAllowed() + { + return _failoverPolicy.failoverAllowed(); + } + + public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH); + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetch) throws JMSException + { + checkNotClosed(); + if (channelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } + else + { + return (org.apache.qpid.jms.Session) new FailoverSupport() + { + public Object operation() throws JMSException + { + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, + prefetch); + _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; + try + { + createChannelOverWire(channelId, prefetch, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) { + _protocolHandler.removeSessionByChannel(channelId); + deregisterSession(channelId); + } + } + + if (_started) + { + session.start(); + } + return session; + } + }.execute(this); + } + } + + private void createChannelOverWire(int channelId, int prefetch, boolean transacted) + throws AMQException + { + _protocolHandler.syncWrite( + ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class); + _protocolHandler.syncWrite( + BasicQosBody.createAMQFrame(channelId, 0, prefetch, false), + BasicQosOkBody.class); + + if (transacted) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Issuing TxSelect for " + channelId); + } + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class); + } + } + + private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException + { + try + { + createChannelOverWire(channelId, prefetch, transacted); + } + catch (AMQException e) + { + _protocolHandler.removeSessionByChannel(channelId); + deregisterSession(channelId); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + } + } + + + public void setFailoverPolicy(FailoverPolicy policy) + { + _failoverPolicy = policy; + } + + public FailoverPolicy getFailoverPolicy() + { + return _failoverPolicy; + } + + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException + { + return (QueueSession) createSession(transacted, acknowledgeMode); + } + + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException + { + return (TopicSession) createSession(transacted, acknowledgeMode); + } + + private boolean channelLimitReached() + { + return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount; + } + + public String getClientID() throws JMSException + { + checkNotClosed(); + return _clientName; + } + + public void setClientID(String clientID) throws JMSException + { + checkNotClosed(); + _clientName = clientID; + } + + public ConnectionMetaData getMetaData() throws JMSException + { + checkNotClosed(); + // TODO Auto-generated method stub + return null; + } + + public ExceptionListener getExceptionListener() throws JMSException + { + checkNotClosed(); + return _exceptionListener; + } + + public void setExceptionListener(ExceptionListener listener) throws JMSException + { + checkNotClosed(); + _exceptionListener = listener; + } + + /** + * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread + * and is not thread safe (which is legal according to the JMS specification). + * + * @throws JMSException + */ + public void start() throws JMSException + { + checkNotClosed(); + if (!_started) + { + final Iterator it = _sessions.entrySet().iterator(); + while (it.hasNext()) + { + final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); + s.start(); + } + _started = true; + } + } + + public void stop() throws JMSException + { + checkNotClosed(); + + if (_started) + { + for (Iterator i = _sessions.values().iterator(); i.hasNext();) + { + ((AMQSession) i.next()).stop(); + } + _started = false; + } + } + + public void close() throws JMSException + { + synchronized (getFailoverMutex()) + { + if (!_closed.getAndSet(true)) + { + try + { + closeAllSessions(null); + _protocolHandler.closeConnection(); + } + catch (AMQException e) + { + throw new JMSException("Error closing connection: " + e); + } + } + } + } + + /** + * Marks all sessions and their children as closed without sending any protocol messages. Useful when + * you need to mark objects "visible" in userland as closed after failover or other significant event that + * impacts the connection. + *

+ * The caller must hold the failover mutex before calling this method. + */ + private void markAllSessionsClosed() + { + final LinkedList sessionCopy = new LinkedList(_sessions.values()); + final Iterator it = sessionCopy.iterator(); + while (it.hasNext()) + { + final AMQSession session = (AMQSession) it.next(); + + session.markClosed(); + } + _sessions.clear(); + } + + /** + * Close all the sessions, either due to normal connection closure or due to an error occurring. + * + * @param cause if not null, the error that is causing this shutdown + *

+ * The caller must hold the failover mutex before calling this method. + */ + private void closeAllSessions(Throwable cause) throws JMSException + { + final LinkedList sessionCopy = new LinkedList(_sessions.values()); + final Iterator it = sessionCopy.iterator(); + JMSException sessionException = null; + while (it.hasNext()) + { + final AMQSession session = (AMQSession) it.next(); + if (cause != null) + { + session.closed(cause); + } + else + { + try + { + session.close(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e); + sessionException = e; + } + } + } + _sessions.clear(); + if (sessionException != null) + { + throw sessionException; + } + } + + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + return null; + } + + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + return null; + } + + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + return null; + } + + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, ServerSessionPool sessionPool, + int maxMessages) + throws JMSException + { + // TODO Auto-generated method stub + checkNotClosed(); + return null; + } + + public long getMaximumChannelCount() + { + checkNotClosed(); + return _maximumChannelCount; + } + + public void setConnectionListener(ConnectionListener listener) + { + _connectionListener = listener; + } + + public ConnectionListener getConnectionListener() + { + return _connectionListener; + } + + public void setMaximumChannelCount(long maximumChannelCount) + { + checkNotClosed(); + _maximumChannelCount = maximumChannelCount; + } + + public void setMaximumFrameSize(long frameMax) + { + _maximumFrameSize = frameMax; + } + + public long getMaximumFrameSize() + { + return _maximumFrameSize; + } + + public Map getSessions() + { + return _sessions; + } + + public String getUsername() + { + return _username; + } + + public String getPassword() + { + return _password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + public void bytesSent(long writtenBytes) + { + if (_connectionListener != null) + { + _connectionListener.bytesSent(writtenBytes); + } + } + + public void bytesReceived(long receivedBytes) + { + if (_connectionListener != null) + { + _connectionListener.bytesReceived(receivedBytes); + } + } + + /** + * Fire the preFailover event to the registered connection listener (if any) + * + * @param redirect true if this is the result of a redirect request rather than a connection error + * @return true if no listener or listener does not veto change + */ + public boolean firePreFailover(boolean redirect) + { + boolean proceed = true; + if (_connectionListener != null) + { + proceed = _connectionListener.preFailover(redirect); + } + return proceed; + } + + /** + * Fire the preResubscribe event to the registered connection listener (if any). If the listener + * vetoes resubscription then all the sessions are closed. + * + * @return true if no listener or listener does not veto resubscription. + * @throws JMSException + */ + public boolean firePreResubscribe() throws JMSException + { + if (_connectionListener != null) + { + boolean resubscribe = _connectionListener.preResubscribe(); + if (!resubscribe) + { + markAllSessionsClosed(); + } + return resubscribe; + } + else + { + return true; + } + } + + /** + * Fires a failover complete event to the registered connection listener (if any). + */ + public void fireFailoverComplete() + { + if (_connectionListener != null) + { + _connectionListener.failoverComplete(); + } + } + + /** + * In order to protect the consistency of the connection and its child sessions, consumers and producers, + * the "failover mutex" must be held when doing any operations that could be corrupted during failover. + * + * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. + */ + public final Object getFailoverMutex() + { + return _failoverMutex; + } + + /** + * If failover is taking place this will block until it has completed. If failover + * is not taking place it will return immediately. + * + * @throws InterruptedException + */ + public void blockUntilNotFailingOver() throws InterruptedException + { + _protocolHandler.blockUntilNotFailingOver(); + } + + /** + * Invoked by the AMQProtocolSession when a protocol session exception has occurred. + * This method sends the exception to a JMS exception listener, if configured, and + * propagates the exception to sessions, which in turn will propagate to consumers. + * This allows synchronous consumers to have exceptions thrown to them. + * + * @param cause the exception + */ + public void exceptionReceived(Throwable cause) + { + + _logger.debug("Connection Close done by:" + Thread.currentThread().getName()); + _logger.debug("exceptionReceived is ", cause); + + final JMSException je; + if (cause instanceof JMSException) + { + je = (JMSException) cause; + } + else + { + je = new JMSException("Exception thrown against " + toString() + ": " + cause); + if (cause instanceof Exception) + { + je.setLinkedException((Exception) cause); + } + } + + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true + // so that any generic client code that tries to close the connection will not mess up this error + // handling sequence + if (cause instanceof IOException) + { + _closed.set(true); + } + + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + + if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException)) + { + try + { + _logger.info("Closing AMQConnection due to :" + cause.getMessage()); + _closed.set(true); + closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } + + } + else + { + _logger.info("Not a hard-error connection not closing."); + } + } + + void registerSession(int channelId, AMQSession session) + { + _sessions.put(channelId, session); + } + + void deregisterSession(int channelId) + { + _sessions.remove(channelId); + } + + /** + * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. + * The caller must hold the failover mutex before calling this method. + */ + public void resubscribeSessions() throws AMQException + { + ArrayList sessions = new ArrayList(_sessions.values()); + _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: remove? + for (Iterator it = sessions.iterator(); it.hasNext();) + { + AMQSession s = (AMQSession) it.next(); + _protocolHandler.addSessionByChannel(s.getChannelId(), s); + reopenChannel(s.getChannelId(), s.getDefaultPrefetch(), s.getTransacted()); + s.resubscribe(); + } + } + + public String toString() + { + StringBuffer buf = new StringBuffer("AMQConnection:\n"); + if (_failoverPolicy.getCurrentBrokerDetails() == null) + { + buf.append("No active broker connection"); + } + else + { + BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); + buf.append("Host: ").append(String.valueOf(bd.getHost())); + buf.append("\nPort: ").append(String.valueOf(bd.getPort())); + } + buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); + buf.append("\nClient ID: ").append(String.valueOf(_clientName)); + buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size()); + return buf.toString(); + } + + public String toURL() + { + return _connectionURL.toString(); + } + + public Reference getReference() throws NamingException + { + return new Reference( + AMQConnection.class.getName(), + new StringRefAddr(AMQConnection.class.getName(), toURL()), + AMQConnectionFactory.class.getName(), + null); // factory location + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnection.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,358 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.jms.ConnectionURL; + +import javax.jms.*; +import javax.jms.Connection; +import javax.naming.*; +import javax.naming.spi.ObjectFactory; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Hashtable; + + +public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, ObjectFactory, Referenceable +{ + private String _host; + private int _port; + private String _defaultUsername; + private String _defaultPassword; + private String _virtualPath; + + private ConnectionURL _connectionDetails; + + + public AMQConnectionFactory() + { + } + + public AMQConnectionFactory(String url) throws URLSyntaxException + { + _connectionDetails = new AMQConnectionURL(url); + } + + public AMQConnectionFactory(ConnectionURL url) + { + _connectionDetails = url; + } + + public AMQConnectionFactory(String broker, String username, String password, + String clientName, String virtualHost) throws URLSyntaxException + { + this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='" + broker + "'")); + } + + public AMQConnectionFactory(String host, int port, String virtualPath) + { + this(host, port, "guest", "guest", virtualPath); + } + + public AMQConnectionFactory(String host, int port, String defaultUsername, String defaultPassword, + String virtualPath) + { + _host = host; + _port = port; + _defaultUsername = defaultUsername; + _defaultPassword = defaultPassword; + _virtualPath = virtualPath; + +//todo when setting Host/Port has been resolved then we can use this otherwise those methods won't work with the following line. +// _connectionDetails = new AMQConnectionURL( +// ConnectionURL.AMQ_PROTOCOL + "://" + +// _defaultUsername + ":" + _defaultPassword + "@" + +// virtualPath + "?brokerlist='tcp://" + host + ":" + port + "'"); + } + + /** + * @return The _defaultPassword. + */ + public final String getDefaultPassword(String password) + { + if (_connectionDetails != null) + { + return _connectionDetails.getPassword(); + } + else + { + return _defaultPassword; + } + } + + /** + * @param password The _defaultPassword to set. + */ + public final void setDefaultPassword(String password) + { + if (_connectionDetails != null) + { + _connectionDetails.setPassword(password); + } + _defaultPassword = password; + } + + /** + * @return The _defaultPassword. + */ + public final String getDefaultUsername(String password) + { + if (_connectionDetails != null) + { + return _connectionDetails.getUsername(); + } + else + { + return _defaultUsername; + } + } + + /** + * @param username The _defaultUsername to set. + */ + public final void setDefaultUsername(String username) + { + if (_connectionDetails != null) + { + _connectionDetails.setUsername(username); + } + _defaultUsername = username; + } + + /** + * @return The _host . + */ + public final String getHost() + { + //todo this doesn't make sense in a multi broker URL as we have no current as that is done by AMQConnection + return _host; + } + + /** + * @param host The _host to set. + */ + public final void setHost(String host) + { + //todo if _connectionDetails is set then run _connectionDetails.addBrokerDetails() + // Should perhaps have this method changed to setBroker(host,port) + _host = host; + } + + /** + * @return _port The _port to set. + */ + public final int getPort() + { + //todo see getHost + return _port; + } + + /** + * @param port The port to set. + */ + public final void setPort(int port) + { + //todo see setHost + _port = port; + } + + /** + * @return he _virtualPath. + */ + public final String getVirtualPath() + { + if (_connectionDetails != null) + { + return _connectionDetails.getVirtualHost(); + } + else + { + return _virtualPath; + } + } + + /** + * @param path The _virtualPath to set. + */ + public final void setVirtualPath(String path) + { + if (_connectionDetails != null) + { + _connectionDetails.setVirtualHost(path); + } + + _virtualPath = path; + } + + static String getUniqueClientID() + { + try + { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + return null; + } + } + + public Connection createConnection() throws JMSException + { + try + { + if (_connectionDetails != null) + { + if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals("")) + { + _connectionDetails.setClientName(getUniqueClientID()); + } + return new AMQConnection(_connectionDetails); + } + else + { + return new AMQConnection(_host, _port, _defaultUsername, _defaultPassword, getUniqueClientID(), + _virtualPath); + } + } + catch (Exception e) + { + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + throw jmse; + } + + + } + + public Connection createConnection(String userName, String password) throws JMSException + { + try + { + return new AMQConnection(_host, _port, userName, password, getUniqueClientID(), _virtualPath); + } + catch (Exception e) + { + JMSException jmse = new JMSException("Error creating connection: " + e.getMessage()); + jmse.setLinkedException(e); + throw jmse; + } + } + + public QueueConnection createQueueConnection() throws JMSException + { + return (QueueConnection) createConnection(); + } + + public QueueConnection createQueueConnection(String username, String password) throws JMSException + { + return (QueueConnection) createConnection(username, password); + } + + public TopicConnection createTopicConnection() throws JMSException + { + return (TopicConnection) createConnection(); + } + + public TopicConnection createTopicConnection(String username, String password) throws JMSException + { + return (TopicConnection) createConnection(username, password); + } + + + public ConnectionURL getConnectionURL() + { + return _connectionDetails; + } + + /** + * JNDI interface to create objects from References. + * + * @param obj The Reference from JNDI + * @param name + * @param ctx + * @param env + * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory. + * @throws Exception + */ + public Object getObjectInstance(Object obj, Name name, Context ctx, + Hashtable env) throws Exception + { + if (obj instanceof Reference) + { + Reference ref = (Reference) obj; + + if (ref.getClassName().equals(AMQConnection.class.getName())) + { + RefAddr addr = ref.get(AMQConnection.class.getName()); + + if (addr != null) + { + return new AMQConnection((String) addr.getContent()); + } + } + + if (ref.getClassName().equals(AMQQueue.class.getName())) + { + RefAddr addr = ref.get(AMQQueue.class.getName()); + + if (addr != null) + { + return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName()); + } + } + + if (ref.getClassName().equals(AMQTopic.class.getName())) + { + RefAddr addr = ref.get(AMQTopic.class.getName()); + + if (addr != null) + { + return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName()); + } + } + + if (ref.getClassName().equals(AMQConnectionFactory.class.getName())) + { + RefAddr addr = ref.get(AMQConnectionFactory.class.getName()); + + if (addr != null) + { + return new AMQConnectionFactory(new AMQConnectionURL((String) addr.getContent())); + } + } + + } + return null; + } + + + public Reference getReference() throws NamingException + { + return new Reference( + AMQConnectionFactory.class.getName(), + new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()), + AMQConnectionFactory.class.getName(), + null); // factory location + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,399 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.*; +import java.net.URI; +import java.net.URISyntaxException; + +public class AMQConnectionURL implements ConnectionURL +{ + private String _url; + private String _failoverMethod; + private HashMap _failoverOptions; + private HashMap _options; + private List _brokers; + private String _clientName; + private String _username; + private String _password; + private String _virtualHost; + + public AMQConnectionURL(String fullURL) throws URLSyntaxException + { + _url = fullURL; + _options = new HashMap(); + _brokers = new LinkedList(); + _failoverOptions = new HashMap(); + + // Connection URL format + //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''" + // Options are of course optional except for requiring a single broker in the broker list. + try + { + URI connection = new URI(fullURL); + + if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) + { + throw new URISyntaxException(fullURL, "Not an AMQP URL"); + } + + if (connection.getHost() == null || connection.getHost().equals("")) + { + String uid = AMQConnectionFactory.getUniqueClientID(); + if (uid == null) + { + URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + setClientName(uid); + } + + } + else + { + setClientName(connection.getHost()); + } + + String userInfo = connection.getUserInfo(); + + if (userInfo == null) + { + //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs + userInfo = connection.getAuthority(); + + if (userInfo != null) + { + int atIndex = userInfo.indexOf('@'); + + if (atIndex != -1) + { + userInfo = userInfo.substring(0, atIndex); + } + else + { + userInfo = null; + } + } + + } + + if (userInfo == null) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, + "User information not found on url", fullURL); + } + else + { + parseUserInfo(userInfo); + } + String virtualHost = connection.getPath(); + + if (virtualHost != null && (!virtualHost.equals(""))) + { + setVirtualHost(virtualHost); + } + else + { + int authLength = connection.getAuthority().length(); + int start = AMQ_PROTOCOL.length() + 3; + int testIndex = start + authLength; + if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') + { + URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + } + else + { + URLHelper.parseError(-1, "Virtual host not specified", fullURL); + } + + } + + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + int slash = fullURL.indexOf("\\"); + + if (slash == -1) + { + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + else + { + if (slash != 0 && fullURL.charAt(slash - 1) == ':') + { + URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + } + else + { + URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + } + } + + } + } + + private void parseUserInfo(String userinfo) throws URLSyntaxException + { + //user info = user:pass + + int colonIndex = userinfo.indexOf(':'); + + if (colonIndex == -1) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), + "Null password in user information not allowed.", _url); + } + else + { + setUsername(userinfo.substring(0, colonIndex)); + setPassword(userinfo.substring(colonIndex + 1)); + } + + } + + private void processOptions() throws URLSyntaxException + { + if (_options.containsKey(OPTIONS_BROKERLIST)) + { + String brokerlist = _options.get(OPTIONS_BROKERLIST); + + //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); + + while (st.hasMoreTokens()) + { + String broker = st.nextToken(); + + _brokers.add(new AMQBrokerDetails(broker)); + } + + _options.remove(OPTIONS_BROKERLIST); + } + + if (_options.containsKey(OPTIONS_FAILOVER)) + { + String failover = _options.get(OPTIONS_FAILOVER); + + // failover='method?option='value',option='value'' + + int methodIndex = failover.indexOf('?'); + + if (methodIndex > -1) + { + _failoverMethod = failover.substring(0, methodIndex); + URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1)); + } + else + { + _failoverMethod = failover; + } + + _options.remove(OPTIONS_FAILOVER); + } + } + + public String getURL() + { + return _url; + } + + public String getFailoverMethod() + { + return _failoverMethod; + } + + public String getFailoverOption(String key) + { + return _failoverOptions.get(key); + } + + public void setFailoverOption(String key, String value) + { + _failoverOptions.put(key, value); + } + + public int getBrokerCount() + { + return _brokers.size(); + } + + public BrokerDetails getBrokerDetails(int index) + { + if (index < _brokers.size()) + { + return _brokers.get(index); + } + else + { + return null; + } + } + + public void addBrokerDetails(BrokerDetails broker) + { + if (!(_brokers.contains(broker))) + { + _brokers.add(broker); + } + } + + public List getAllBrokerDetails() + { + return _brokers; + } + + public String getClientName() + { + return _clientName; + } + + public void setClientName(String clientName) + { + _clientName = clientName; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(String virtuaHost) + { + _virtualHost = virtuaHost; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(AMQ_PROTOCOL); + sb.append("://"); + + if (_username != null) + { + sb.append(_username); + + if (_password != null) + { + sb.append(':'); + sb.append(_password); + } + + sb.append('@'); + } + + sb.append(_clientName); + + sb.append(_virtualHost); + + sb.append(optionsToString()); + + return sb.toString(); + } + + private String optionsToString() + { + StringBuffer sb = new StringBuffer(); + + sb.append("?" + OPTIONS_BROKERLIST + "='"); + + for (BrokerDetails service : _brokers) + { + sb.append(service.toString()); + sb.append(';'); + } + + sb.deleteCharAt(sb.length() - 1); + sb.append("'"); + + if (_failoverMethod != null) + { + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + sb.append(OPTIONS_FAILOVER + "='"); + sb.append(_failoverMethod); + sb.append(URLHelper.printOptions(_failoverOptions)); + sb.append("'"); + } + + return sb.toString(); + } + + + public static void main(String[] args) throws URLSyntaxException + { + + String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + + ConnectionURL connectionurl2 = new AMQConnectionURL(url2); + + System.out.println(url2); + System.out.println(connectionurl2); + + } + +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQConnectionURL.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,282 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.exchange.ExchangeDefaults; + +import javax.naming.Reference; +import javax.naming.NamingException; +import javax.naming.StringRefAddr; +import javax.naming.Referenceable; +import javax.jms.Destination; + + +public abstract class AMQDestination implements Destination, Referenceable +{ + protected final String _exchangeName; + + protected final String _exchangeClass; + + protected final String _destinationName; + + protected boolean _isDurable; + + protected final boolean _isExclusive; + + protected final boolean _isAutoDelete; + + protected String _queueName; + + protected AMQDestination(String url) throws URLSyntaxException + { + this(new AMQBindingURL(url)); + } + + protected AMQDestination(BindingURL binding) + { + _exchangeName = binding.getExchangeName(); + _exchangeClass = binding.getExchangeClass(); + _destinationName = binding.getDestinationName(); + + _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); + _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); + _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _queueName = binding.getQueueName(); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, String queueName) + { + this(exchangeName, exchangeClass, destinationName, false, false, queueName); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName) + { + this(exchangeName, exchangeClass, destinationName, false, false, null); + } + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, boolean isExclusive, + boolean isAutoDelete, String queueName) + { + if (destinationName == null) + { + throw new IllegalArgumentException("Destination name must not be null"); + } + if (exchangeName == null) + { + throw new IllegalArgumentException("Exchange name must not be null"); + } + if (exchangeClass == null) + { + throw new IllegalArgumentException("Exchange class must not be null"); + } + _exchangeName = exchangeName; + _exchangeClass = exchangeClass; + _destinationName = destinationName; + _isExclusive = isExclusive; + _isAutoDelete = isAutoDelete; + _queueName = queueName; + } + + public abstract String getEncodedName(); + + public boolean isDurable() + { + return _isDurable; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public String getExchangeClass() + { + return _exchangeClass; + } + + public boolean isTopic() + { + return ExchangeDefaults.TOPIC_EXCHANGE_NAME.equals(_exchangeName); + } + + public boolean isQueue() + { + return ExchangeDefaults.DIRECT_EXCHANGE_NAME.equals(_exchangeName); + } + + public String getDestinationName() + { + return _destinationName; + } + + public String getQueueName() + { + return _queueName; + } + + public void setQueueName(String queueName) + { + _queueName = queueName; + } + + public abstract String getRoutingKey(); + + public boolean isExclusive() + { + return _isExclusive; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public abstract boolean isNameRequired(); + + public String toString() + { + return toURL(); + + /* + return "Destination: " + _destinationName + ", " + + "Queue Name: " + _queueName + ", Exchange: " + _exchangeName + + ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive + + ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey(); + */ + } + + public String toURL() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_exchangeClass); + sb.append("://"); + sb.append(_exchangeName); + + sb.append("/"); + + if (_destinationName != null) + { + sb.append(_destinationName); + } + + sb.append("/"); + + if (_queueName != null) + { + sb.append(_queueName); + } + + sb.append("?"); + + if (_isDurable) + { + sb.append(BindingURL.OPTION_DURABLE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_isExclusive) + { + sb.append(BindingURL.OPTION_EXCLUSIVE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + if (_isAutoDelete) + { + sb.append(BindingURL.OPTION_AUTODELETE); + sb.append("='true'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + + //remove the last char '?' if there is no options , ',' if there are. + sb.deleteCharAt(sb.length() - 1); + + return sb.toString(); + } + + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final AMQDestination that = (AMQDestination) o; + + if (!_destinationName.equals(that._destinationName)) + { + return false; + } + if (!_exchangeClass.equals(that._exchangeClass)) + { + return false; + } + if (!_exchangeName.equals(that._exchangeName)) + { + return false; + } + if ((_queueName == null && that._queueName != null) || + (_queueName != null && !_queueName.equals(that._queueName))) + { + return false; + } + if (_isExclusive != that._isExclusive) + { + return false; + } + if (_isAutoDelete != that._isAutoDelete) + { + return false; + } + return true; + } + + public int hashCode() + { + int result; + result = _exchangeName.hashCode(); + result = 29 * result + _exchangeClass.hashCode(); + result = 29 * result + _destinationName.hashCode(); + if (_queueName != null) + { + result = 29 * result + _queueName.hashCode(); + } + result = result * (_isExclusive ? 13 : 7); + result = result * (_isAutoDelete ? 13 : 7); + return result; + } + + public Reference getReference() throws NamingException + { + return new Reference( + this.getClass().getName(), + new StringRefAddr(this.getClass().getName(), toURL()), + AMQConnectionFactory.class.getName(), + null); // factory location + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQDestination.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * A destination backed by a headers exchange + */ +public class AMQHeadersExchange extends AMQDestination +{ + public AMQHeadersExchange(String queueName) + { + super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName, true, true, null); + } + + public String getEncodedName() + { + return getDestinationName(); + } + + public String getRoutingKey() + { + return getDestinationName(); + } + + public boolean isNameRequired() + { + //Not sure what the best approach is here, probably to treat this like a topic + //and allow server to generate names. As it is AMQ specific it doesn't need to + //fit the JMS API expectations so this is not as yet critical. + return false; + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQHeadersExchange.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.protocol.AMQConstant; + + +public class AMQNoConsumersException extends AMQUndeliveredException +{ + public AMQNoConsumersException(String msg, Object bounced) + { + super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced); + } + + +} + + Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoConsumersException.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.protocol.AMQConstant; + + +public class AMQNoRouteException extends AMQUndeliveredException +{ + public AMQNoRouteException(String msg, Object bounced) + { + super(AMQConstant.NO_ROUTE.getCode(), msg, bounced); + } + + +} + + Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQNoRouteException.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,91 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.exchange.ExchangeDefaults; + +import javax.jms.Queue; + +public class AMQQueue extends AMQDestination implements Queue +{ + + /** + * Create a reference to a non temporary queue using a BindingURL object. + * Note this does not actually imply the queue exists. + * @param binding a BindingURL object + */ + public AMQQueue(BindingURL binding) + { + super(binding); + } + + /** + * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. + * @param name the name of the queue + */ + public AMQQueue(String name) + { + this(name, false); + } + + /** + * Create a queue with a specified name. + * + * @param name the destination name (used in the routing key) + * @param temporary if true the broker will generate a queue name, also if true then the queue is autodeleted + * and exclusive + */ + public AMQQueue(String name, boolean temporary) + { + // queue name is set to null indicating that the broker assigns a name in the case of temporary queues + // temporary queues are typically used as response queues + this(name, temporary?null:name, temporary, temporary); + _isDurable = !temporary; + } + + /** + * Create a reference to a queue. Note this does not actually imply the queue exists. + * @param destinationName the queue name + * @param queueName the queue name + * @param exclusive true if the queue should only permit a single consumer + * @param autoDelete true if the queue should be deleted automatically when the last consumers detaches + */ + public AMQQueue(String destinationName, String queueName, boolean exclusive, boolean autoDelete) + { + super(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, destinationName, exclusive, + autoDelete, queueName); + } + + public String getEncodedName() + { + return 'Q' + getQueueName(); + } + + public String getRoutingKey() + { + return getQueueName(); + } + + public boolean isNameRequired() + { + //If the name is null, we require one to be generated by the client so that it will# + //remain valid if we failover (see BLZ-24) + return getQueueName() == null; + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQQueue.java ------------------------------------------------------------------------------ svn:eol-style = native