Return-Path: Delivered-To: apmail-ws-synapse-dev-archive@www.apache.org Received: (qmail 681 invoked from network); 12 Sep 2007 06:16:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Sep 2007 06:16:57 -0000 Received: (qmail 76642 invoked by uid 500); 12 Sep 2007 06:16:50 -0000 Delivered-To: apmail-ws-synapse-dev-archive@ws.apache.org Received: (qmail 76485 invoked by uid 500); 12 Sep 2007 06:16:50 -0000 Mailing-List: contact synapse-dev-help@ws.apache.org; run by ezmlm Precedence: bulk Reply-To: synapse-dev@ws.apache.org list-help: list-unsubscribe: List-Post: List-Id: Delivered-To: mailing list synapse-dev@ws.apache.org Received: (qmail 76474 invoked by uid 500); 12 Sep 2007 06:16:50 -0000 Delivered-To: apmail-ws-synapse-cvs@ws.apache.org Received: (qmail 76471 invoked by uid 99); 12 Sep 2007 06:16:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Sep 2007 23:16:50 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Sep 2007 06:18:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7B87A1A9832; Tue, 11 Sep 2007 23:16:30 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r574793 - in /webservices/synapse/trunk/java/modules: core/src/main/java/org/apache/synapse/mediators/builtin/ transports/src/main/java/org/apache/axis2/transport/base/ transports/src/main/java/org/apache/axis2/transport/jms/ Date: Wed, 12 Sep 2007 06:16:29 -0000 To: synapse-cvs@ws.apache.org From: asankha@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070912061630.7B87A1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: asankha Date: Tue Sep 11 23:16:25 2007 New Revision: 574793 URL: http://svn.apache.org/viewvc?rev=574793&view=rev Log: More JMS 1.0.2b support conversions from 1.1 and minor fixes Also use a message selector on the JMSCorrelationID when waiting for a response on a destination Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Modified: webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java?rev=574793&r1=574792&r2=574793&view=diff ============================================================================== --- webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java (original) +++ webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/PropertyMediator.java Tue Sep 11 23:16:25 2007 @@ -100,7 +100,7 @@ Axis2MessageContext axis2smc = (Axis2MessageContext) smc; org.apache.axis2.context.MessageContext axis2MessageCtx = axis2smc.getAxis2MessageContext(); - axis2MessageCtx.setProperty(name, resultValue); + axis2MessageCtx.getOptions().setProperty(name, resultValue); } else if (Constants.SCOPE_TRANSPORT.equals(scope) && smc instanceof Axis2MessageContext) { Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java?rev=574793&r1=574792&r2=574793&view=diff ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java (original) +++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/base/BaseUtils.java Tue Sep 11 23:16:25 2007 @@ -203,7 +203,6 @@ } } catch (Exception e) { - e.printStackTrace(); envelope = handleLegacyMessage(msgContext, message); } } @@ -257,7 +256,7 @@ OMTextImpl textData = (OMTextImpl) soapFactory.createOMText(textPayload); if (wrapperQName == null) { - wrapperQName = BaseConstants.DEFAULT_BINARY_WRAPPER; + wrapperQName = BaseConstants.DEFAULT_TEXT_WRAPPER; } wrapper = soapFactory.createOMElement(wrapperQName, null); wrapper.addChild(textData); Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=574793&r1=574792&r2=574793&view=diff ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original) +++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Tue Sep 11 23:16:25 2007 @@ -133,7 +133,7 @@ try { log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName + " using the connection factory definition named : " + name); - JMSUtils.createJMSQueue(conFactory.createConnection(), destinationJNDIName); + JMSUtils.createDestination(conFactory, destinationJNDIName); destinationName = getPhysicalDestinationName(destinationJNDIName); @@ -298,7 +298,7 @@ } try { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE); Destination destination = null; try { @@ -306,10 +306,10 @@ } catch (NameNotFoundException e) { log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue"); - destination = session.createQueue(destinationJNDIname); + destination = JMSUtils.createDestination(session, destinationJNDIname); } - MessageConsumer consumer = session.createConsumer(destination); + MessageConsumer consumer = JMSUtils.createConsumer(session, destination); consumer.setMessageListener(jmsMessageReceiver); jmsSessions.put(destinationJNDIname, session); Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=574793&r1=574792&r2=574793&view=diff ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original) +++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Tue Sep 11 23:16:25 2007 @@ -19,6 +19,7 @@ import org.apache.axiom.om.OMElement; import org.apache.axiom.om.OMText; import org.apache.axiom.om.OMNode; +import org.apache.axiom.om.util.UUIDGenerator; import org.apache.axis2.AxisFault; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ConfigurationContext; @@ -34,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import javax.jms.*; +import javax.jms.Queue; import javax.activation.DataHandler; import javax.naming.Context; import javax.naming.NamingException; @@ -169,9 +171,13 @@ destination = jmsOut.getDestination(); } - String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); + String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); if (replyDestName != null) { - replyDestination = jmsOut.getReplyDestination(replyDestName); + if (jmsConnectionFactory != null) { + replyDestination = jmsConnectionFactory.getDestination(replyDestName); + } else { + replyDestination = jmsOut.getReplyDestination(replyDestName); + } } // now we are going to use the JMS session, but if this was a session from a @@ -181,6 +187,7 @@ // convert the axis message context into a JMS Message that we can send over JMS Message message = null; + String correlationId = null; try { message = createJMSMessage(msgCtx, session); } catch (JMSException e) { @@ -198,6 +205,14 @@ if (waitForResponse) { replyDestination = JMSUtils.setReplyDestination( replyDestination, session, message); + // force the use of a JMS correlation ID if synchronous + try { + correlationId = message.getJMSCorrelationID(); + if (correlationId == null) { + correlationId = UUIDGenerator.getUUID(); + message.setJMSCorrelationID(correlationId); + } + } catch (JMSException ignore) {} } // send the outgoing message over JMS to the destination selected @@ -205,7 +220,7 @@ // if we are expecting a synchronous response back for the message sent out if (waitForResponse) { - waitForResponseAndProcess(session, replyDestination, msgCtx); + waitForResponseAndProcess(session, replyDestination, msgCtx, correlationId); } } @@ -228,9 +243,25 @@ * @throws AxisFault on error */ private void waitForResponseAndProcess(Session session, Destination replyDestination, - MessageContext msgCtx) throws AxisFault { + MessageContext msgCtx, String correlationId) throws AxisFault { + try { - MessageConsumer consumer = session.createConsumer(replyDestination); + MessageConsumer consumer = null; + if (replyDestination instanceof Queue) { + if (correlationId != null) { + consumer = ((QueueSession) session).createReceiver((Queue) replyDestination, + "JMSCorrelationID = '" + correlationId + "'"); + } else { + consumer = ((QueueSession) session).createReceiver((Queue) replyDestination); + } + } else { + if (correlationId != null) { + consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination, + correlationId, false); + } else { + consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination); + } + } // how long are we willing to wait for the sync response long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; @@ -241,7 +272,8 @@ if (log.isDebugEnabled()) { log.debug("Waiting for a maximum of " + timeout + - "ms for a response message to destination : " + replyDestination); + "ms for a response message to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); } Message reply = consumer.receive(timeout); @@ -250,7 +282,8 @@ } else { log.warn("Did not receive a JMS response within " + - timeout + " ms to destination : " + replyDestination); + timeout + " ms to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); } } catch (JMSException e) { Modified: webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=574793&r1=574792&r2=574793&view=diff ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original) +++ webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Tue Sep 11 23:16:25 2007 @@ -50,6 +50,7 @@ import javax.naming.Context; import java.io.*; import java.util.*; +import java.nio.ByteBuffer; /** * Miscallaneous methods used for the JMS transport @@ -75,9 +76,9 @@ */ public static String createJMSQueue(Connection con, String destinationJNDIName) throws JMSException { try { - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSession session = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(destinationJNDIName); - log.info("JMS Destination with JNDI name : " + destinationJNDIName + " created"); + log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created"); return queue.getQueueName(); } finally { @@ -88,6 +89,29 @@ } /** + * Create a JMS Topic using the given connection with the JNDI destination name, and return the + * JMS Destination name of the created queue + * + * @param con the JMS Connection to be used + * @param destinationJNDIName the JNDI name of the Topic to be created + * @return the JMS Destination name of the created Topic + * @throws JMSException on error + */ + public static String createJMSTopic(Connection con, String destinationJNDIName) throws JMSException { + try { + TopicSession session = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(destinationJNDIName); + log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created"); + return topic.getTopicName(); + + } finally { + try { + con.close(); + } catch (JMSException ignore) {} + } + } + + /** * Should this service be enabled over the JMS transport? * * @param service the Axis service @@ -301,7 +325,7 @@ if (replyDestination == null) { try { // create temporary queue to receive the reply - replyDestination = session.createTemporaryQueue(); + replyDestination = createTemporaryDestination(session); } catch (JMSException e) { handleException("Error creating temporary queue for response"); } @@ -343,7 +367,7 @@ } try { - destination = session.createQueue(name); + destination = createDestination(session, name); } catch (JMSException e) { handleException("Error creating destination Queue : " + name, e); } @@ -604,18 +628,79 @@ } public byte[] getMessageBinaryPayload(Object message) { + if (message instanceof BytesMessage) { BytesMessage bytesMessage = (BytesMessage) message; - byte[] msgBytes; + ByteBuffer msgBytes = ByteBuffer.allocate(1024); try { - msgBytes = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.reset(); - bytesMessage.readBytes(msgBytes); + while (true) { + byte[] temp = new byte[1024]; + int read = bytesMessage.readBytes(temp); + if (read > 0) { + msgBytes.put(temp, 0, read); + } else { + msgBytes.flip(); + return msgBytes.array(); + } + } } catch (JMSException e) { handleException("Error reading JMS binary message payload", e); } } return null; + } + + // ----------- JMS 1.0.2b compatibility methods ------------- + public static Session createSession(Connection con, + boolean transacted, int acknowledgeMode) throws JMSException { + + if (con instanceof QueueConnection) { + return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode); + } else { + return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode); + } + } + + public static Destination createDestination(Session session, String destName) + throws JMSException { + + if (session instanceof QueueSession) { + return ((QueueSession) session).createQueue(destName); + } else { + return ((TopicSession) session).createTopic(destName); + } + } + + public static void createDestination(ConnectionFactory conFactory, + String destinationJNDIName) throws JMSException { + + if (conFactory instanceof QueueConnectionFactory) { + JMSUtils.createJMSQueue( + ((QueueConnectionFactory) conFactory).createQueueConnection(), + destinationJNDIName); + } else { + JMSUtils.createJMSTopic( + ((TopicConnectionFactory) conFactory).createTopicConnection(), + destinationJNDIName); + } + } + public static MessageConsumer createConsumer(Session session, Destination dest) + throws JMSException { + + if (dest instanceof Queue) { + return ((QueueSession) session).createReceiver((Queue) dest); + } else { + return ((TopicSession) session).createSubscriber((Topic) dest); + } + } + + public static Destination createTemporaryDestination(Session session) throws JMSException { + + if (session instanceof QueueSession) { + return ((QueueSession) session).createTemporaryQueue(); + } else { + return ((TopicSession) session).createTemporaryTopic(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org For additional commands, e-mail: synapse-dev-help@ws.apache.org