activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r946725 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf: KahaDBDurableTransactedTopicTest.java PerfProducer.java
Date Thu, 20 May 2010 17:39:10 GMT
Author: rajdavies
Date: Thu May 20 17:39:09 2010
New Revision: 946725

URL: http://svn.apache.org/viewvc?rev=946725&view=rev
Log:
added transacted topic test 

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java?rev=946725&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
Thu May 20 17:39:09 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class KahaDBDurableTransactedTopicTest extends SimpleDurableTopicTest {
+    // Runs the inherited durable topic test with producers that publish on a transacted session.
+    @Override
+    protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
+        // Pass transacted=true: the three-argument PerfProducer constructor delegates with false; 'number' is not used here.
+        return new PerfProducer(fac, dest, payload, true);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTransactedTopicTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java?rev=946725&r1=946724&r2=946725&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
Thu May 20 17:39:09 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.perf;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -34,23 +33,34 @@ public class PerfProducer implements Run
     protected Connection connection;
     protected MessageProducer producer;
     protected PerfRate rate = new PerfRate();
-    private byte[] payload;
+    private final byte[] payload;
     private Session session;
     private final CountDownLatch stopped = new CountDownLatch(1);
     private boolean running;
+    private final boolean transacted;
     private int sleep = 0;
 
-    public PerfProducer(ConnectionFactory fac, Destination dest, byte[] palyload) throws JMSException {
+    public PerfProducer(ConnectionFactory fac, Destination dest, byte[] payload) throws JMSException {
+        this(fac, dest, payload, false);
+    }
+    public PerfProducer(ConnectionFactory fac, Destination dest, byte[] payload, boolean transacted)
+            throws JMSException {
         connection = fac.createConnection();
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        this.transacted = transacted;
+        if (transacted) {
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
         producer = session.createProducer(dest);
-        this.payload = palyload;
+        this.payload = payload;
+       
     }
 
     public void setDeliveryMode(int mode) throws JMSException {
         producer.setDeliveryMode(mode);
     }
-    
+
     public void setTimeToLive(int ttl) throws JMSException {
         producer.setTimeToLive(ttl);
     }
@@ -68,7 +78,7 @@ public class PerfProducer implements Run
             rate.reset();
             running = true;
             connection.start();
-            Thread t = new  Thread(this);
+            Thread t = new Thread(this);
             t.setName("Producer");
             t.start();
         }
@@ -78,7 +88,7 @@ public class PerfProducer implements Run
         synchronized (this) {
             running = false;
         }
-        stopped.await(1,TimeUnit.SECONDS);
+        stopped.await(1, TimeUnit.SECONDS);
         connection.stop();
     }
 
@@ -93,6 +103,9 @@ public class PerfProducer implements Run
                 msg = session.createBytesMessage();
                 msg.writeBytes(payload);
                 producer.send(msg);
+                if(this.transacted) {
+                    this.session.commit();
+                }
                 rate.increment();
                 if (sleep > 0) {
                     Thread.sleep(sleep);



Mime
View raw message