Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 18607 invoked from network); 31 Jul 2009 19:55:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 31 Jul 2009 19:55:00 -0000 Received: (qmail 93566 invoked by uid 500); 31 Jul 2009 19:55:01 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 93498 invoked by uid 500); 31 Jul 2009 19:55:00 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 93472 invoked by uid 99); 31 Jul 2009 19:54:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Jul 2009 19:54:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Jul 2009 19:54:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0058D2388989; Fri, 31 Jul 2009 19:54:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r799724 - in /cxf/trunk: integration/jca/src/main/java/org/apache/cxf/jca/inbound/ rt/transports/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ Date: Fri, 31 Jul 2009 19:54:36 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090731195437.0058D2388989@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dkulp Date: Fri Jul 31 19:54:36 2009 New Revision: 799724 URL: http://svn.apache.org/viewvc?rev=799724&view=rev Log: [CXF-2372] JCA + XA transaction work Patch from Seumas Soltysik applied (with a lot of mods due to conflicts on trunk) Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java (with props) Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java cxf/trunk/rt/transports/jms/pom.xml cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java (original) +++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java Fri Jul 31 19:54:36 2009 @@ -53,8 +53,20 @@ @Override public Object getServiceObject(Exchange context) { + MessageEndpoint ep = null; + MessageEndpoint epFromMessage = null; + + if (context != null) { + epFromMessage = context.getInMessage().getContent(MessageEndpoint.class); + } + + if (epFromMessage == null) { + ep = getMessageEndpoint(); + } else { + ep = epFromMessage; + } + Object target = null; - MessageEndpoint ep = getMessageEndpoint(); if (ep == null) { LOG.log(Level.SEVERE, "Failed to obtain MessageEndpoint"); @@ -68,9 +80,11 @@ LOG.log(Level.SEVERE, "Failed to obtain service object " + targetJndiName, e); return null; } finally { - releaseEndpoint(ep); + if (epFromMessage == null) { + releaseEndpoint(ep); + } } - + return target; } Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java (original) +++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java Fri Jul 31 19:54:36 2009 @@ -18,6 +18,7 @@ */ package org.apache.cxf.jca.inbound; +import java.lang.reflect.Method; import java.net.URL; import java.util.Arrays; import java.util.List; @@ -39,6 +40,7 @@ import org.apache.cxf.jaxws.EndpointImpl; import org.apache.cxf.jaxws.EndpointUtils; import org.apache.cxf.jaxws.JaxWsServerFactoryBean; +import org.apache.cxf.service.model.EndpointInfo; /** * @@ -51,6 +53,9 @@ public class MDBActivationWork implements Work { private static final Logger LOG = LogUtils.getL7dLogger(MDBActivationWork.class); + private static final String MESSAGE_LISTENER_METHOD = "lookupTargetObject"; + private static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory"; + private static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod"; private MDBActivationSpec spec; private MessageEndpointFactory endpointFactory; @@ -126,6 +131,16 @@ if (bus == null) { bus = BusFactory.getDefaultBus(); } + + Method method = null; + + try { + Class clazz = org.apache.cxf.jca.inbound.DispatchMDBMessageListener.class; + method = clazz.getMethod(MESSAGE_LISTENER_METHOD, new Class[] {String.class}); + } catch (Exception ex) { + LOG.severe("Failed to get method " + MESSAGE_LISTENER_METHOD + + " from class DispatchMDBMessageListener."); + } Server server = createServer(bus, serviceClass, invoker); @@ -134,6 +149,10 @@ return; } + EndpointInfo ei = server.getEndpoint().getEndpointInfo(); + ei.setProperty(MESSAGE_ENDPOINT_FACTORY, endpointFactory); + ei.setProperty(MDB_TRANSACTED_METHOD, method); + server.start(); // save the server for clean up later Modified: cxf/trunk/rt/transports/jms/pom.xml URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/pom.xml?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/pom.xml (original) +++ cxf/trunk/rt/transports/jms/pom.xml Fri Jul 31 19:54:36 2009 @@ -112,6 +112,11 @@ commons-lang commons-lang + + org.apache.geronimo.specs + geronimo-j2ee-connector_1.5_spec + provided + Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java?rev=799724&view=auto ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (added) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java Fri Jul 31 19:54:36 2009 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.jms; + +import java.lang.reflect.Method; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.XASession; +import javax.resource.spi.endpoint.MessageEndpoint; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.xa.XAResource; + +import org.apache.cxf.service.model.EndpointInfo; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.jms.support.JmsUtils; + +public class JCATransactionalMessageListenerContainer extends DefaultMessageListenerContainer { + static final ThreadLocal ENDPOINT_LOCAL = new ThreadLocal(); + static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory"; + static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod"; + private MessageEndpointFactory factory; + private Method method; + + public JCATransactionalMessageListenerContainer(EndpointInfo ei) { + factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, + MessageEndpointFactory.class); + method = ei.getProperty(MDB_TRANSACTED_METHOD, Method.class); + this.setCacheLevel(CACHE_CONNECTION); + } + + protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) + throws JMSException { + boolean messageReceived = false; + MessageEndpoint ep = null; + MessageConsumer mc = null; + XASession xa = null; + Session s = null; + + try { + xa = (XASession)createSession(getSharedConnection()); + XAResource xar = xa.getXAResource(); + s = xa.getSession(); + mc = s.createConsumer(getDestination()); + ep = factory.createEndpoint(xar); + ENDPOINT_LOCAL.set(ep); + ep.beforeDelivery(method); + messageReceived = doReceiveAndExecute(invoker, s, mc, null); + ep.afterDelivery(); + } catch (Exception ex) { + throw new JMSException(ex.getMessage()); + } finally { + ep.release(); + JmsUtils.closeMessageConsumer(mc); + JmsUtils.closeSession(xa); + JmsUtils.closeSession(s); + } + + return messageReceived; + } + +} Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Jul 31 19:54:36 2009 @@ -20,6 +20,7 @@ import javax.jms.ConnectionFactory; import javax.jms.Message; +import javax.jms.XAConnectionFactory; import org.apache.cxf.configuration.ConfigurationException; import org.springframework.beans.factory.InitializingBean; @@ -419,7 +420,11 @@ if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) { SingleConnectionFactory scf; if (useJms11) { - scf = new SingleConnectionFactory(connectionFactory); + if (connectionFactory instanceof XAConnectionFactory) { + scf = new XASingleConnectionFactory(connectionFactory); + } else { + scf = new SingleConnectionFactory(connectionFactory); + } } else { scf = new SingleConnectionFactory102(connectionFactory, pubSubDomain); } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Jul 31 19:54:36 2009 @@ -39,6 +39,7 @@ import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; +import javax.resource.spi.endpoint.MessageEndpoint; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; @@ -73,6 +74,7 @@ private JMSConfiguration jmsConfig; private Bus bus; + private EndpointInfo ei; private DefaultMessageListenerContainer jmsListener; private Collection continuations = new ConcurrentLinkedQueue(); @@ -80,6 +82,7 @@ public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) { super(b, getTargetReference(info, b), info); this.bus = b; + this.ei = info; this.jmsConfig = jmsConfig; info.setProperty(OneWayProcessorInterceptor.USE_ORIGINAL_THREAD, Boolean.TRUE); } @@ -102,8 +105,8 @@ org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name); jmsConfig.ensureProperlyConfigured(msg); - jmsListener = JMSFactory.createJmsListener(jmsConfig, this, - jmsConfig.getTargetDestination(), null); + jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, + jmsConfig.getTargetDestination()); } public void deactivate() { @@ -190,6 +193,12 @@ BusFactory.setThreadDefaultBus(bus); + MessageEndpoint ep = JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.get(); + if (ep != null) { + inMessage.setContent(MessageEndpoint.class, ep); + JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove(); + } + // handle the incoming message incomingObserver.onMessage(inMessage); } catch (SuspendedInvocationException ex) { Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=799724&r1=799723&r2=799724&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Jul 31 19:54:36 2009 @@ -18,6 +18,7 @@ */ package org.apache.cxf.transport.jms; +import java.lang.reflect.Method; import java.util.logging.Logger; import javax.jms.ConnectionFactory; @@ -26,9 +27,11 @@ import javax.jms.MessageListener; import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.XAConnectionFactory; import javax.naming.NamingException; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.service.model.EndpointInfo; import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate102; @@ -112,23 +115,49 @@ } return jmsTemplate; } - /** * Create and start listener using configuration information from jmsConfig. Uses * resolveOrCreateDestination to determine the destination for the listener. * + * @param ei the EndpointInfo for the listener * @param jmsConfig configuration information * @param listenerHandler object to be called when a message arrives * @param destinationName null for temp dest or a destination name - * @param conduitId prefix for the messageselector * @return */ - public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig, + public static DefaultMessageListenerContainer createJmsListener(EndpointInfo ei, + JMSConfiguration jmsConfig, MessageListener listenerHandler, - String destinationName, - String conduitId) { - DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11() - ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102(); + String destinationName) { + DefaultMessageListenerContainer jmsListener = null; + + if (jmsConfig.isUseJms11()) { + //Check to see if transport is being used in JCA RA with XA + Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD, + java.lang.reflect.Method.class); + if (method != null + && + jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) { + jmsListener = new JCATransactionalMessageListenerContainer(ei); + } else { + jmsListener = new DefaultMessageListenerContainer(); + } + } else { + jmsListener = new DefaultMessageListenerContainer102(); + } + + return createJmsListener(jmsListener, + jmsConfig, + listenerHandler, + destinationName); + } + + public static DefaultMessageListenerContainer createJmsListener( + DefaultMessageListenerContainer jmsListener, + JMSConfiguration jmsConfig, + MessageListener listenerHandler, + String destinationName) { + jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers()); jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); @@ -161,15 +190,11 @@ if (jmsConfig.isAcceptMessagesWhileStopping()) { jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping()); } - /*String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix(); - if (conduitId != null && jmsConfig.isUseConduitIdSelector()) { - jmsListener.setMessageSelector("JMSCorrelationID LIKE '" - + staticSelectorPrefix - + conduitId + "%'"); - } else if (staticSelectorPrefix.length() > 0) { + String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix(); + if (staticSelectorPrefix.length() > 0) { jmsListener.setMessageSelector("JMSCorrelationID LIKE '" + staticSelectorPrefix + "%'"); - }*/ + } if (jmsConfig.getDestinationResolver() != null) { jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); } Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java?rev=799724&view=auto ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java (added) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java Fri Jul 31 19:54:36 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; + +import org.springframework.jms.connection.SingleConnectionFactory; + +public class XASingleConnectionFactory extends SingleConnectionFactory { + + public XASingleConnectionFactory(ConnectionFactory targetConnectionFactory) { + super(targetConnectionFactory); + } + + protected Connection doCreateConnection() throws JMSException { + XAConnectionFactory xcf = (XAConnectionFactory)getTargetConnectionFactory(); + return xcf.createXAConnection(); + } + + protected Session getSession(Connection con, Integer mode) throws JMSException { + XAConnection xac = (XAConnection)con; + return xac.createXASession(); + } + +} Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java ------------------------------------------------------------------------------ svn:keywords = Rev Date