cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [1/3] git commit: CXF-5543 Fixing transactional tests and adding Polling
Date Fri, 04 Apr 2014 07:15:29 GMT
Repository: cxf
Updated Branches:
  refs/heads/master c67b0119c -> 7c7fff780


CXF-5543 Fixing transactional tests and adding Polling


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a96ef8a0
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a96ef8a0
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a96ef8a0

Branch: refs/heads/master
Commit: a96ef8a0f5d3f311a2530b9f13a2a8424c1b665c
Parents: c62ac16
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Apr 3 13:55:34 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Apr 3 13:55:34 2014 +0200

----------------------------------------------------------------------
 .../jms/util/MessageListenerContainer.java      | 253 -------------------
 .../util/PollingMessageListenerContainer.java   | 253 +++++++++++++++++++
 2 files changed, 253 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/a96ef8a0/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
deleted file mode 100644
index 01d9ae7..0000000
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.jms.util;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.XASession;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.transaction.TransactionManager;
-
-import org.apache.cxf.common.logging.LogUtils;
-
-public class MessageListenerContainer implements JMSListenerContainer {
-    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
-
-    private Connection connection;
-    private Destination destination;
-    private MessageListener listenerHandler;
-    private boolean transacted;
-    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-    private String messageSelector;
-    private boolean running;
-    private MessageConsumer consumer;
-    private Session session;
-    private Executor executor;
-    private String durableSubscriptionName;
-    private boolean pubSubNoLocal;
-    private TransactionManager transactionManager;
-
-    public MessageListenerContainer(Connection connection, Destination destination,
-                                    MessageListener listenerHandler) {
-        this.connection = connection;
-        this.destination = destination;
-        this.listenerHandler = listenerHandler;
-    }
-
-    public Connection getConnection() {
-        return connection;
-    }
-
-    public void setTransacted(boolean transacted) {
-        this.transacted = transacted;
-    }
-
-    public void setAcknowledgeMode(int acknowledgeMode) {
-        this.acknowledgeMode = acknowledgeMode;
-    }
-
-    public void setMessageSelector(String messageSelector) {
-        this.messageSelector = messageSelector;
-    }
-
-    protected Executor getExecutor() {
-        if (executor == null) {
-            executor = Executors.newFixedThreadPool(10);
-        }
-        return executor;
-    }
-
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
-    }
-
-    public void setDurableSubscriptionName(String durableSubscriptionName) {
-        this.durableSubscriptionName = durableSubscriptionName;
-    }
-
-    public void setPubSubNoLocal(boolean pubSubNoLocal) {
-        this.pubSubNoLocal = pubSubNoLocal;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running;
-    }
-
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-    }
-
-    @Override
-    public void start() {
-        try {
-            session = connection.createSession(transacted, acknowledgeMode);
-            if (durableSubscriptionName != null) {
-                consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
-                                                           messageSelector, pubSubNoLocal);
-            } else {
-                consumer = session.createConsumer(destination, messageSelector);
-            }
-            
-            MessageListener intListener = (transactionManager != null)
-                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
-                : new LocalTransactionalMessageListener(session, listenerHandler); 
-            // new DispachingListener(getExecutor(), listenerHandler);
-            consumer.setMessageListener(intListener);
-            
-            running = true;
-        } catch (JMSException e) {
-            throw JMSUtil.convertJmsException(e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        running = false;
-        ResourceCloser.close(consumer);
-        ResourceCloser.close(session);
-        consumer = null;
-        session = null;
-    }
-
-    @Override
-    public void shutdown() {
-        stop();
-        ResourceCloser.close(connection);
-    }
-
-    protected TransactionManager getTransactionManager() {
-        if (this.transactionManager == null) {
-            try {
-                InitialContext ctx = new InitialContext();
-                this.transactionManager = (TransactionManager)ctx
-                    .lookup("javax.transaction.TransactionManager");
-            } catch (NamingException e) {
-                // Ignore
-            }
-        }
-        return this.transactionManager;
-    }
-
-    static class DispachingListener implements MessageListener {
-        private Executor executor;
-        private MessageListener listenerHandler;
-
-        public DispachingListener(Executor executor, MessageListener listenerHandler) {
-            this.executor = executor;
-            this.listenerHandler = listenerHandler;
-        }
-
-        @Override
-        public void onMessage(final Message message) {
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    listenerHandler.onMessage(message);
-                }
-
-            });
-        }
-
-    }
-    
-    static class LocalTransactionalMessageListener implements MessageListener {
-        private MessageListener listenerHandler;
-        private Session session;
-        
-        public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) {
-            this.session = session;
-            this.listenerHandler = listenerHandler;
-        }
-
-        @Override
-        public void onMessage(Message message) {
-            try {
-                listenerHandler.onMessage(message);
-                session.commit();
-            } catch (Throwable e) {
-                safeRollback(e);
-            }
-        }
-        
-        private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
-            try {
-                session.rollback();
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e);
-            }
-        }
-        
-    }
-    
-    @SuppressWarnings("PMD")
-    static class XATransactionalMessageListener implements MessageListener {
-        private TransactionManager tm;
-        private MessageListener listenerHandler;
-        private XASession session;
-        
-        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) {
-            if (tm == null) {
-                throw new IllegalArgumentException("Must supply a transaction manager");
-            }
-            if (session == null || !(session instanceof XASession)) {
-                throw new IllegalArgumentException("Must supply an XASession");
-            }
-            this.tm = tm;
-            this.session = (XASession)session;
-            this.listenerHandler = listenerHandler;
-        }
-
-        @Override
-        public void onMessage(Message message) {
-            try {
-                tm.begin();
-                tm.getTransaction().enlistResource(session.getXAResource());
-                listenerHandler.onMessage(message);
-                tm.commit();
-            } catch (Throwable e) {
-                safeRollback(e);
-            }
-        }
-        
-        private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
-            try {
-                tm.rollback();
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
-            }
-        }
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/a96ef8a0/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
new file mode 100644
index 0000000..01d9ae7
--- /dev/null
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.TransactionManager;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class MessageListenerContainer implements JMSListenerContainer {
+    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
+
+    private Connection connection;
+    private Destination destination;
+    private MessageListener listenerHandler;
+    private boolean transacted;
+    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    private String messageSelector;
+    private boolean running;
+    private MessageConsumer consumer;
+    private Session session;
+    private Executor executor;
+    private String durableSubscriptionName;
+    private boolean pubSubNoLocal;
+    private TransactionManager transactionManager;
+
+    public MessageListenerContainer(Connection connection, Destination destination,
+                                    MessageListener listenerHandler) {
+        this.connection = connection;
+        this.destination = destination;
+        this.listenerHandler = listenerHandler;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    public void setAcknowledgeMode(int acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    protected Executor getExecutor() {
+        if (executor == null) {
+            executor = Executors.newFixedThreadPool(10);
+        }
+        return executor;
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    public void setDurableSubscriptionName(String durableSubscriptionName) {
+        this.durableSubscriptionName = durableSubscriptionName;
+    }
+
+    public void setPubSubNoLocal(boolean pubSubNoLocal) {
+        this.pubSubNoLocal = pubSubNoLocal;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    @Override
+    public void start() {
+        try {
+            session = connection.createSession(transacted, acknowledgeMode);
+            if (durableSubscriptionName != null) {
+                consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
+                                                           messageSelector, pubSubNoLocal);
+            } else {
+                consumer = session.createConsumer(destination, messageSelector);
+            }
+            
+            MessageListener intListener = (transactionManager != null)
+                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
+                : new LocalTransactionalMessageListener(session, listenerHandler); 
+            // new DispachingListener(getExecutor(), listenerHandler);
+            consumer.setMessageListener(intListener);
+            
+            running = true;
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        running = false;
+        ResourceCloser.close(consumer);
+        ResourceCloser.close(session);
+        consumer = null;
+        session = null;
+    }
+
+    @Override
+    public void shutdown() {
+        stop();
+        ResourceCloser.close(connection);
+    }
+
+    protected TransactionManager getTransactionManager() {
+        if (this.transactionManager == null) {
+            try {
+                InitialContext ctx = new InitialContext();
+                this.transactionManager = (TransactionManager)ctx
+                    .lookup("javax.transaction.TransactionManager");
+            } catch (NamingException e) {
+                // Ignore
+            }
+        }
+        return this.transactionManager;
+    }
+
+    static class DispachingListener implements MessageListener {
+        private Executor executor;
+        private MessageListener listenerHandler;
+
+        public DispachingListener(Executor executor, MessageListener listenerHandler) {
+            this.executor = executor;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(final Message message) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    listenerHandler.onMessage(message);
+                }
+
+            });
+        }
+
+    }
+    
+    static class LocalTransactionalMessageListener implements MessageListener {
+        private MessageListener listenerHandler;
+        private Session session;
+        
+        public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) {
+            this.session = session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                listenerHandler.onMessage(message);
+                session.commit();
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
+            try {
+                session.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e);
+            }
+        }
+        
+    }
+    
+    @SuppressWarnings("PMD")
+    static class XATransactionalMessageListener implements MessageListener {
+        private TransactionManager tm;
+        private MessageListener listenerHandler;
+        private XASession session;
+        
+        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) {
+            if (tm == null) {
+                throw new IllegalArgumentException("Must supply a transaction manager");
+            }
+            if (session == null || !(session instanceof XASession)) {
+                throw new IllegalArgumentException("Must supply an XASession");
+            }
+            this.tm = tm;
+            this.session = (XASession)session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                tm.begin();
+                tm.getTransaction().enlistResource(session.getXAResource());
+                listenerHandler.onMessage(message);
+                tm.commit();
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
+            try {
+                tm.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
+            }
+        }
+        
+    }
+}


Mime
View raw message