Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 10823 invoked from network); 19 Sep 2006 22:39:16 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 19 Sep 2006 22:39:16 -0000 Received: (qmail 35429 invoked by uid 500); 19 Sep 2006 22:39:16 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 35407 invoked by uid 500); 19 Sep 2006 22:39:16 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Delivered-To: moderator for qpid-commits@incubator.apache.org Received: (qmail 92169 invoked by uid 99); 19 Sep 2006 22:09:09 -0000 X-ASF-Spam-Status: No, hits=-9.8 required=5.0 tests=ALL_TRUSTED,NO_REAL_NAME Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r447994 [20/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur... 
Date: Tue, 19 Sep 2006 22:07:25 -0000 To: qpid-commits@incubator.apache.org From: rhs@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060919220808.407061A985C@eris.apache.org> X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,91 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +/** + * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract + * + */ +class TopicSubscriberAdaptor implements TopicSubscriber +{ + private final Topic _topic; + private final MessageConsumer _consumer; + private final boolean _noLocal; + + TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal) + { + _topic = topic; + _consumer = consumer; + _noLocal = noLocal; + } + TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer) + { + this(topic, consumer, consumer.isNoLocal()); + } + public Topic getTopic() throws JMSException + { + return _topic; + } + + public boolean getNoLocal() throws JMSException + { + return _noLocal; + } + + public String getMessageSelector() throws JMSException + { + return _consumer.getMessageSelector(); + } + + public MessageListener getMessageListener() throws JMSException + { + return _consumer.getMessageListener(); + } + + public void setMessageListener(MessageListener messageListener) throws JMSException + { + _consumer.setMessageListener(messageListener); + } + + public Message receive() throws JMSException + { + return _consumer.receive(); + } + + public Message receive(long l) throws JMSException + { + return _consumer.receive(l); + } + + public Message receiveNoWait() throws JMSException + { + return _consumer.receiveNoWait(); + } + + public void close() throws JMSException + { + _consumer.close(); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java URL: 
/**
 * This exception is thrown when failover is taking place and we need to let other
 * parts of the client know about this.
 *
 * It is unchecked so that it can be propagated through listener call chains
 * without every intermediate method having to declare it.
 */
public class FailoverException extends RuntimeException
{
    /**
     * @param message description of why failover is being signalled
     */
    public FailoverException(String message)
    {
        super(message);
    }

    /**
     * Variant that preserves the underlying cause so that the original stack trace
     * is not lost when failover is triggered by another exception.
     *
     * @param message description of why failover is being signalled
     * @param cause   the exception that triggered failover; may be null
     */
    public FailoverException(String message, Throwable cause)
    {
        super(message, cause);
    }
}
+ * + */ +package org.apache.qpid.client.failover; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.failover.FailoverState; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.log4j.Logger; + +import java.util.concurrent.CountDownLatch; + +/** + * When failover is required, we need a separate thread to handle the establishment of the new connection and + * the transfer of subscriptions. + *

+ * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor + * thread. One significant task is the connection setup which involves a protocol exchange until a particular state + * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means + * the IO processor is not able to do anything at all. + */ +public class FailoverHandler implements Runnable +{ + private static final Logger _logger = Logger.getLogger(FailoverHandler.class); + + private final IoSession _session; + private AMQProtocolHandler _amqProtocolHandler; + + /** + * Used where forcing the failover host + */ + private String _host; + + /** + * Used where forcing the failover port + */ + private int _port; + + public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + { + _amqProtocolHandler = amqProtocolHandler; + _session = session; + } + + public void run() + { + if (Thread.currentThread().isDaemon()) + { + throw new IllegalStateException("FailoverHandler must run on a non-daemon thread."); + } + //Thread.currentThread().setName("Failover Thread"); + + _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1)); + + // We wake up listeners. If they can handle failover, they will extend the + // FailoverSupport class and will in turn block on the latch until failover + // has completed before retrying the operation + _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); + + // Since failover impacts several structures we protect them all with a single mutex. These structures + // are also in child objects of the connection. This allows us to manipulate them without affecting + // client code which runs in a separate thread. 
+ synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) + { + _logger.info("Starting failover process"); + + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" + // state works, without us having to terminate any existing "state waiters". We could theoretically + // have a state waiter waiting until the connection is closed for some reason. Or in future we may have + // a slightly more complex state model therefore I felt it was worthwhile doing this. + AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); + _amqProtocolHandler.setStateManager(new AMQStateManager()); + if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) + { + _amqProtocolHandler.setStateManager(existingStateManager); + if (_host != null) + { + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + } + else + { + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + } + _amqProtocolHandler.getFailoverLatch().countDown(); + _amqProtocolHandler.setFailoverLatch(null); + return; + } + boolean failoverSucceeded; + // when host is non null we have a specified failover host otherwise we all the client to cycle through + // all specified hosts + + // if _host has value then we are performing a redirect. 
+ if (_host != null) + { + failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, _amqProtocolHandler.isUseSSL()); + } + else + { + failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(); + } + if (!failoverSucceeded) + { + _amqProtocolHandler.setStateManager(existingStateManager); + _amqProtocolHandler.getConnection().exceptionReceived( + new AMQDisconnectedException("Server closed connection and no failover " + + "was successful")); + } + else + { + _amqProtocolHandler.setStateManager(existingStateManager); + try + { + if (_amqProtocolHandler.getConnection().firePreResubscribe()) + { + _logger.info("Resubscribing on new connection"); + _amqProtocolHandler.getConnection().resubscribeSessions(); + } + else + { + _logger.info("Client vetoed automatic resubscription"); + } + _amqProtocolHandler.getConnection().fireFailoverComplete(); + _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED); + _logger.info("Connection failover completed successfully"); + } + catch (Exception e) + { + _logger.info("Failover process failed - exception being propagated by protocol handler"); + _amqProtocolHandler.setFailoverState(FailoverState.FAILED); + try + { + _amqProtocolHandler.exceptionCaught(_session, e); + } + catch (Exception ex) + { + _logger.error("Error notifying protocol session of error: " + ex, ex); + } + } + } + } + _amqProtocolHandler.getFailoverLatch().countDown(); + } + + public String getHost() + { + return _host; + } + + public void setHost(String host) + { + _host = host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int port) + { + _port = port; + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java URL: 
/**
 * Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be
 * dealt with and can happen during failover.
 *
 * Declared as a language-level enum rather than a hand-written constant class: the constants keep their names and
 * their {@link #toString()} output, and identity comparison with {@code ==} continues to work.
 */
public enum FailoverState
{
    /** Failover has not yet been attempted on this connection */
    NOT_STARTED("NOT STARTED"),

    /** Failover has been requested on this connection but has not completed */
    IN_PROGRESS("IN PROGRESS"),

    /** Failover has been attempted and failed */
    FAILED("FAILED");

    /** Human-readable label used only by {@link #toString()}. */
    private final String _state;

    FailoverState(String state)
    {
        _state = state;
    }

    /** @return the text "FailoverState: " followed by this state's label */
    @Override
    public String toString()
    {
        return "FailoverState: " + _state;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; + +import javax.jms.JMSException; + +public abstract class FailoverSupport +{ + private static final Logger _log = Logger.getLogger(FailoverSupport.class); + + public Object execute(AMQConnection con) throws JMSException + { + // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. + // Any method that can potentially block for any reason should use this class so that deadlock will not + // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) + // that might be causing a block. When that happens, the exception is caught here and the mutex is released + // before waiting for the failover to complete (either successfully or unsuccessfully). 
+ while (true) + { + try + { + con.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _log.info("Interrupted: " + e, e); + return null; + } + synchronized (con.getFailoverMutex()) + { + try + { + return operation(); + } + catch (FailoverException e) + { + _log.info("Failover exception caught during operation"); + } + } + } + } + + protected abstract Object operation() throws JMSException; +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.message.UnprocessedMessage; + +public class BasicDeliverMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class); + + private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler(); + + public static BasicDeliverMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + final UnprocessedMessage msg = new UnprocessedMessage(); + msg.deliverBody = (BasicDeliverBody) evt.getMethod(); + msg.channelId = evt.getChannelId(); + _logger.debug("New JmsDeliver method received"); + evt.getProtocolSession().unprocessedMessageReceived(msg); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The 
Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.BasicReturnBody; + +public class BasicReturnMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class); + + private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler(); + + public static BasicReturnMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New JmsBounce method received"); + final UnprocessedMessage msg = new UnprocessedMessage(); + msg.deliverBody = null; + msg.bounceBody = (BasicReturnBody) evt.getMethod(); + msg.channelId = evt.getChannelId(); + + evt.getProtocolSession().unprocessedMessageReceived(msg); + } +} \ No newline at end of file Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; + +public class ChannelCloseMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class); + + private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler(); + + public static ChannelCloseMethodHandler getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + + int errorCode = method.replyCode; + String reason = method.replyText; + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId()); + evt.getProtocolSession().writeFrame(frame); + if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) + { + _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) + { + throw new AMQNoConsumersException("Error: " + reason, null); + } + else + { + if (errorCode == AMQConstant.NO_ROUTE.getCode()) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else + { + throw new 
AMQChannelClosedException(errorCode, "Error: " + reason); + } + } + } + evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.log4j.Logger; + +public class ChannelCloseOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseOkMethodHandler.class); + + private static final ChannelCloseOkMethodHandler _instance = new ChannelCloseOkMethodHandler(); + + public static ChannelCloseOkMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); + + //todo this should do the closure + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ChannelFlowOkBody; + +public class ChannelFlowOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelFlowOkMethodHandler.class); + private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler(); + + public static ChannelFlowOkMethodHandler getInstance() + { + return _instance; + } + + private ChannelFlowOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod(); + _logger.debug("Received Channel.Flow-Ok message, active = " + method.active); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=auto&rev=447994 
============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,89 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +/** + * Listener invoked when a ConnectionClose frame is received. It always answers with a + * ConnectionCloseOk frame and then, depending on the reply code, either closes the protocol session + * normally or raises an exception describing the failure. The constructor is private; the single + * instance is obtained through getInstance(). + */ +public class ConnectionCloseMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); + + private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler(); + /** + * @return the shared handler instance stored in the static _handler field + */ + public static ConnectionCloseMethodHandler getInstance() + { + return _handler; + } + + private ConnectionCloseMethodHandler() + { + 
} + /** + * Processes a received ConnectionClose. A ConnectionCloseOk frame is written on channel 0 first. + * When the reply code is 200 the protocol session is closed and the state becomes CONNECTION_CLOSED. + * When the reply code equals AMQConstant.NOT_ALLOWED the protocol session is closed, the state is reset + * to CONNECTION_NOT_STARTED and an AMQAuthenticationException is thrown. For any other reply code an + * AMQConnectionClosedException is thrown; on that path this method neither closes the protocol + * session nor changes the state. + * + * @param stateManager the state manager whose state is changed as described above + * @param evt the received method event; its method must be a ConnectionCloseBody + * @throws AMQException AMQAuthenticationException or AMQConnectionClosedException for reply codes other than 200 + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("ConnectionClose frame received"); + ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod(); + + // does it matter + //stateManager.changeState(AMQState.CONNECTION_CLOSING); + + int errorCode = method.replyCode; + String reason = method.replyText; + + // TODO: check whether channel id of zero is appropriate + evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0)); + + if (errorCode != 200) + { + if(errorCode == AMQConstant.NOT_ALLOWED.getCode()) + { + _logger.info("Authentication Error:"+Thread.currentThread().getName()); + + evt.getProtocolSession().closeProtocolSession(); + + //todo this is a bit of a fudge (could be considered such as each new connection needs a new state manager or at least a fresh state. + stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); + + throw new AMQAuthenticationException(errorCode, reason); + } + else + { + _logger.info("Connection close received with error code " + errorCode); + + + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } + } + + // this actually closes the connection in the case where it is not an error. 
+ + evt.getProtocolSession().closeProtocolSession(); + + stateManager.changeState(AMQState.CONNECTION_CLOSED); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionOpenOkBody; +/** + * Listener invoked for the ConnectionOpenOk method. Its only effect is to move the state manager to + * CONNECTION_OPEN. The constructor is private; the single instance is obtained through getInstance(). + */ +public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener +{ + + private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class); + + private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler(); + /** + * @return the shared handler instance stored in the static _instance field + */ + public static ConnectionOpenOkMethodHandler getInstance() + { + return _instance; + } + + private ConnectionOpenOkMethodHandler() + { + } + /** + * Changes the state to CONNECTION_OPEN. The session and method locals are fetched but never read + * afterwards; the cast still fails with a ClassCastException if the event carries another body type. + * + * @param stateManager the state manager that is switched to AMQState.CONNECTION_OPEN + * @param evt the received method event; its method must be a ConnectionOpenOkBody + * @throws AMQException declared in the signature; NOTE(review): presumably raised by changeState - confirm + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + AMQProtocolSession session = evt.getProtocolSession(); + ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod(); + stateManager.changeState(AMQState.CONNECTION_OPEN); + } + +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (added) +++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionRedirectBody; +/** + * Listener invoked when a ConnectionRedirect frame is received. It splits the host field of the body + * into host name and port and asks the protocol session to fail over to that address. + * The constructor is private; the single instance is obtained through getInstance(). + */ +public class ConnectionRedirectMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionRedirectMethodHandler.class); + /** + * Port used when the redirect host carries no ":port" suffix. + */ + private static final int DEFAULT_REDIRECT_PORT = 5672; + + private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler(); + /** + * @return the shared handler instance stored in the static _handler field + */ + public static ConnectionRedirectMethodHandler getInstance() + { + return _handler; + } + + private ConnectionRedirectMethodHandler() + { + } + /** + * Parses the redirect target and triggers failover on the protocol session. The host field is split + * at its first ':'; without a ':' the whole field is the host and DEFAULT_REDIRECT_PORT is used. + * + * @param stateManager not used by this handler + * @param evt the received method event; its method must be a ConnectionRedirectBody + * @throws AMQException if the text after ':' is not a parseable integer port + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("ConnectionRedirect frame received"); + ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); + + // the host is in the form hostname:port with the port being optional + int portIndex = method.host.indexOf(':'); + String host; + int port; 
+ if (portIndex == -1) + { + host = method.host; + port = DEFAULT_REDIRECT_PORT; + } + else + { + host = method.host.substring(0, portIndex); + try + { + port = Integer.parseInt(method.host.substring(portIndex + 1)); + } + catch (NumberFormatException e) + { + // the port comes from the peer, so report a malformed value as a protocol-level failure + throw new AMQException("Invalid port in redirect host '" + method.host + "': " + e, e); + } + } + evt.getProtocolSession().failover(host, port); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,64 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQMethodEvent; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +/** + * Listener invoked for the ConnectionSecure method. It passes the challenge in the body to the + * SaslClient stored on the protocol session and writes the result back in a ConnectionSecureOk frame. + * The single instance is obtained through getInstance(). + */ +public class ConnectionSecureMethodHandler implements StateAwareMethodListener +{ + private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler(); + /** + * @return the shared handler instance stored in the static _instance field + */ + public static ConnectionSecureMethodHandler getInstance() + { + return _instance; + } + /** + * Evaluates the challenge with the session's SaslClient and sends the response on the channel the + * event arrived on. + * + * @param stateManager not used by this handler + * @param evt the received method event; its method must be a ConnectionSecureBody + * @throws AMQException if the session has no SaslClient, or wrapping a SaslException from the evaluation + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + SaslClient client = evt.getProtocolSession().getSaslClient(); + if (client == null) + { + throw new AMQException("No SASL client set up - cannot proceed with authentication"); + } + + ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod(); + + try + { + // Evaluate server challenge + byte[] response = client.evaluateChallenge(body.challenge); + AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response); + evt.getProtocolSession().writeFrame(responseFrame); + } + catch (SaslException e) + { + throw new AMQException("Error processing SASL challenge: " + e, e); + } + + + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,184 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.security.AMQCallbackHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.FieldTable; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; +/** + * Listener invoked for the ConnectionStart method. It selects a SASL mechanism, creates the SaslClient + * for it, picks the first locale offered by the server and replies with a ConnectionStartOk frame + * carrying the client properties. The constructor is private; the single instance is obtained through + * getInstance(). + */ +public class ConnectionStartMethodHandler implements StateAwareMethodListener +{ + + private static final Logger _log = Logger.getLogger(ConnectionStartMethodHandler.class); + + private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler(); + /** + * @return the shared handler instance stored in the static _instance field + */ + public static ConnectionStartMethodHandler getInstance() + { + return _instance; + } + + private ConnectionStartMethodHandler() + { + } + /** + * Negotiates the start of the connection. The steps are: choose a mechanism from body.mechanisms, + * create a SaslClient and store it on the protocol session, compute the initial SASL response when + * the client has one, select the first space-separated token of body.locales, change the state to + * CONNECTION_NOT_TUNED and write the ConnectionStartOk frame. + * + * @param stateManager the state manager that is switched to AMQState.CONNECTION_NOT_TUNED + * @param evt the received method event; its method must be a ConnectionStartBody + * @throws AMQException if mechanisms or locales are missing, no supported mechanism is found, the + * SaslClient cannot be created, or the received bytes cannot be decoded + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); + + try + { + // the mechanism we are going to use + String mechanism; + if (body.mechanisms == null) + { + throw new AMQException("mechanism not specified in ConnectionStart method frame"); + } + else + { + mechanism = chooseMechanism(body.mechanisms); + } + + if (mechanism == null) + { + /* decode with the same charset chooseMechanism uses, not the platform default */ + throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms, "utf8")); + } + + final AMQProtocolSession ps = evt.getProtocolSession(); + byte[] saslResponse; + try + { + 
SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, + null, "AMQP", "localhost", + null, createCallbackHandler(mechanism, ps)); + if (sc == null) + { + throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " + + mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " + + " details of how to register non-standard SASL client providers."); + } + ps.setSaslClient(sc); + saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); + } + catch (SaslException e) + { + ps.setSaslClient(null); + throw new AMQException("Unable to create SASL client: " + e, e); + } + + if (body.locales == null) + { + throw new AMQException("Locales is not defined in Connection Start method"); + } + final String locales = new String(body.locales, "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + String selectedLocale = null; + if (tokenizer.hasMoreTokens()) + { + selectedLocale = tokenizer.nextToken(); + } + else + { + throw new AMQException("No locales sent from server, passed: " + locales); + } + + stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = new FieldTable(); + clientProperties.put("instance", ps.getClientID()); + clientProperties.put("product", "Qpid"); + clientProperties.put("version", "1.0"); + clientProperties.put("platform", getFullSystemInfo()); + ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, + saslResponse, selectedLocale)); + } + catch (UnsupportedEncodingException e) + { + throw new AMQException(_log, "Unable to decode data: " + e, e); + } + } + /** + * Builds the value of the "platform" client property by joining seven system properties (Java runtime + * name, runtime version and vendor, OS arch, name and version, sun.os.patch.level) with ", ". + * A property that is not set appears as the text "null". + * + * @return the comma-separated description + */ + private String getFullSystemInfo() + { + StringBuffer fullSystemInfo = new StringBuffer(); + fullSystemInfo.append(System.getProperty("java.runtime.name")); + fullSystemInfo.append(", " + System.getProperty("java.runtime.version")); + fullSystemInfo.append(", " + 
System.getProperty("java.vendor")); + fullSystemInfo.append(", " + System.getProperty("os.arch")); + fullSystemInfo.append(", " + System.getProperty("os.name")); + fullSystemInfo.append(", " + System.getProperty("os.version")); + fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level")); + + return fullSystemInfo.toString(); + } + /** + * Picks the mechanism to use. The server list is decoded as utf8 and split on spaces; the result is + * the first entry of CallbackHandlerRegistry.getMechanisms() (also split on spaces) that the server + * list contains. + * + * @param availableMechanisms the space-separated mechanism names sent by the server + * @return the chosen mechanism name, or null when the two lists have no entry in common + * @throws UnsupportedEncodingException if the "utf8" charset name is not supported + */ + private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + /** + * Creates the callback handler registered for the mechanism: the class returned by + * CallbackHandlerRegistry.getCallbackHandlerClass is instantiated through its no-argument + * constructor and initialised with the protocol session. + * + * @param mechanism the chosen SASL mechanism name + * @param protocolSession the session passed to the handler's initialise method + * @return the initialised callback handler + * @throws AMQException wrapping any Exception raised while instantiating or initialising the handler + */ + private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession) + throws AMQException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQCallbackHandler cbh = (AMQCallbackHandler) instance; + cbh.initialise(protocolSession); + return cbh; + } + catch (Exception e) + { + throw new AMQException("Unable to create callback handler: " + e, e); + } + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.AMQFrame; +/** + * Listener invoked for the ConnectionTune method. It records the tuning values on the protocol + * session, then replies with a ConnectionTuneOk frame followed by a ConnectionOpen frame. + * The constructor and the two frame factory methods are protected, so subclasses can replace the + * frames that are sent. + */ +public class ConnectionTuneMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class); + + private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler(); + /** + * @return the shared handler instance stored in the static _instance field + */ + public static ConnectionTuneMethodHandler getInstance() + { + return _instance; + } + + protected ConnectionTuneMethodHandler() + { + } + /** + * Stores frameMax and channelMax from the received body in the session's ConnectionTuneParameters, + * creating the parameters object when the session has none. The heartbeat is read through + * Integer.getInteger from the amqj.heartbeat.delay system property, with frame.heartbeat as the + * default. The state then becomes CONNECTION_NOT_OPENED and both reply frames are written on the + * channel the event arrived on; the open frame uses the virtual host of the session's connection, + * null capabilities and insist set to true. + * + * @param stateManager the state manager that is switched to AMQState.CONNECTION_NOT_OPENED + * @param evt the received method event; its method must be a ConnectionTuneBody + * @throws AMQException declared in the signature; nothing in this body throws it explicitly + */ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ConnectionTune frame received"); + ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod(); + AMQProtocolSession session = evt.getProtocolSession(); + + ConnectionTuneParameters params = session.getConnectionTuneParameters(); + if (params == null) + { + params = new ConnectionTuneParameters(); + } + + params.setFrameMax(frame.frameMax); + params.setChannelMax(frame.channelMax); + params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + session.setConnectionTuneParameters(params); + + stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.writeFrame(createTuneOkFrame(evt.getChannelId(), params)); + session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true)); + } + /** + * Creates the ConnectionOpen frame by delegating to ConnectionOpenBody.createAMQFrame. + * + * @param channel the channel id for the frame + * @param path passed through unchanged; methodReceived supplies the virtual host here + * @param capabilities passed through unchanged; methodReceived supplies null + * @param insist passed through unchanged; methodReceived supplies true + * @return the frame returned by ConnectionOpenBody.createAMQFrame + */ + protected 
AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + { + return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist); + } + /** + * Creates the ConnectionTuneOk frame from the channel max, frame max and heartbeat held in params. + * + * @param channel the channel id for the frame + * @param params the tuning parameters to echo back + * @return the frame returned by ConnectionTuneOkBody.createAMQFrame + */ + protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params) + { + return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat()); + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,68 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.client.AMQSession; +/** + * Base holder for a message: its content header properties, an immutable delivery tag and an + * optional reference to the session, which can be set after construction. + */ +public class AMQMessage +{ + protected ContentHeaderProperties _contentHeaderProperties; + + /** + * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required + */ + protected AMQSession _session; + + protected final long _deliveryTag; + /** + * Creates a message with the given header properties and delivery tag. + * + * @param properties the content header properties to store + * @param deliveryTag the delivery tag later returned by getDeliveryTag() + */ + public AMQMessage(ContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + } + /** + * Creates a message with the given header properties and a delivery tag of -1. + * + * @param properties the content header properties to store + */ + public AMQMessage(ContentHeaderProperties properties) + { + this(properties, -1); + } + + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user + * calls acknowledge() + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _session = s; + } + /** + * @return the session stored by setAMQSession, or null if none has been set + */ + public AMQSession getAMQSession() + { + return _session; + } + + /** + * Get the AMQ message number assigned to this message + * @return the message number + */ + public long getDeliveryTag() + { + return _deliveryTag; + } +} Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java?view=auto&rev=447994 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Sep 19 15:06:50 2006 @@ -0,0 +1,677 @@ +/* + * + * Copyright (c) 2006 The Apache Software 
Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableKeyEnumeration; +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.commons.lang.NotImplementedException; +import org.apache.mina.common.ByteBuffer; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.Map; + +public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message +{ + private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + +// protected Map _messageProperties; + + public static final char BOOLEAN_PROPERTY_PREFIX = 'B'; + public static final char BYTE_PROPERTY_PREFIX = 'b'; + public static final char SHORT_PROPERTY_PREFIX = 's'; + public static final char INT_PROPERTY_PREFIX = 'i'; + public static final char LONG_PROPERTY_PREFIX = 'l'; + public static final char FLOAT_PROPERTY_PREFIX = 'f'; + public static final char DOUBLE_PROPERTY_PREFIX = 'd'; + public static final char STRING_PROPERTY_PREFIX = 'S'; + + protected boolean 
_redelivered; + + protected ByteBuffer _data; + + protected AbstractJMSMessage(ByteBuffer data) + { + super(new BasicContentHeaderProperties()); + _data = data; + if (_data != null) + { + _data.acquire(); + } + } + + protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException + { + this(contentHeader, deliveryTag); + _data = data; + if (_data != null) + { + _data.acquire(); + } + } + + protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) + { + super(contentHeader, deliveryTag); + } + + public String getJMSMessageID() throws JMSException + { + if (getJmsContentHeaderProperties().getMessageId() == null) + { + getJmsContentHeaderProperties().setMessageId("ID:" + _deliveryTag); + } + return getJmsContentHeaderProperties().getMessageId(); + } + + public void setJMSMessageID(String messageId) throws JMSException + { + getJmsContentHeaderProperties().setMessageId(messageId); + } + + public long getJMSTimestamp() throws JMSException + { + return new Long(getJmsContentHeaderProperties().getTimestamp()).longValue(); + } + + public void setJMSTimestamp(long timestamp) throws JMSException + { + getJmsContentHeaderProperties().setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return getJmsContentHeaderProperties().getCorrelationId().getBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + getJmsContentHeaderProperties().setCorrelationId(new String(bytes)); + } + + public void setJMSCorrelationID(String correlationId) throws JMSException + { + getJmsContentHeaderProperties().setCorrelationId(correlationId); + } + + public String getJMSCorrelationID() throws JMSException + { + return getJmsContentHeaderProperties().getCorrelationId(); + } + + public Destination getJMSReplyTo() throws JMSException + { + String replyToEncoding = getJmsContentHeaderProperties().getReplyTo(); + if 
(replyToEncoding == null) + { + return null; + } + else + { + Destination dest = (Destination) _destinationCache.get(replyToEncoding); + if (dest == null) + { + char destType = replyToEncoding.charAt(0); + if (destType == 'Q') + { + dest = new AMQQueue(replyToEncoding.substring(1)); + } + else if (destType == 'T') + { + dest = new AMQTopic(replyToEncoding.substring(1)); + } + else + { + throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding); + } + _destinationCache.put(replyToEncoding, dest); + } + return dest; + } + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + if (destination == null) + { + throw new IllegalArgumentException("Null destination not allowed"); + } + if (!(destination instanceof AMQDestination)) + { + throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " + + destination.getClass()); + } + final AMQDestination amqd = (AMQDestination) destination; + + final String encodedDestination = amqd.getEncodedName(); + _destinationCache.put(encodedDestination, destination); + getJmsContentHeaderProperties().setReplyTo(encodedDestination); + } + + public Destination getJMSDestination() throws JMSException + { + // TODO: implement this once we have sorted out how to figure out the exchange class + throw new NotImplementedException(); + } + + public void setJMSDestination(Destination destination) throws JMSException + { + throw new NotImplementedException(); + } + + public int getJMSDeliveryMode() throws JMSException + { + return getJmsContentHeaderProperties().getDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + getJmsContentHeaderProperties().setDeliveryMode((byte) i); + } + + public boolean getJMSRedelivered() throws JMSException + { + return _redelivered; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + _redelivered = b; + } + + public String getJMSType() throws JMSException + { 
+ return getMimeType(); + } + + public void setJMSType(String string) throws JMSException + { + throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type"); + } + + public long getJMSExpiration() throws JMSException + { + return new Long(getJmsContentHeaderProperties().getExpiration()).longValue(); + } + + public void setJMSExpiration(long l) throws JMSException + { + getJmsContentHeaderProperties().setExpiration(l); + } + + public int getJMSPriority() throws JMSException + { + return getJmsContentHeaderProperties().getPriority(); + } + + public void setJMSPriority(int i) throws JMSException + { + getJmsContentHeaderProperties().setPriority((byte) i); + } + + public void clearProperties() throws JMSException + { + if (getJmsContentHeaderProperties().getHeaders() != null) + { + getJmsContentHeaderProperties().getHeaders().clear(); + } + } + + public boolean propertyExists(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return false; + } + else + { + // TODO: fix this + return getJmsContentHeaderProperties().getHeaders().containsKey(STRING_PROPERTY_PREFIX + propertyName); + } + } + + public boolean getBooleanProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return Boolean.valueOf(null).booleanValue(); + } + else + { + // store as integer as temporary workaround + //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName); + Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName); + + if (b == null) + { + return Boolean.valueOf(null).booleanValue(); + } + else + { + return b.longValue() != 0; + } + } + } + + public byte getByteProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if 
(getJmsContentHeaderProperties().getHeaders() == null) + { + return Byte.valueOf(null).byteValue(); + } + else + { + Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName); + if (b == null) + { + return Byte.valueOf(null).byteValue(); + } + else + { + return b.byteValue(); + } + } + } + + public short getShortProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return Short.valueOf(null).shortValue(); + } + else + { + Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName); + if (s == null) + { + return Short.valueOf(null).shortValue(); + } + else + { + return s.shortValue(); + } + } + } + + public int getIntProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return Integer.valueOf(null).intValue(); + } + else + { + Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName); + if (i == null) + { + return Integer.valueOf(null).intValue(); + } + else + { + return i.intValue(); + } + } + } + + public long getLongProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return Long.valueOf(null).longValue(); + } + else + { + Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName); + if (l == null) + { + // temp - the spec says do this but this throws a NumberFormatException + //return Long.valueOf(null).longValue(); + return 0; + } + else + { + return l.longValue(); + } + } + } + + public float getFloatProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return 
Float.valueOf(null).floatValue(); + } + else + { + final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName); + if (f == null) + { + return Float.valueOf(null).floatValue(); + } + else + { + return f.floatValue(); + } + } + } + + public double getDoubleProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return Double.valueOf(null).doubleValue(); + } + else + { + final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName); + if (d == null) + { + return Double.valueOf(null).doubleValue(); + } + else + { + return d.shortValue(); + } + } + } + + public String getStringProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + return null; + } + else + { + return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName); + } + } + + public Object getObjectProperty(String propertyName) throws JMSException + { + checkPropertyName(propertyName); + throw new JMSException("Not implemented yet"); + } + + public Enumeration getPropertyNames() throws JMSException + { + return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders()) + { + public Object nextElement() + { + String propName = (String) _iterator.next(); + + //The propertyName has a single Char prefix. Skip this. + return propName.substring(1); + } + }; + } + + public void setBooleanProperty(String propertyName, boolean b) throws JMSException + { + checkPropertyName(propertyName); + //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b)); + getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? 
new Long(1) : new Long(0)); + } + + public void setByteProperty(String propertyName, byte b) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b)); + } + + public void setShortProperty(String propertyName, short i) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i)); + } + + public void setIntProperty(String propertyName, int i) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i)); + } + + public void setLongProperty(String propertyName, long l) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l)); + } + + public void setFloatProperty(String propertyName, float f) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f)); + } + + public void setDoubleProperty(String propertyName, double v) throws JMSException + { + checkPropertyName(propertyName); + getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v)); + } + + public void setStringProperty(String propertyName, String value) throws JMSException + { + checkPropertyName(propertyName); + createPropertyMapIfRequired(); + propertyName = STRING_PROPERTY_PREFIX + propertyName; + getJmsContentHeaderProperties().getHeaders().put(propertyName, value); + } + + private void createPropertyMapIfRequired() + { + if (getJmsContentHeaderProperties().getHeaders() == null) + { + getJmsContentHeaderProperties().setHeaders(new FieldTable()); + } + } + + public void setObjectProperty(String string, Object object) throws JMSException + { + //To change body of 
implemented methods use File | Settings | File Templates. + } + + public void acknowledge() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_session != null) + { + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _session.acknowledgeMessage(_deliveryTag, true); + } + } + + public abstract void clearBody() throws JMSException; + + /** + * Get a String representation of the body of the message. Used in the + * toString() method which outputs this before message properties. + */ + public abstract String toBodyString() throws JMSException; + + public abstract String getMimeType(); + + public String toString() + { + try + { + StringBuffer buf = new StringBuffer("Body:\n"); + buf.append(toBodyString()); + buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); + buf.append("\nJMS expiration: ").append(getJMSExpiration()); + buf.append("\nJMS priority: ").append(getJMSPriority()); + buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); + buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); + if (getJmsContentHeaderProperties().getHeaders() == null) + { + buf.append(""); + } + else + { + final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator(); + while (it.hasNext()) + { + final Map.Entry entry = (Map.Entry) it.next(); + final String propertyName = (String) entry.getKey(); + if (propertyName == null) + { + buf.append("\nInternal error: Property with NULL key defined"); + } + else + { + buf.append('\n').append(propertyName.substring(1)); + + char typeIdentifier = propertyName.charAt(0); + switch (typeIdentifier) + { + case 
org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX: + buf.append(" "); + break; + case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX: + buf.append(" "); + break; + default: + buf.append("