Return-Path:
Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org
Received: (qmail 10823 invoked from network); 19 Sep 2006 22:39:16 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199)
by minotaur.apache.org with SMTP; 19 Sep 2006 22:39:16 -0000
Received: (qmail 35429 invoked by uid 500); 19 Sep 2006 22:39:16 -0000
Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org
Received: (qmail 35407 invoked by uid 500); 19 Sep 2006 22:39:16 -0000
Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: qpid-dev@incubator.apache.org
Delivered-To: mailing list qpid-commits@incubator.apache.org
Delivered-To: moderator for qpid-commits@incubator.apache.org
Received: (qmail 92169 invoked by uid 99); 19 Sep 2006 22:09:09 -0000
X-ASF-Spam-Status: No, hits=-9.8 required=5.0 tests=ALL_TRUSTED,NO_REAL_NAME
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r447994 [20/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Date: Tue, 19 Sep 2006 22:07:25 -0000
To: qpid-commits@incubator.apache.org
From: rhs@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20060919220808.407061A985C@eris.apache.org>
X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
+ *
+ */
+class TopicSubscriberAdaptor implements TopicSubscriber
+{
+ private final Topic _topic;
+ private final MessageConsumer _consumer;
+ private final boolean _noLocal;
+
+ TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+ {
+ _topic = topic;
+ _consumer = consumer;
+ _noLocal = noLocal;
+ }
+ TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+ {
+ this(topic, consumer, consumer.isNoLocal());
+ }
+ public Topic getTopic() throws JMSException
+ {
+ return _topic;
+ }
+
+ public boolean getNoLocal() throws JMSException
+ {
+ return _noLocal;
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ return _consumer.getMessageSelector();
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ return _consumer.getMessageListener();
+ }
+
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ _consumer.setMessageListener(messageListener);
+ }
+
+ public Message receive() throws JMSException
+ {
+ return _consumer.receive();
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+ return _consumer.receive(l);
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ return _consumer.receiveNoWait();
+ }
+
+ public void close() throws JMSException
+ {
+ _consumer.close();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/TopicSubscriberAdaptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+/**
+ * This exception is thrown when failover is taking place and we need to let other
+ * parts of the client know about this.
+ */
+public class FailoverException extends RuntimeException
+{
+ public FailoverException(String message)
+ {
+ super(message);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverState;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * When failover is required, we need a separate thread to handle the establishment of the new connection and
+ * the transfer of subscriptions.
+ *
+ * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor
+ * thread. One significant task is the connection setup which involves a protocol exchange until a particular state
+ * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means
+ * the IO processor is not able to do anything at all.
+ */
+public class FailoverHandler implements Runnable
+{
+ private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
+
+ private final IoSession _session;
+ private AMQProtocolHandler _amqProtocolHandler;
+
+ /**
+ * Used where forcing the failover host
+ */
+ private String _host;
+
+ /**
+ * Used where forcing the failover port
+ */
+ private int _port;
+
+ public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ {
+ _amqProtocolHandler = amqProtocolHandler;
+ _session = session;
+ }
+
+ public void run()
+ {
+ if (Thread.currentThread().isDaemon())
+ {
+ throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
+ }
+ //Thread.currentThread().setName("Failover Thread");
+
+ _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
+
+ // We wake up listeners. If they can handle failover, they will extend the
+ // FailoverSupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation
+ _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+
+ // Since failover impacts several structures we protect them all with a single mutex. These structures
+ // are also in child objects of the connection. This allows us to manipulate them without affecting
+ // client code which runs in a separate thread.
+ synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
+ {
+ _logger.info("Starting failover process");
+
+ // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
+ // state works, without us having to terminate any existing "state waiters". We could theoretically
+ // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
+ // a slightly more complex state model therefore I felt it was worthwhile doing this.
+ AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
+ _amqProtocolHandler.setStateManager(new AMQStateManager());
+ if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
+ {
+ _amqProtocolHandler.setStateManager(existingStateManager);
+ if (_host != null)
+ {
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+ }
+ else
+ {
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+ }
+ _amqProtocolHandler.getFailoverLatch().countDown();
+ _amqProtocolHandler.setFailoverLatch(null);
+ return;
+ }
+ boolean failoverSucceeded;
+ // when host is non null we have a specified failover host otherwise we all the client to cycle through
+ // all specified hosts
+
+ // if _host has value then we are performing a redirect.
+ if (_host != null)
+ {
+ failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, _amqProtocolHandler.isUseSSL());
+ }
+ else
+ {
+ failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection();
+ }
+ if (!failoverSucceeded)
+ {
+ _amqProtocolHandler.setStateManager(existingStateManager);
+ _amqProtocolHandler.getConnection().exceptionReceived(
+ new AMQDisconnectedException("Server closed connection and no failover " +
+ "was successful"));
+ }
+ else
+ {
+ _amqProtocolHandler.setStateManager(existingStateManager);
+ try
+ {
+ if (_amqProtocolHandler.getConnection().firePreResubscribe())
+ {
+ _logger.info("Resubscribing on new connection");
+ _amqProtocolHandler.getConnection().resubscribeSessions();
+ }
+ else
+ {
+ _logger.info("Client vetoed automatic resubscription");
+ }
+ _amqProtocolHandler.getConnection().fireFailoverComplete();
+ _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED);
+ _logger.info("Connection failover completed successfully");
+ }
+ catch (Exception e)
+ {
+ _logger.info("Failover process failed - exception being propagated by protocol handler");
+ _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
+ try
+ {
+ _amqProtocolHandler.exceptionCaught(_session, e);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error notifying protocol session of error: " + ex, ex);
+ }
+ }
+ }
+ }
+ _amqProtocolHandler.getFailoverLatch().countDown();
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(String host)
+ {
+ _host = host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int port)
+ {
+ _port = port;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+/**
+ * Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be
+ * dealt with and can happen during failover.
+ */
+public final class FailoverState
+{
+ private final String _state;
+
+ /** Failover has not yet been attempted on this connection */
+ public static final FailoverState NOT_STARTED = new FailoverState("NOT STARTED");
+
+ /** Failover has been requested on this connection but has not completed */
+ public static final FailoverState IN_PROGRESS = new FailoverState("IN PROGRESS");
+
+ /** Failover has been attempted and failed */
+ public static final FailoverState FAILED = new FailoverState("FAILED");
+
+ private FailoverState(String state)
+ {
+ _state = state;
+ }
+
+ public String toString()
+ {
+ return "FailoverState: " + _state;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
+
+import javax.jms.JMSException;
+
+public abstract class FailoverSupport
+{
+ private static final Logger _log = Logger.getLogger(FailoverSupport.class);
+
+ public Object execute(AMQConnection con) throws JMSException
+ {
+ // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding.
+ // Any method that can potentially block for any reason should use this class so that deadlock will not
+ // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners)
+ // that might be causing a block. When that happens, the exception is caught here and the mutex is released
+ // before waiting for the failover to complete (either successfully or unsuccessfully).
+ while (true)
+ {
+ try
+ {
+ con.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _log.info("Interrupted: " + e, e);
+ return null;
+ }
+ synchronized (con.getFailoverMutex())
+ {
+ try
+ {
+ return operation();
+ }
+ catch (FailoverException e)
+ {
+ _log.info("Failover exception caught during operation");
+ }
+ }
+ }
+ }
+
+ protected abstract Object operation() throws JMSException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/failover/FailoverSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.message.UnprocessedMessage;
+
+public class BasicDeliverMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class);
+
+ private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
+
+ public static BasicDeliverMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ final UnprocessedMessage msg = new UnprocessedMessage();
+ msg.deliverBody = (BasicDeliverBody) evt.getMethod();
+ msg.channelId = evt.getChannelId();
+ _logger.debug("New JmsDeliver method received");
+ evt.getProtocolSession().unprocessedMessageReceived(msg);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.BasicReturnBody;
+
+public class BasicReturnMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class);
+
+ private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler();
+
+ public static BasicReturnMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("New JmsBounce method received");
+ final UnprocessedMessage msg = new UnprocessedMessage();
+ msg.deliverBody = null;
+ msg.bounceBody = (BasicReturnBody) evt.getMethod();
+ msg.channelId = evt.getChannelId();
+
+ evt.getProtocolSession().unprocessedMessageReceived(msg);
+ }
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+
+public class ChannelCloseMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+ private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+ public static ChannelCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+ int errorCode = method.replyCode;
+ String reason = method.replyText;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ evt.getProtocolSession().writeFrame(frame);
+ if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
+ {
+ _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null);
+ }
+ else
+ {
+ if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else
+ {
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+ }
+ }
+ evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelCloseOkMethodHandler.class);
+
+ private static final ChannelCloseOkMethodHandler _instance = new ChannelCloseOkMethodHandler();
+
+ public static ChannelCloseOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+
+ //todo this should do the closure
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+
+public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ChannelFlowOkMethodHandler.class);
+ private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler();
+
+ public static ChannelFlowOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelFlowOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
+ _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+ private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+ public static ConnectionCloseMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ private ConnectionCloseMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.info("ConnectionClose frame received");
+ ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
+
+ // does it matter
+ //stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+ int errorCode = method.replyCode;
+ String reason = method.replyText;
+
+ // TODO: check whether channel id of zero is appropriate
+ evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+
+ if (errorCode != 200)
+ {
+ if(errorCode == AMQConstant.NOT_ALLOWED.getCode())
+ {
+ _logger.info("Authentication Error:"+Thread.currentThread().getName());
+
+ evt.getProtocolSession().closeProtocolSession();
+
+ //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+ stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
+
+ throw new AMQAuthenticationException(errorCode, reason);
+ }
+ else
+ {
+ _logger.info("Connection close received with error code " + errorCode);
+
+
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ }
+ }
+
+ // this actually closes the connection in the case where it is not an error.
+
+ evt.getProtocolSession().closeProtocolSession();
+
+ stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+
+public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
+{
+
+ private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class);
+
+ private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
+
+ public static ConnectionOpenOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionOpenOkMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ AMQProtocolSession session = evt.getProtocolSession();
+ ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod();
+ stateManager.changeState(AMQState.CONNECTION_OPEN);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionRedirectBody;
+
+public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionRedirectMethodHandler.class);
+
+ private static final int DEFAULT_REDIRECT_PORT = 5672;
+
+ private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler();
+
+ public static ConnectionRedirectMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ private ConnectionRedirectMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.info("ConnectionRedirect frame received");
+ ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+
+ // the host is in the form hostname:port with the port being optional
+ int portIndex = method.host.indexOf(':');
+ String host;
+ int port;
+ if (portIndex == -1)
+ {
+ host = method.host;
+ port = DEFAULT_REDIRECT_PORT;
+ }
+ else
+ {
+ host = method.host.substring(0, portIndex);
+ port = Integer.parseInt(method.host.substring(portIndex + 1));
+ }
+ evt.getProtocolSession().failover(host, port);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+public class ConnectionSecureMethodHandler implements StateAwareMethodListener
+{
+ private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+ public static ConnectionSecureMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ SaslClient client = evt.getProtocolSession().getSaslClient();
+ if (client == null)
+ {
+ throw new AMQException("No SASL client set up - cannot proceed with authentication");
+ }
+
+ ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
+
+ try
+ {
+ // Evaluate server challenge
+ byte[] response = client.evaluateChallenge(body.challenge);
+ AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+ evt.getProtocolSession().writeFrame(responseFrame);
+ }
+ catch (SaslException e)
+ {
+ throw new AMQException("Error processing SASL challenge: " + e, e);
+ }
+
+
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,184 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.security.AMQCallbackHandler;
+import org.apache.qpid.client.security.CallbackHandlerRegistry;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
+public class ConnectionStartMethodHandler implements StateAwareMethodListener
+{
+
+ private static final Logger _log = Logger.getLogger(ConnectionStartMethodHandler.class);
+
+ private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler();
+
+ public static ConnectionStartMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ConnectionStartMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
+
+ try
+ {
+ // the mechanism we are going to use
+ String mechanism;
+ if (body.mechanisms == null)
+ {
+ throw new AMQException("mechanism not specified in ConnectionStart method frame");
+ }
+ else
+ {
+ mechanism = chooseMechanism(body.mechanisms);
+ }
+
+ if (mechanism == null)
+ {
+ throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ }
+
+ final AMQProtocolSession ps = evt.getProtocolSession();
+ byte[] saslResponse;
+ try
+ {
+ SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
+ null, "AMQP", "localhost",
+ null, createCallbackHandler(mechanism, ps));
+ if (sc == null)
+ {
+ throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
+ mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
+ " details of how to register non-standard SASL client providers.");
+ }
+ ps.setSaslClient(sc);
+ saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
+ }
+ catch (SaslException e)
+ {
+ ps.setSaslClient(null);
+ throw new AMQException("Unable to create SASL client: " + e, e);
+ }
+
+ if (body.locales == null)
+ {
+ throw new AMQException("Locales is not defined in Connection Start method");
+ }
+ final String locales = new String(body.locales, "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+ String selectedLocale = null;
+ if (tokenizer.hasMoreTokens())
+ {
+ selectedLocale = tokenizer.nextToken();
+ }
+ else
+ {
+ throw new AMQException("No locales sent from server, passed: " + locales);
+ }
+
+ stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ FieldTable clientProperties = new FieldTable();
+ clientProperties.put("instance", ps.getClientID());
+ clientProperties.put("product", "Qpid");
+ clientProperties.put("version", "1.0");
+ clientProperties.put("platform", getFullSystemInfo());
+ ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
+ saslResponse, selectedLocale));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new AMQException(_log, "Unable to decode data: " + e, e);
+ }
+ }
+
+ private String getFullSystemInfo()
+ {
+ StringBuffer fullSystemInfo = new StringBuffer();
+ fullSystemInfo.append(System.getProperty("java.runtime.name"));
+ fullSystemInfo.append(", " + System.getProperty("java.runtime.version"));
+ fullSystemInfo.append(", " + System.getProperty("java.vendor"));
+ fullSystemInfo.append(", " + System.getProperty("os.arch"));
+ fullSystemInfo.append(", " + System.getProperty("os.name"));
+ fullSystemInfo.append(", " + System.getProperty("os.version"));
+ fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level"));
+
+ return fullSystemInfo.toString();
+ }
+
+ private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
+ {
+ final String mechanisms = new String(availableMechanisms, "utf8");
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ HashSet mechanismSet = new HashSet();
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+
+ String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+ StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+ while (prefTokenizer.hasMoreTokens())
+ {
+ String mech = prefTokenizer.nextToken();
+ if (mechanismSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+ return null;
+ }
+
+ private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
+ throws AMQException
+ {
+ Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
+ try
+ {
+ Object instance = mechanismClass.newInstance();
+ AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
+ cbh.initialise(protocolSession);
+ return cbh;
+ }
+ catch (Exception e)
+ {
+ throw new AMQException("Unable to create callback handler: " + e, e);
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.AMQFrame;
+
+public class ConnectionTuneMethodHandler implements StateAwareMethodListener
+{
+ private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+ private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+ public static ConnectionTuneMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ protected ConnectionTuneMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ {
+ _logger.debug("ConnectionTune frame received");
+ ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
+ AMQProtocolSession session = evt.getProtocolSession();
+
+ ConnectionTuneParameters params = session.getConnectionTuneParameters();
+ if (params == null)
+ {
+ params = new ConnectionTuneParameters();
+ }
+
+ params.setFrameMax(frame.frameMax);
+ params.setChannelMax(frame.channelMax);
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ session.setConnectionTuneParameters(params);
+
+ stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ session.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
+ session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+ }
+
+ protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ {
+ return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+ }
+
+ protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
+ {
+ return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.client.AMQSession;
+
+public class AMQMessage
+{
+ protected ContentHeaderProperties _contentHeaderProperties;
+
+ /**
+ * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
+ */
+ protected AMQSession _session;
+
+ protected final long _deliveryTag;
+
+ public AMQMessage(ContentHeaderProperties properties, long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ }
+
+ public AMQMessage(ContentHeaderProperties properties)
+ {
+ this(properties, -1);
+ }
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user
+ * calls acknowledge()
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AMQMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,677 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableKeyEnumeration;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Map;
+
+public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+{
+ private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+// protected Map _messageProperties;
+
+ public static final char BOOLEAN_PROPERTY_PREFIX = 'B';
+ public static final char BYTE_PROPERTY_PREFIX = 'b';
+ public static final char SHORT_PROPERTY_PREFIX = 's';
+ public static final char INT_PROPERTY_PREFIX = 'i';
+ public static final char LONG_PROPERTY_PREFIX = 'l';
+ public static final char FLOAT_PROPERTY_PREFIX = 'f';
+ public static final char DOUBLE_PROPERTY_PREFIX = 'd';
+ public static final char STRING_PROPERTY_PREFIX = 'S';
+
+ protected boolean _redelivered;
+
+ protected ByteBuffer _data;
+
+ protected AbstractJMSMessage(ByteBuffer data)
+ {
+ super(new BasicContentHeaderProperties());
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+ }
+
+ protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
+ {
+ this(contentHeader, deliveryTag);
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+ }
+
+ protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+ {
+ super(contentHeader, deliveryTag);
+ }
+
+ public String getJMSMessageID() throws JMSException
+ {
+ if (getJmsContentHeaderProperties().getMessageId() == null)
+ {
+ getJmsContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
+ }
+ return getJmsContentHeaderProperties().getMessageId();
+ }
+
+ public void setJMSMessageID(String messageId) throws JMSException
+ {
+ getJmsContentHeaderProperties().setMessageId(messageId);
+ }
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return new Long(getJmsContentHeaderProperties().getTimestamp()).longValue();
+ }
+
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ getJmsContentHeaderProperties().setTimestamp(timestamp);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return getJmsContentHeaderProperties().getCorrelationId().getBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ getJmsContentHeaderProperties().setCorrelationId(new String(bytes));
+ }
+
+ public void setJMSCorrelationID(String correlationId) throws JMSException
+ {
+ getJmsContentHeaderProperties().setCorrelationId(correlationId);
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return getJmsContentHeaderProperties().getCorrelationId();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ String replyToEncoding = getJmsContentHeaderProperties().getReplyTo();
+ if (replyToEncoding == null)
+ {
+ return null;
+ }
+ else
+ {
+ Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+ if (dest == null)
+ {
+ char destType = replyToEncoding.charAt(0);
+ if (destType == 'Q')
+ {
+ dest = new AMQQueue(replyToEncoding.substring(1));
+ }
+ else if (destType == 'T')
+ {
+ dest = new AMQTopic(replyToEncoding.substring(1));
+ }
+ else
+ {
+ throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding);
+ }
+ _destinationCache.put(replyToEncoding, dest);
+ }
+ return dest;
+ }
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ if (destination == null)
+ {
+ throw new IllegalArgumentException("Null destination not allowed");
+ }
+ if (!(destination instanceof AMQDestination))
+ {
+ throw new IllegalArgumentException("ReplyTo destination my be an AMQ destination - passed argument was type " +
+ destination.getClass());
+ }
+ final AMQDestination amqd = (AMQDestination) destination;
+
+ final String encodedDestination = amqd.getEncodedName();
+ _destinationCache.put(encodedDestination, destination);
+ getJmsContentHeaderProperties().setReplyTo(encodedDestination);
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ // TODO: implement this once we have sorted out how to figure out the exchange class
+ throw new NotImplementedException();
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException
+ {
+ throw new NotImplementedException();
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return getJmsContentHeaderProperties().getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ getJmsContentHeaderProperties().setDeliveryMode((byte) i);
+ }
+
+ public boolean getJMSRedelivered() throws JMSException
+ {
+ return _redelivered;
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException
+ {
+ _redelivered = b;
+ }
+
+ public String getJMSType() throws JMSException
+ {
+ return getMimeType();
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ throw new JMSException("Cannot set JMS Type - it is implicitly defined based on message type");
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return new Long(getJmsContentHeaderProperties().getExpiration()).longValue();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ getJmsContentHeaderProperties().setExpiration(l);
+ }
+
+ public int getJMSPriority() throws JMSException
+ {
+ return getJmsContentHeaderProperties().getPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ getJmsContentHeaderProperties().setPriority((byte) i);
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ if (getJmsContentHeaderProperties().getHeaders() != null)
+ {
+ getJmsContentHeaderProperties().getHeaders().clear();
+ }
+ }
+
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return false;
+ }
+ else
+ {
+ // TODO: fix this
+ return getJmsContentHeaderProperties().getHeaders().containsKey(STRING_PROPERTY_PREFIX + propertyName);
+ }
+ }
+
+ public boolean getBooleanProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Boolean.valueOf(null).booleanValue();
+ }
+ else
+ {
+ // store as integer as temporary workaround
+ //Boolean b = (Boolean) getJmsContentHeaderProperties().headers.get(BOOLEAN_PROPERTY_PREFIX + propertyName);
+ Long b = (Long) getJmsContentHeaderProperties().getHeaders().get(BOOLEAN_PROPERTY_PREFIX + propertyName);
+
+ if (b == null)
+ {
+ return Boolean.valueOf(null).booleanValue();
+ }
+ else
+ {
+ return b.longValue() != 0;
+ }
+ }
+ }
+
+ public byte getByteProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Byte.valueOf(null).byteValue();
+ }
+ else
+ {
+ Byte b = (Byte) getJmsContentHeaderProperties().getHeaders().get(BYTE_PROPERTY_PREFIX + propertyName);
+ if (b == null)
+ {
+ return Byte.valueOf(null).byteValue();
+ }
+ else
+ {
+ return b.byteValue();
+ }
+ }
+ }
+
+ public short getShortProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Short.valueOf(null).shortValue();
+ }
+ else
+ {
+ Short s = (Short) getJmsContentHeaderProperties().getHeaders().get(SHORT_PROPERTY_PREFIX + propertyName);
+ if (s == null)
+ {
+ return Short.valueOf(null).shortValue();
+ }
+ else
+ {
+ return s.shortValue();
+ }
+ }
+ }
+
+ public int getIntProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Integer.valueOf(null).intValue();
+ }
+ else
+ {
+ Integer i = (Integer) getJmsContentHeaderProperties().getHeaders().get(INT_PROPERTY_PREFIX + propertyName);
+ if (i == null)
+ {
+ return Integer.valueOf(null).intValue();
+ }
+ else
+ {
+ return i.intValue();
+ }
+ }
+ }
+
+ public long getLongProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Long.valueOf(null).longValue();
+ }
+ else
+ {
+ Long l = (Long) getJmsContentHeaderProperties().getHeaders().get(LONG_PROPERTY_PREFIX + propertyName);
+ if (l == null)
+ {
+ // temp - the spec says do this but this throws a NumberFormatException
+ //return Long.valueOf(null).longValue();
+ return 0;
+ }
+ else
+ {
+ return l.longValue();
+ }
+ }
+ }
+
+ public float getFloatProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Float.valueOf(null).floatValue();
+ }
+ else
+ {
+ final Float f = (Float) getJmsContentHeaderProperties().getHeaders().get(FLOAT_PROPERTY_PREFIX + propertyName);
+ if (f == null)
+ {
+ return Float.valueOf(null).floatValue();
+ }
+ else
+ {
+ return f.floatValue();
+ }
+ }
+ }
+
+ public double getDoubleProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return Double.valueOf(null).doubleValue();
+ }
+ else
+ {
+ final Double d = (Double) getJmsContentHeaderProperties().getHeaders().get(DOUBLE_PROPERTY_PREFIX + propertyName);
+ if (d == null)
+ {
+ return Double.valueOf(null).doubleValue();
+ }
+ else
+ {
+ return d.shortValue();
+ }
+ }
+ }
+
+ public String getStringProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return (String) getJmsContentHeaderProperties().getHeaders().get(STRING_PROPERTY_PREFIX + propertyName);
+ }
+ }
+
+ public Object getObjectProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ throw new JMSException("Not implemented yet");
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return new FieldTableKeyEnumeration(getJmsContentHeaderProperties().getHeaders())
+ {
+ public Object nextElement()
+ {
+ String propName = (String) _iterator.next();
+
+ //The propertyName has a single Char prefix. Skip this.
+ return propName.substring(1);
+ }
+ };
+ }
+
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ //getJmsContentHeaderProperties().headers.put(BOOLEAN_PROPERTY_PREFIX + propertyName, Boolean.valueOf(b));
+ getJmsContentHeaderProperties().getHeaders().put(BOOLEAN_PROPERTY_PREFIX + propertyName, b ? new Long(1) : new Long(0));
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(BYTE_PROPERTY_PREFIX + propertyName, new Byte(b));
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(SHORT_PROPERTY_PREFIX + propertyName, new Short(i));
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(INT_PROPERTY_PREFIX + propertyName, new Integer(i));
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(LONG_PROPERTY_PREFIX + propertyName, new Long(l));
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(FLOAT_PROPERTY_PREFIX + propertyName, new Float(f));
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().put(DOUBLE_PROPERTY_PREFIX + propertyName, new Double(v));
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ createPropertyMapIfRequired();
+ propertyName = STRING_PROPERTY_PREFIX + propertyName;
+ getJmsContentHeaderProperties().getHeaders().put(propertyName, value);
+ }
+
+ private void createPropertyMapIfRequired()
+ {
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ getJmsContentHeaderProperties().setHeaders(new FieldTable());
+ }
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null)
+ {
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public abstract void clearBody() throws JMSException;
+
+ /**
+ * Get a String representation of the body of the message. Used in the
+ * toString() method which outputs this before message properties.
+ */
+ public abstract String toBodyString() throws JMSException;
+
+ public abstract String getMimeType();
+
+ public String toString()
+ {
+ try
+ {
+ StringBuffer buf = new StringBuffer("Body:\n");
+ buf.append(toBodyString());
+ buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
+ buf.append("\nJMS expiration: ").append(getJMSExpiration());
+ buf.append("\nJMS priority: ").append(getJMSPriority());
+ buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
+ buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nAMQ message number: ").append(_deliveryTag);
+ buf.append("\nProperties:");
+ if (getJmsContentHeaderProperties().getHeaders() == null)
+ {
+ buf.append("");
+ }
+ else
+ {
+ final Iterator it = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
+ while (it.hasNext())
+ {
+ final Map.Entry entry = (Map.Entry) it.next();
+ final String propertyName = (String) entry.getKey();
+ if (propertyName == null)
+ {
+ buf.append("\nInternal error: Property with NULL key defined");
+ }
+ else
+ {
+ buf.append('\n').append(propertyName.substring(1));
+
+ char typeIdentifier = propertyName.charAt(0);
+ switch (typeIdentifier)
+ {
+ case org.apache.qpid.client.message.AbstractJMSMessage.BOOLEAN_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.BYTE_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.SHORT_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.INT_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.LONG_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.FLOAT_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.DOUBLE_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ case org.apache.qpid.client.message.AbstractJMSMessage.STRING_PROPERTY_PREFIX:
+ buf.append(" ");
+ break;
+ default:
+ buf.append("