cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5680 Support case when jms server is down at start
Date Thu, 10 Apr 2014 11:45:55 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 728a647a2 -> 638f4e8df


CXF-5680 Support case when jms server is down at start


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/638f4e8d
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/638f4e8d
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/638f4e8d

Branch: refs/heads/master
Commit: 638f4e8df6ae21b40244366a18b1af236ead0f0e
Parents: 728a647
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Apr 10 13:45:42 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Apr 10 13:45:42 2014 +0200

----------------------------------------------------------------------
 .../cxf/transport/jms/BackChannelConduit.java   |   4 +-
 .../apache/cxf/transport/jms/JMSConduit.java    |   5 +-
 .../cxf/transport/jms/JMSConfiguration.java     |  11 ++
 .../cxf/transport/jms/JMSDestination.java       |  44 ++++--
 .../util/AbstractMessageListenerContainer.java  | 121 +++++++++++++++
 .../cxf/transport/jms/util/JMSSender.java       |  18 ++-
 .../jms/util/MessageListenerContainer.java      |  90 +----------
 .../util/PollingMessageListenerContainer.java   |  62 +-------
 .../transport/jms/util/MessageListenerTest.java |   4 +-
 .../testcases/SOAPJMSTestSuiteTest.java         |   2 +-
 .../jms/tx/GreeterImplWithTransaction.java      |  27 +++-
 .../jms/tx/JMSTransactionClientServerTest.java  | 149 ++++---------------
 12 files changed, 238 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
