Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 3364 invoked from network); 8 Feb 2008 10:12:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Feb 2008 10:12:06 -0000 Received: (qmail 75087 invoked by uid 500); 8 Feb 2008 10:11:58 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 75057 invoked by uid 500); 8 Feb 2008 10:11:58 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 75026 invoked by uid 99); 8 Feb 2008 10:11:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Feb 2008 02:11:58 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Feb 2008 10:11:34 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0112C1A9866; Fri, 8 Feb 2008 02:11:13 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r619823 [12/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Commo... Date: Fri, 08 Feb 2008 10:10:11 -0000 To: qpid-commits@incubator.apache.org From: aidan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080208101113.0112C1A9866@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Fri Feb 8 02:09:37 2008 @@ -33,7 +33,7 @@ /** * @author Apache Software Foundation */ -public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener +public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class); private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); @@ -46,14 +46,14 @@ private ExchangeBoundOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) - throws AMQException + public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId) + throws AMQException { if (_logger.isDebugEnabled()) { - ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); - _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " - + body.replyText); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " + + body.getReplyText()); } } + } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Fri Feb 8 02:09:37 2008 @@ -33,7 +33,7 @@ /** * @author Apache Software Foundation */ -public class QueueDeleteOkMethodHandler implements StateAwareMethodListener +public class QueueDeleteOkMethodHandler implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class); private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); @@ -46,13 +46,14 @@ private QueueDeleteOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) - throws AMQException - { + public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId) + throws AMQException + { if (_logger.isDebugEnabled()) { - QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); - _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount()); } } + + } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Feb 8 02:09:37 2008 @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -51,6 +52,18 @@ super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered); } + public UnprocessedMessage_0_8(int channelId, BasicReturnBody body) + { + //FIXME: TGM, SRSLY 4RL + super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false); + } + + public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body) + { + //FIXME: TGM, SRSLY 4RL + super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false); + } + public void receiveBody(ContentBody body) { @@ -119,8 +132,8 @@ } if(_deliverBody != null) { - buf.append("Delivery tag " + _deliverBody.deliveryTag); - buf.append("Consumer tag " + _deliverBody.consumerTag); + buf.append("Delivery tag " + _deliverBody.getDeliveryTag()); + buf.append("Consumer tag " + _deliverBody.getConsumerTag()); buf.append("Deliver Body " + _deliverBody); } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Feb 8 02:09:37 2008 @@ -21,10 +21,16 @@ package org.apache.qpid.client.protocol; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -39,16 +45,7 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -58,6 +55,7 @@ import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -214,6 +212,36 @@ e.printStackTrace(); } + if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + int buf_size = 32768; + if (session.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); + } + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -380,94 +408,109 @@ public void messageReceived(IoSession session, Object message) throws Exception { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && ((msgNumber % 1000) == 0)) + if(message instanceof AMQFrame) { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - AMQFrame frame = (AMQFrame) message; + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } - final AMQBody bodyFrame = frame.getBodyFrame(); + AMQFrame frame = (AMQFrame) message; - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + final AMQBody bodyFrame = frame.getBodyFrame(); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + switch (bodyFrame.getFrameType()) + { + case AMQMethodBody.TYPE: - final AMQMethodEvent evt = - new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - try - { + final AMQMethodEvent evt = + new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + try { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } - } - if (!wasAnyoneInterested) - { - throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners, null); + if (!wasAnyoneInterested) + { + throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners, null); + } } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + catch (AMQException e) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + getStateManager().error(e); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } + + exceptionCaught(session, e); } - exceptionCaught(session, e); - } + break; - break; + case ContentHeaderBody.TYPE: - case ContentHeaderBody.TYPE: + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + case ContentBody.TYPE: - case ContentBody.TYPE: + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; + case HeartbeatBody.TYPE: - case HeartbeatBody.TYPE: + if (debug) + { + _logger.debug("Received heartbeat"); + } - if (debug) - { - _logger.debug("Received heartbeat"); - } + break; - break; + default: - default: + } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } } private static int _messagesOut; @@ -506,6 +549,12 @@ getStateManager().attainState(s); } + public AMQState attainState(Set states) throws AMQException + { + return getStateManager().attainState(states); + } + + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -600,16 +649,11 @@ { getStateManager().changeState(AMQState.CONNECTION_CLOSING); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = - ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText + ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection."),0,0); + + + final AMQFrame frame = body.generateFrame(0); try { @@ -708,5 +752,15 @@ public byte getProtocolMinorVersion() { return _protocolSession.getProtocolMinorVersion(); + } + + public MethodRegistry getMethodRegistry() + { + return getStateManager().getMethodRegistry(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolSession.getProtocolVersion(); } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Feb 8 02:09:37 2008 @@ -21,40 +21,31 @@ package org.apache.qpid.client.protocol; import org.apache.commons.lang.StringUtils; - import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; -// import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MainRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; /** * Wrapper for protocol session that provides type-safe access to session attributes.

The underlying protocol @@ -101,12 +92,19 @@ protected int _queueId = 1; protected final Object _queueIdLock = new Object(); - private byte _protocolMinorVersion; - private byte _protocolMajorVersion; - private VersionSpecificRegistry _registry = - MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); + private ProtocolVersion _protocolVersion; +// private VersionSpecificRegistry _registry = +// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); + + + private MethodRegistry _methodRegistry = + MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); - private final AMQConnection _connection; + + private MethodDispatcher _methodDispatcher; + + + private final AMQConnection _connection; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { @@ -126,6 +124,9 @@ _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); + _protocolVersion = connection.getProtocolVersion(); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), + stateManager); _connection = connection; } @@ -135,7 +136,7 @@ // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -164,6 +165,8 @@ public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion, + stateManager); } public String getVirtualHost() @@ -440,26 +443,55 @@ session.confirmConsumerCancelled(consumerTag); } - public void setProtocolVersion(final byte versionMajor, final byte versionMinor) + public void setProtocolVersion(final ProtocolVersion pv) { - _protocolMajorVersion = versionMajor; - _protocolMinorVersion = versionMinor; - _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + _protocolVersion = pv; + _methodRegistry = MethodRegistry.getMethodRegistry(pv); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager); + + // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() { - return _protocolMinorVersion; + return _protocolVersion.getMinorVersion(); } public byte getProtocolMajorVersion() { - return _protocolMajorVersion; + return _protocolVersion.getMajorVersion(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; } - public VersionSpecificRegistry getRegistry() +// public VersionSpecificRegistry getRegistry() +// { +// return _registry; +// } + + public MethodRegistry getMethodRegistry() { - return _registry; + return _methodRegistry; } + public MethodDispatcher getMethodDispatcher() + { + return _methodDispatcher; + } + + + public void setTicket(int ticket, int channelId) + { + final AMQSession session = getSession(channelId); + session.setTicket(ticket); + } + + + public void setMethodDispatcher(MethodDispatcher methodDispatcher) + { + _methodDispatcher = methodDispatcher; + } } Copied: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (from r616809, incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java) URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java?p2=incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&p1=incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&r1=616809&r2=619823&rev=619823&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java Fri Feb 8 02:09:37 2008 @@ -27,6 +27,6 @@ { public AMQMethodNotImplementedException(AMQMethodBody body) { - super("Unexpected Method Received: " + body.getClass().getName()); + super(null, "Unexpected Method Received: " + body.getClass().getName(), null); } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java Fri Feb 8 02:09:37 2008 @@ -24,8 +24,22 @@ * States used in the AMQ protocol. Used by the finite state machine to determine * valid responses. */ -public class AMQState +public enum AMQState { + + CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"), + + CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"), + + CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"), + + CONNECTION_OPEN(4, "CONNECTION_OPEN"), + + CONNECTION_CLOSING(5, "CONNECTION_CLOSING"), + + CONNECTION_CLOSED(6, "CONNECTION_CLOSED"); + + private final int _id; private final String _name; @@ -41,16 +55,6 @@ return "AMQState: id = " + _id + " name: " + _name; } - public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED"); - - public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED"); - - public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED"); - - public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN"); - - public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING"); - - public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED"); - + + } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Feb 8 02:09:37 2008 @@ -21,42 +21,15 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; -import org.apache.qpid.client.handler.BasicCancelOkMethodHandler; -import org.apache.qpid.client.handler.BasicDeliverMethodHandler; -import org.apache.qpid.client.handler.BasicReturnMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; -import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; -import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler; -import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -72,11 +45,11 @@ /** The current state */ private AMQState _currentState; + /** * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of * AMQFrame. */ - protected final Map _state2HandlersMap = new HashMap(); private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); private final Object _stateLock = new Object(); @@ -96,53 +69,10 @@ { _protocolSession = protocolSession; _currentState = state; - if (register) - { - registerListeners(); - } + } - protected void registerListeners() - { - Map frame2handlerMap = new HashMap(); - // we need to register a map for the null (i.e. all state) handlers otherwise you get - // a stack overflow in the handler searching code when you present it with a frame for which - // no handlers are registered - // - _state2HandlersMap.put(null, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap); - - // - // ConnectionOpen handlers - // - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance()); - frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); - frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); - frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); - frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); - frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); - frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); - } public AMQState getCurrentState() { @@ -176,56 +106,14 @@ public boolean methodReceived(AMQMethodEvent evt) throws AMQException { - StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); - if (handler != null) - { - handler.methodReceived(this, _protocolSession, evt); - return true; - } - - return false; + B method = evt.getMethod(); + + // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); + method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId()); + return true; } - protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState, AMQMethodBody frame) - // throws IllegalStateTransitionException - { - final Class clazz = frame.getClass(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz); - } - - final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState); - - if (classToHandlerMap == null) - { - // if no specialised per state handler is registered look for a - // handler registered for "all" states - return findStateTransitionHandler(null, frame); - } - - final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz); - if (handler == null) - { - if (currentState == null) - { - _logger.debug("No state transition handler defined for receiving frame " + frame); - - return null; - } - else - { - // if no specialised per state handler is registered look for a - // handler registered for "all" states - return findStateTransitionHandler(null, frame); - } - } - else - { - return handler; - } - } public void attainState(final AMQState s) throws AMQException { @@ -271,5 +159,47 @@ public void setProtocolSession(AMQProtocolSession session) { _protocolSession = session; + } + + public MethodRegistry getMethodRegistry() + { + return getProtocolSession().getMethodRegistry(); + } + + public AMQState attainState(Set stateSet) throws AMQException + { + synchronized (_stateLock) + { + final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; + long waitTime = MAXIMUM_STATE_WAIT_TIME; + + while (!stateSet.contains(_currentState) && (waitTime > 0)) + { + try + { + _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); + } + catch (InterruptedException e) + { + _logger.warn("Thread interrupted"); + } + + if (!stateSet.contains(_currentState)) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + + if (!stateSet.contains(_currentState)) + { + _logger.warn("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet); + throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet, null); + } + return _currentState; + } + + } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Fri Feb 8 02:09:37 2008 @@ -21,6 +21,7 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; @@ -29,8 +30,9 @@ * the opportunity to update state. * */ -public interface StateAwareMethodListener +public interface StateAwareMethodListener { - void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, - AMQMethodEvent evt) throws AMQException; + + void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException; + } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Feb 8 02:09:37 2008 @@ -23,6 +23,7 @@ import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -34,7 +35,6 @@ import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -95,28 +95,26 @@ { SocketConnector result; // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) + if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) { - _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); - // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); + result = new MultiThreadSocketConnector(); } - // else - + else { _logger.info("Using Mina NIO"); result = new SocketConnector(); // non-blocking connector } - // Don't have the connector's worker thread wait around for other connections (we only use // one SocketConnector per connection at the moment anyway). This allows short-running // clients (like unit tests) to complete quickly. result.setWorkerTimeout(0); - return result; } }); break; - case VM: { _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); @@ -151,7 +149,15 @@ { if (AutoCreate) { - createVMBroker(port); + if (AutoCreate) + { + createVMBroker(port); + } + else + { + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); + } } else { @@ -271,8 +277,7 @@ } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -282,16 +287,17 @@ public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); - _acceptor.unbindAll(); - - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) + if (_acceptor != null) { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); + _acceptor.unbindAll(); } - + synchronized (_inVmPipeAddress) + { + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Fri Feb 8 02:09:37 2008 @@ -22,15 +22,12 @@ import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; - import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +46,10 @@ public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - final VmPipeConnector ioConnector = new VmPipeConnector(); + final VmPipeConnector ioConnector = new QpidVmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Feb 8 02:09:37 2008 @@ -33,6 +33,7 @@ */ public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; public static final int DEFAULT_PORT = 5672; public static final String TCP = "tcp"; Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Feb 8 02:09:37 2008 @@ -21,6 +21,7 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import java.util.List; @@ -43,6 +44,7 @@ public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; public static final byte URL_0_8 = 1; public static final byte URL_0_10 = 2; + public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion"; byte getURLVersion(); @@ -91,4 +93,6 @@ AMQShortString getTemporaryQueueExchangeName(); AMQShortString getTemporaryTopicExchangeName(); + + ProtocolVersion getProtocolVersion(); } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java Fri Feb 8 02:09:37 2008 @@ -22,7 +22,6 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,34 +34,22 @@ /** The default number of times to retry each server */ public static final int DEFAULT_SERVER_RETRIES = 0; - /** - * The index into the hostDetails array of the broker to which we are connected - */ + /** The index into the hostDetails array of the broker to which we are connected */ private int _currentBrokerIndex = -1; - /** - * The number of times to retry connecting for each server - */ + /** The number of times to retry connecting for each server */ private int _serverRetries; - /** - * The current number of retry attempts made - */ + /** The current number of retry attempts made */ private int _currentServerRetry; - /** - * The number of times to cycle through the servers - */ + /** The number of times to cycle through the servers */ private int _cycleRetries; - /** - * The current number of cycles performed. - */ + /** The current number of cycles performed. */ private int _currentCycleRetries; - /** - * Array of BrokerDetail used to make connections. - */ + /** Array of BrokerDetail used to make connections. */ private ConnectionURL _connectionDetails; public FailoverRoundRobinServers(ConnectionURL connectionDetails) @@ -128,6 +115,8 @@ public BrokerDetails getNextBrokerDetails() { + boolean doDelay = false; + if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) { if (_currentServerRetry < _serverRetries) @@ -143,6 +132,7 @@ else { _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex)); + doDelay=true; } _currentServerRetry++; @@ -175,6 +165,7 @@ else { _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex)); + doDelay=true; } _currentServerRetry++; @@ -189,7 +180,28 @@ } } - return _connectionDetails.getBrokerDetails(_currentBrokerIndex); + BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + + String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null && doDelay) + { + Long delay = Long.parseLong(delayStr); + _logger.info("Delay between connect retries:" + delay); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + else + { + _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); + } + + return broker; } public void setBroker(BrokerDetails broker) Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java Fri Feb 8 02:09:37 2008 @@ -22,25 +22,23 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailoverSingleServer implements FailoverMethod { + private static final Logger _logger = LoggerFactory.getLogger(FailoverSingleServer.class); + /** The default number of times to rety a conection to this server */ public static final int DEFAULT_SERVER_RETRIES = 1; - /** - * The details of the Single Server - */ + /** The details of the Single Server */ private BrokerDetails _brokerDetail; - /** - * The number of times to retry connecting to the sever - */ + /** The number of times to retry connecting to the sever */ private int _retries; - /** - * The current number of attempts made to the server - */ + /** The current number of attempts made to the server */ private int _currentRetries; @@ -78,7 +76,7 @@ public BrokerDetails getCurrentBrokerDetails() { - return _brokerDetail; + return _brokerDetail; } public BrokerDetails getNextBrokerDetails() @@ -91,11 +89,29 @@ { if (_currentRetries < _retries) { - _currentRetries ++; + _currentRetries++; } + } + - return _brokerDetail; + String delayStr = _brokerDetail.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null && _currentRetries != 1) + { + Long delay = Long.parseLong(delayStr); + _logger.info("Delay between connect retries:" + delay); + try + { + + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); + return null; + } } + + return _brokerDetail; } public void setBroker(BrokerDetails broker) @@ -138,10 +154,10 @@ public String toString() { - return "SingleServer:\n"+ - "Max Retries:"+_retries+ - "\nCurrent Retry:"+_currentRetries+ - "\n"+_brokerDetail+"\n"; + return "SingleServer:\n" + + "Max Retries:" + _retries + + "\nCurrent Retry:" + _currentRetries + + "\n" + _brokerDetail + "\n"; } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Feb 8 02:09:37 2008 @@ -99,8 +99,6 @@ _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL)); } - - createConnectionFactories(data, environment); createDestinations(data, environment); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Fri Feb 8 02:09:37 2008 @@ -126,7 +126,7 @@ _logger.info("Consuming messages"); for (int i = 0; i < NUM_MESSAGES; i++) { - Message msg = consumer.receive(1500); + Message msg = consumer.receive(3000); assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Fri Feb 8 02:09:37 2008 @@ -20,13 +20,17 @@ */ package org.apache.qpid.client; +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -34,7 +38,9 @@ import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Fri Feb 8 02:09:37 2008 @@ -20,13 +20,17 @@ */ package org.apache.qpid.client; +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.testutil.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -34,6 +38,9 @@ import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; + +import java.util.Hashtable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Feb 8 02:09:37 2008 @@ -29,7 +29,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.spi.InitialContextFactory; @@ -65,12 +74,15 @@ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock + + private String oldImmediatePrefetch; protected void setUp() throws Exception { super.setUp(); + oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH); System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); _clientConnection = getConnection("guest", "guest"); @@ -109,8 +121,12 @@ { _clientConnection.close(); - _producerConnection.close(); super.tearDown(); + if (oldImmediatePrefetch == null) + { + oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT; + } + System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch); } public void testAsynchronousRecieve() @@ -238,7 +254,7 @@ try { - _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Fri Feb 8 02:09:37 2008 @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener +public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener { private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class); @@ -48,14 +48,15 @@ return _handler; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) + public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; + + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); @@ -95,6 +96,6 @@ } - protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + session.channelClosed(channelId, errorCode, String.valueOf(reason)); } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Feb 8 02:09:37 2008 @@ -25,17 +25,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; @@ -52,6 +48,9 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -135,8 +134,11 @@ /* close channel and send guff then send ok no errors + REMOVE TEST - The behaviour after server has sent close is undefined. + the server should be free to fail as it may wish to reclaim its resources + immediately after close. */ - public void testSendingMethodsAfterClose() throws Exception + /*public void testSendingMethodsAfterClose() throws Exception { try { @@ -158,6 +160,17 @@ // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); + + MethodDispatcher d = protocolSession.getMethodDispatcher(); + + MethodDispatcher wrappedDispatcher = (MethodDispatcher) + Proxy.newProxyInstance(d.getClass().getClassLoader(), + d.getClass().getInterfaces(), + new MethodDispatcherProxyHandler( + (ClientMethodDispatcherImpl) d)); + + protocolSession.setMethodDispatcher(wrappedDispatcher); + AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -247,7 +260,7 @@ } } } - +*/ private void createChannelAndTest(int channel) throws FailoverException { // Create A channel @@ -274,10 +287,9 @@ private void sendClose(int channel) { - AMQFrame frame = - ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + ChannelCloseOkBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody(); + AMQFrame frame = body.generateFrame(channel); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } @@ -335,35 +347,43 @@ private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException, FailoverException { - AMQFrame exchangeDeclare = - ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + ExchangeDeclareBody body = + ((AMQConnection) _connection).getProtocolHandler() + .getMethodRegistry() + .createExchangeDeclareBody(0, + new AMQShortString(_name), + new AMQShortString(_type), + true, + false, + false, + false, + nowait, + null); + AMQFrame exchangeDeclare = body.generateFrame(channelId); + AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler(); + + + if (nowait) + { + protocolHandler.writeFrame(exchangeDeclare); + } + else + { + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + } + +// return null; +// } +// }, (AMQConnection)_connection).execute(); - if (nowait) - { - ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare); - } - else - { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, - SYNC_TIMEOUT); - } } private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenBody body = + ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); + + ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand ChannelOpenOkBody.class); } @@ -392,4 +412,28 @@ public void failoverComplete() { } + + private static final class MethodDispatcherProxyHandler implements InvocationHandler + { + private final ClientMethodDispatcherImpl _underlyingDispatcher; + private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance(); + + + public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher) + { + _underlyingDispatcher = dispatcher; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + if(method.getName().equals("dispatchChannelClose")) + { + _handler.methodReceived(_underlyingDispatcher.getStateManager(), + (ChannelCloseBody) args[0], (Integer)args[1]); + } + Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes()); + return dispatcherMethod.invoke(_underlyingDispatcher, args); + + } + } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java Fri Feb 8 02:09:37 2008 @@ -59,49 +59,7 @@ super(protocolSession); } - protected void registerListeners() - { - Map frame2handlerMap = new HashMap(); - - // we need to register a map for the null (i.e. all state) handlers otherwise you get - // a stack overflow in the handler searching code when you present it with a frame for which - // no handlers are registered - // - _state2HandlersMap.put(null, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap); - - frame2handlerMap = new HashMap(); - frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap); - - // - // ConnectionOpen handlers - // - frame2handlerMap = new HashMap(); - // Use Test Handler for Close methods to not send Close-OKs - frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance()); - - frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance()); - frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); - frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); - frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); - frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); - frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); - frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); - frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); - _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); - } + } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=619823&r1=619822&r2=619823&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Feb 8 02:09:37 2008 @@ -41,6 +41,14 @@ import javax.jms.TextMessage; import javax.jms.TopicSubscriber; +/** + * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as + * a static on a base test helper class, e.g. TestUtils. + * + * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored + * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit, + * driven by the test model. + */ public class DurableSubscriptionTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); @@ -113,12 +121,26 @@ con.close(); } - public void testDurability() throws Exception + public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException { + durabilityImpl(Session.AUTO_ACKNOWLEDGE); + } - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic"); - Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + public void testDurabilityNOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException + { + durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE); + } + + public void testDurabilityAUTOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException + { + durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); + } + + private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException + { + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con, "MyTopic"); + Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -144,10 +166,83 @@ consumer2.close(); - Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + producer.send(session1.createTextMessage("B")); + + _logger.info("Receive message on consumer 1 :expecting B"); + msg = consumer1.receive(500); + assertNotNull("Consumer 1 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - producer.send(session1.createTextMessage("B")); + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 3 :expecting null"); + msg = consumer3.receive(500); + assertNull("There should be no more messages for consumption on consumer3.", msg); + + consumer1.close(); + consumer3.close(); + + con.close(); + } + + private void durabilityImplSessionPerConnection(int ackMode) throws AMQException, JMSException, URLSyntaxException + { + Message msg; + + // Create producer. + AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con0.start(); + Session session0 = con0.createSession(false, ackMode); + + AMQTopic topic = new AMQTopic(con0, "MyTopic"); + + Session sessionProd = con0.createSession(false, ackMode); + MessageProducer producer = sessionProd.createProducer(topic); + + // Create consumer 1. + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con1.start(); + Session session1 = con1.createSession(false, ackMode); + + MessageConsumer consumer1 = session0.createConsumer(topic); + + // Create consumer 2. + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con2.start(); + Session session2 = con2.createSession(false, ackMode); + + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + + // Send message and check that both consumers get it and only it. + producer.send(session0.createTextMessage("A")); + + msg = consumer1.receive(500); + assertNotNull("Message should be available", msg); + assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + msg = consumer2.receive(); + assertNotNull(msg); + assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); + msg = consumer2.receive(500); + assertNull("There should be no more messages for consumption on consumer2.", msg); + + // Detach the durable subscriber. + consumer2.close(); + session2.close(); + con2.close(); + + // Send message and receive on open consumer. + producer.send(session0.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(100); @@ -156,14 +251,26 @@ msg = consumer1.receive(100); assertEquals(null, msg); + // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. + AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con3.start(); + Session session3 = con3.createSession(false, ackMode); + + TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); + _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(100); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); - msg = consumer3.receive(100); - assertEquals(null, msg); + msg = consumer3.receive(500); + assertNull("There should be no more messages for consumption on consumer3.", msg); - con.close(); + consumer1.close(); + consumer3.close(); + + con0.close(); + con1.close(); + con3.close(); } public static junit.framework.Test suite()