index 321eded..08bb4f3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
@@ -114,7 +114,7 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender
{
         ResourceCloser closer = new ResourceCloser();
         try {
             Session session = closer
-                .register(connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE));
+                .register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
 
             final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
                 .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
@@ -160,7 +160,7 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender
{
                                       correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
             JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties);
             LOG.log(Level.FINE, "server sending reply: ", reply);
-            sender.sendMessage(closer, session, replyTo, reply);
+            sender.sendMessage(session, replyTo, reply);
         } catch (JMSException ex) {
             throw JMSUtil.convertJmsException(ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 93771a2..291b608 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -148,7 +148,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
         ResourceCloser closer = new ResourceCloser();
         try {
             Connection c = getConnection();
-            Session session = closer.register(c.createSession(jmsConfig.isSessionTransacted(),

+            Session session = closer.register(c.createSession(false, 
                                                               Session.AUTO_ACKNOWLEDGE));
             
             if (exchange.isOneWay()) {
@@ -203,7 +203,6 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
                 .getReplyToDestination(session, headers.getJMSReplyTo());
             String jmsMessageID = sendMessage(request, outMessage, replyToDestination, correlationId,
closer,
                                               session);
-
             boolean useSyncReceive = ((correlationId == null || userCID != null) &&
!jmsConfig.isPubSubDomain())
                 || !replyToDestination.equals(staticReplyDestination);
             if (correlationId == null) {
@@ -254,7 +253,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
         JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers);
         
         Destination targetDest = jmsConfig.getTargetDestination(session);
-        sender.sendMessage(closer, session, targetDest, message);
+        sender.sendMessage(session, targetDest, message);
         String jmsMessageID = message.getJMSMessageID();
         LOG.log(Level.FINE, "client sending request message " 
             + jmsMessageID + " to " + targetDest);

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 84321c6..ecda3b0 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -25,6 +25,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
+import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.transport.jms.util.DestinationResolver;
@@ -85,6 +86,8 @@ public class JMSConfiguration {
     private boolean useConduitIdSelector = true;
     private String conduitSelectorPrefix;
     private boolean jmsProviderTibcoEms;
+    
+    private TransactionManager transactionManager;
 
     // For jms spec. Do not configure manually
     private String targetService;
@@ -413,4 +416,12 @@ public class JMSConfiguration {
         return destinationResolver.resolveDestinationName(session, replyToName, replyPubSubDomain);
     }
 
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index e9dc970..34398c3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -46,9 +46,11 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
+import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.MessageListenerContainer;
+import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 
 public class JMSDestination extends AbstractMultiplexDestination implements MessageListener
{
@@ -93,7 +95,18 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
     public void activate() {
         getLogger().log(Level.FINE, "JMSDestination activate().... ");
         jmsConfig.ensureProperlyConfigured();
-        jmsListener = createTargetDestinationListener();
+        try {
+            this.jmsListener = createTargetDestinationListener();
+        } catch (Exception e) {
+            // If first connect fails we will try to establish the connection in the background

+            new Thread(new Runnable() {
+                
+                @Override
+                public void run() {
+                    restartConnection();
+                }
+            }).start();
+        }
     }
     
     
@@ -102,21 +115,25 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         try {
             connection = JMSFactory.createConnection(jmsConfig);
             connection.setExceptionListener(new ExceptionListener() {
-                
-                @Override
                 public void onException(JMSException exception) {
-                    restartConnection(exception);
+                    LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect",
exception);
+                    restartConnection();
                 }
             });
-            connection.start();
-            session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
-            MessageListenerContainer container = new MessageListenerContainer(connection,
destination, this);
+            AbstractMessageListenerContainer container = jmsConfig.getTransactionManager()
!= null
+                ? new PollingMessageListenerContainer(connection, destination, this)
+                : new MessageListenerContainer(connection, destination, this);
+            container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());
+            container.setTransacted(jmsConfig.isSessionTransacted());
+
             Executor executor = JMSFactory.createExecutor(bus, "jms-destination");
             container.setExecutor(executor);
             container.start();
             suspendedContinuations.setListenerContainer(container);
+            connection.start();
             return container;
         } catch (JMSException e) {
             throw JMSUtil.convertJmsException(e);
@@ -125,20 +142,19 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         }
     }
 
-    protected void restartConnection(JMSException e) {
-        LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", e);
+    protected void restartConnection() {
         int tries = 0;
         do {
             tries++;
             try {
                 deactivate();
-                activate();
-                LOG.log(Level.INFO, "Reestablished JMS connection");
+                this.jmsListener = createTargetDestinationListener();
+                LOG.log(Level.INFO, "Established JMS connection");
             } catch (Exception e1) {
                 jmsListener = null;
                 String message = "Exception on reconnect. Trying again, attempt num " + tries;
                 if (LOG.isLoggable(Level.FINE)) {
-                    LOG.log(Level.WARNING, message, e);
+                    LOG.log(Level.WARNING, message, e1);
                 } else {
                     LOG.log(Level.WARNING, message);
                 }
@@ -155,6 +171,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         if (jmsListener != null) {
             jmsListener.shutdown();
         }
+        ResourceCloser.close(connection);
         suspendedContinuations.setListenerContainer(null);
         connection = null;
     }
@@ -198,9 +215,6 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
 
             origBus = BusFactory.getAndSetThreadDefaultBus(bus);
 
-            // FIXME
-            // JCATransactionalMessageListenerContainer.setMessageEndpoint(inMessage);
-
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
new file mode 100644
index 0000000..f33b3c8
--- /dev/null
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.transaction.TransactionManager;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public abstract class AbstractMessageListenerContainer implements JMSListenerContainer {
+
+    protected static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
+    protected Connection connection;
+    protected Destination destination;
+    protected MessageListener listenerHandler;
+    protected boolean transacted;
+    protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    protected String messageSelector;
+    protected boolean running;
+    protected MessageConsumer consumer;
+    protected Session session;
+    protected Executor executor;
+    protected String durableSubscriptionName;
+    protected boolean pubSubNoLocal;
+    protected TransactionManager transactionManager;
+
+    public AbstractMessageListenerContainer() {
+        super();
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+        if (this.transacted) {
+            this.acknowledgeMode = Session.SESSION_TRANSACTED;
+        }
+    }
+
+    public void setAcknowledgeMode(int acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    protected Executor getExecutor() {
+        if (executor == null) {
+            executor = Executors.newFixedThreadPool(10);
+        }
+        return executor;
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    public void setDurableSubscriptionName(String durableSubscriptionName) {
+        this.durableSubscriptionName = durableSubscriptionName;
+    }
+
+    public void setPubSubNoLocal(boolean pubSubNoLocal) {
+        this.pubSubNoLocal = pubSubNoLocal;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+    
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+
+    
+    
+
+    /*
+    protected TransactionManager getTransactionManager() {
+        if (this.transactionManager == null) {
+            try {
+                InitialContext ctx = new InitialContext();
+                this.transactionManager = (TransactionManager)ctx
+                    .lookup("javax.transaction.TransactionManager");
+            } catch (NamingException e) {
+                // Ignore
+            }
+        }
+        return this.transactionManager;
+    }
+    */
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
index a5d8424..5a0d099 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java
@@ -45,13 +45,19 @@ public class JMSSender {
         this.timeToLive = timeToLive;
     }
 
-    public void sendMessage(ResourceCloser closer, Session session, Destination targetDest,
+    public void sendMessage(Session session, Destination targetDest,
                             javax.jms.Message message) throws JMSException {
-        MessageProducer producer = closer.register(session.createProducer(targetDest));
-        if (explicitQosEnabled) {
-            producer.send(message, deliveryMode, priority, timeToLive);
-        } else {
-            producer.send(message);
+        MessageProducer producer = null;
+        try {
+            producer = session.createProducer(targetDest);
+            if (explicitQosEnabled) {
+                producer.send(message, deliveryMode, priority, timeToLive);
+            } else {
+                producer.send(message);
+            }
+        } finally {
+            ResourceCloser.close(producer);
         }
+        
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index 106934c..3a7b2e3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -19,25 +19,18 @@
 package org.apache.cxf.transport.jms.util;
 
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.XASession;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
 import javax.transaction.TransactionManager;
 
-import org.apache.cxf.common.logging.LogUtils;
-
 /**
  * Listen for messages on a queue or topic asynchronously by registering a
  * MessageListener.
@@ -45,74 +38,14 @@ import org.apache.cxf.common.logging.LogUtils;
  * Warning: This class does not refresh connections when the server goes away
  * This has to be handled outside.
  */
-public class MessageListenerContainer implements JMSListenerContainer {
-    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
-
-    private Connection connection;
-    private Destination destination;
-    private MessageListener listenerHandler;
-    private boolean transacted;
-    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-    private String messageSelector;
-    private boolean running;
-    private MessageConsumer consumer;
-    private Session session;
-    private Executor executor;
-    private String durableSubscriptionName;
-    private boolean pubSubNoLocal;
-    private TransactionManager transactionManager;
-
+public class MessageListenerContainer extends AbstractMessageListenerContainer {
     public MessageListenerContainer(Connection connection, Destination destination,
                                     MessageListener listenerHandler) {
         this.connection = connection;
         this.destination = destination;
         this.listenerHandler = listenerHandler;
     }
-
-    public Connection getConnection() {
-        return connection;
-    }
-
-    public void setTransacted(boolean transacted) {
-        this.transacted = transacted;
-    }
-
-    public void setAcknowledgeMode(int acknowledgeMode) {
-        this.acknowledgeMode = acknowledgeMode;
-    }
-
-    public void setMessageSelector(String messageSelector) {
-        this.messageSelector = messageSelector;
-    }
-
-    protected Executor getExecutor() {
-        if (executor == null) {
-            executor = Executors.newFixedThreadPool(10);
-        }
-        return executor;
-    }
-
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
-    }
-
-    public void setDurableSubscriptionName(String durableSubscriptionName) {
-        this.durableSubscriptionName = durableSubscriptionName;
-    }
-
-    public void setPubSubNoLocal(boolean pubSubNoLocal) {
-        this.pubSubNoLocal = pubSubNoLocal;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running;
-    }
-
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-    }
-
+    
     @Override
     public void start() {
         try {
@@ -124,9 +57,7 @@ public class MessageListenerContainer implements JMSListenerContainer {
                 consumer = session.createConsumer(destination, messageSelector);
             }
             
-            MessageListener intListener = (transactionManager != null)
-                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
-                : new LocalTransactionalMessageListener(session, listenerHandler); 
+            MessageListener intListener = new LocalTransactionalMessageListener(session,
listenerHandler); 
             // new DispachingListener(getExecutor(), listenerHandler);
             consumer.setMessageListener(intListener);
             
@@ -144,26 +75,13 @@ public class MessageListenerContainer implements JMSListenerContainer
{
         consumer = null;
         session = null;
     }
-
+    
     @Override
     public void shutdown() {
         stop();
         ResourceCloser.close(connection);
     }
 
-    protected TransactionManager getTransactionManager() {
-        if (this.transactionManager == null) {
-            try {
-                InitialContext ctx = new InitialContext();
-                this.transactionManager = (TransactionManager)ctx
-                    .lookup("javax.transaction.TransactionManager");
-            } catch (NamingException e) {
-                // Ignore
-            }
-        }
-        return this.transactionManager;
-    }
-
     static class DispachingListener implements MessageListener {
         private Executor executor;
         private MessageListener listenerHandler;

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index b7c725d..1d81a7e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.cxf.transport.jms.util;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -31,27 +30,12 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.logging.LogUtils;
 
-public class PollingMessageListenerContainer implements JMSListenerContainer {
+public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
     private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
 
-    private Connection connection;
-    private Destination destination;
-    private MessageListener listenerHandler;
-    private boolean transacted;
-    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-    private String messageSelector;
-    private boolean running;
-    private Executor executor;
-    @SuppressWarnings("unused")
-    private String durableSubscriptionName;
-    @SuppressWarnings("unused")
-    private boolean pubSubNoLocal;
-    private TransactionManager transactionManager;
-
     private ExecutorService pollers;
 
     private int numListenerThreads = 1;
@@ -63,50 +47,6 @@ public class PollingMessageListenerContainer implements JMSListenerContainer
{
         this.listenerHandler = listenerHandler;
     }
 
-    public Connection getConnection() {
-        return connection;
-    }
-
-    public void setTransacted(boolean transacted) {
-        this.transacted = transacted;
-    }
-
-    public void setAcknowledgeMode(int acknowledgeMode) {
-        this.acknowledgeMode = acknowledgeMode;
-    }
-
-    public void setMessageSelector(String messageSelector) {
-        this.messageSelector = messageSelector;
-    }
-
-    protected Executor getExecutor() {
-        if (executor == null) {
-            executor = Executors.newFixedThreadPool(10);
-        }
-        return executor;
-    }
-
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
-    }
-
-    public void setDurableSubscriptionName(String durableSubscriptionName) {
-        this.durableSubscriptionName = durableSubscriptionName;
-    }
-
-    public void setPubSubNoLocal(boolean pubSubNoLocal) {
-        this.pubSubNoLocal = pubSubNoLocal;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running;
-    }
-
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-    }
-
     class Poller implements Runnable {
 
         @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index c1bf86a..8c05076 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -75,7 +75,7 @@ public class MessageListenerTest {
         Queue dest = createQueue(connection, "test");
 
         MessageListener listenerHandler = new TestMessageListener();
-        PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
dest,
+        AbstractMessageListenerContainer container = new MessageListenerContainer(connection,
dest,
                                                                                         listenerHandler);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
@@ -99,7 +99,7 @@ public class MessageListenerTest {
         Connection connection = createConnection("brokerLocalTransaction");
         Queue dest = createQueue(connection, "test");
         MessageListener listenerHandler = new TestMessageListener();
-        MessageListenerContainer container = new MessageListenerContainer(connection, dest,
listenerHandler);
+        AbstractMessageListenerContainer container = new MessageListenerContainer(connection,
dest, listenerHandler);
         container.setTransacted(true);
         container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
         container.start();

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
----------------------------------------------------------------------
diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
index dd0f628..1b8939a 100644
--- a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
+++ b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
@@ -647,7 +647,7 @@ public class SOAPJMSTestSuiteTest extends AbstractBusClientServerTestBase
{
             Destination replyToDestination = jmsConfig.getReplyToDestination(session, null);
             JMSSender sender = JMSFactory.createJmsSender(jmsConfig, null);
             Message jmsMessage = JMSTestUtil.buildJMSMessageFromTestCase(testcase, session,
replyToDestination);
-            sender.sendMessage(closer, session, targetDest, jmsMessage);
+            sender.sendMessage(session, targetDest, jmsMessage);
             Message replyMessage = JMSUtil.receive(session, replyToDestination, 
                                                    jmsMessage.getJMSMessageID(), 10000, true);
             checkReplyMessage(replyMessage, testcase);

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
----------------------------------------------------------------------
diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
index 3536f86..e2a16a2 100644
--- a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
+++ b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java
@@ -22,15 +22,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jws.WebService;
 
-
-import org.apache.cxf.systest.jms.GreeterImplDocBase;
+import org.apache.hello_world_doc_lit.Greeter;
+import org.apache.hello_world_doc_lit.PingMeFault;
 
 @WebService(endpointInterface = "org.apache.hello_world_doc_lit.Greeter")
-public class GreeterImplWithTransaction extends GreeterImplDocBase {
+public class GreeterImplWithTransaction implements Greeter {
     private AtomicBoolean flag = new AtomicBoolean(true);
        
     public String greetMe(String requestType) {
-        //System.out.println("Reached here :" + requestType);
+        System.out.println("Reached here :" + requestType);
         if ("Bad guy".equals(requestType)) {
             if (flag.getAndSet(false)) {
                 //System.out.println("Throw exception here :" + requestType);
@@ -42,5 +42,24 @@ public class GreeterImplWithTransaction extends GreeterImplDocBase {
         }
         return "Hello " + requestType;
     }
+
+    @Override
+    public void greetMeOneWay(String name) {
+        if ("Bad guy".equals(name)) {
+            throw new RuntimeException("Got a bad guy call for greetMe");
+        }
+    }
+
+    @Override
+    public void pingMe() throws PingMeFault {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public String sayHi() {
+        // TODO Auto-generated method stub
+        return null;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
----------------------------------------------------------------------
diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
index fb40aa4..1b262e8 100644
--- a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
+++ b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
@@ -18,145 +18,56 @@
  */
 package org.apache.cxf.systest.jms.tx;
 
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.URL;
+import java.util.Collections;
 
-import javax.jms.ConnectionFactory;
-import javax.xml.namespace.QName;
-
-import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.cxf.Bus;
-import org.apache.cxf.BusFactory;
-import org.apache.cxf.bus.spring.SpringBusFactory;
 import org.apache.cxf.jaxws.EndpointImpl;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
-import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
-import org.apache.cxf.transport.jms.JMSConfigFeature;
-import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.apache.cxf.systest.jms.AbstractVmJMSTest;
+import org.apache.cxf.transport.jms.spec.JMSSpecConstants;
 import org.apache.hello_world_doc_lit.Greeter;
-import org.apache.hello_world_doc_lit.PingMeFault;
-import org.apache.hello_world_doc_lit.SOAPService2;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
 
-/**
- * Test transactions based on spring transactions.
- * These will not be supported anymore in cxf >= 3
- */
 @Ignore
-public class JMSTransactionClientServerTest extends AbstractBusClientServerTestBase {
-    private static final String BROKER_URI = "vm://JMSTransactionClientServerTest?broker.persistent=false";
-    private static EmbeddedJMSBrokerLauncher broker;
-
-    public static class Server extends AbstractBusTestServerBase {
-        ClassPathXmlApplicationContext context;
-        EndpointImpl endpoint;
-        protected void run()  {
-            SpringBusFactory bf = new SpringBusFactory();
-            Bus bus = bf.createBus("org/apache/cxf/systest/jms/tx/jms_server_config.xml");
-            BusFactory.setDefaultBus(bus);
-            endpoint = new EndpointImpl(bus, new GreeterImplWithTransaction());
-            endpoint.setAddress("jms:queue:greeter.queue.noaop?sessionTransacted=true");
-            endpoint.publish();
-        }
-        public void tearDown() {
-            endpoint.stop();
-            context.close();
-        }
-    }
+public class JMSTransactionClientServerTest extends AbstractVmJMSTest {
+    private static final String SERVICE_ADDRESS = 
+        "jms:queue:greeter.queue.tx?receivetTimeOut=5000&sessionTransacted=true";
+    private static EndpointImpl endpoint;
 
     @BeforeClass
     public static void startServers() throws Exception {
-        broker = new EmbeddedJMSBrokerLauncher(BROKER_URI);
-        System.setProperty("EmbeddedBrokerURL", broker.getBrokerURL());
-        launchServer(broker);
-        launchServer(new Server());
-        createStaticBus();
+        startBusAndJMS(JMSTransactionClientServerTest.class);
+
+        endpoint = new EndpointImpl(bus, new GreeterImplWithTransaction());
+        endpoint.setAddress(SERVICE_ADDRESS);
+        endpoint.setFeatures(Collections.singletonList(cff));
+        endpoint.publish();
     }
+
     @AfterClass
     public static void clearProperty() {
-        System.clearProperty("EmbeddedBrokerURL");
-    }
-    public URL getWSDLURL(String s) throws Exception {
-        return getClass().getResource(s);
-    }
-    public QName getServiceName(QName q) {
-        return q;
+        endpoint.stop();
     }
-    public QName getPortName(QName q) {
-        return q;
-    }
-    
-    @Ignore
-    @Test
-    public void testDocBasicConnection() throws Exception {
-        QName serviceName = getServiceName(new QName("http://apache.org/hello_world_doc_lit",

-                                 "SOAPService2"));
-        QName portName = getPortName(new QName("http://apache.org/hello_world_doc_lit", "SoapPort2"));
-        URL wsdl = getWSDLURL("/wsdl/hello_world_doc_lit.wsdl");
-        assertNotNull(wsdl);
-        String wsdlString = wsdl.toString();
-        SOAPService2 service = new SOAPService2(wsdl, serviceName);
-        broker.updateWsdl(getBus(), wsdlString);
-        assertNotNull(service);
 
-        Greeter greeter = service.getPort(portName, Greeter.class);
-        doService(greeter, true);
-    }
-    
-    @Ignore
     @Test
-    public void testNonAopTransaction() throws Exception {
-        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
-        factory.setServiceClass(Greeter.class);
-        factory.setAddress("jms://");
-
-        JMSConfiguration jmsConfig = new JMSConfiguration();
-        ConnectionFactory connectionFactory
-            = new PooledConnectionFactory(broker.getBrokerURL());
-        jmsConfig.setConnectionFactory(connectionFactory);
-        jmsConfig.setTargetDestination("greeter.queue.noaop");
-        jmsConfig.setPubSubDomain(false);
-
-        JMSConfigFeature jmsConfigFeature = new JMSConfigFeature();
-        jmsConfigFeature.setJmsConfig(jmsConfig);
-        factory.getFeatures().add(jmsConfigFeature);
-
-        Greeter greeter = (Greeter)factory.create();
-        doService(greeter, false);
-    }    
-    public void doService(Greeter greeter, boolean doEx) throws Exception {
-
-        String response1 = new String("Hello ");
+    public void testTransaction() throws Exception {
+        Greeter greeter = createGreeterProxy();
+        // Should be processed normally
+        greeter.greetMeOneWay("Good guy");
         
-        try {
-                          
-            String greeting = greeter.greetMe("Good guy");
-            assertNotNull("No response received from service", greeting);
-            String exResponse = response1 + "Good guy";
-            assertEquals("Get unexcpeted result", exResponse, greeting);
-
-            greeting = greeter.greetMe("Bad guy");
-            assertNotNull("No response received from service", greeting);
-            exResponse = response1 + "[Bad guy]";
-            assertEquals("Get unexcpeted result", exResponse, greeting);
-            
-            if (doEx) {
-                try {
-                    greeter.pingMe();
-                    fail("Should have thrown FaultException");
-                } catch (PingMeFault ex) {
-                    assertNotNull(ex.getFaultInfo());
-                }
-            }
-        } catch (UndeclaredThrowableException ex) {
-            throw (Exception)ex.getCause();
-        }
+        // Should cause rollback, redelivery and in the end the message should go to the
dead letter queue
+        greeter.greetMe("Bad guy");
     }
 
+    private Greeter createGreeterProxy() throws Exception {
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setBus(bus);
+        factory.getFeatures().add(cff);
+        factory.setTransportId(JMSSpecConstants.SOAP_JMS_SPECIFICATION_TRANSPORTID);
+        factory.setServiceClass(Greeter.class);
+        factory.setAddress(SERVICE_ADDRESS);
+        return (Greeter)markForClose(factory.create());
+    }
 }


Mime
View raw message