activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1222635 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
Date Fri, 23 Dec 2011 10:48:38 GMT
Author: gtully
Date: Fri Dec 23 10:48:38 2011
New Revision: 1222635

URL: http://svn.apache.org/viewvc?rev=1222635&view=rev
Log:
CallerBufferingDataFileAppender, fix rollover of cached buffers

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222635&r1=1222634&r2=1222635&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
Fri Dec 23 10:48:38 2011
@@ -50,11 +50,13 @@ public class KahaDBFastEnqueueTest {
     private Destination destination = new ActiveMQQueue("Test");
     private String payloadString = new String(new byte[6*1024]);
     private boolean useBytesMessage= true;
-    private final int parallelProducer = 2;
+    private final int parallelProducer = 20;
     private Vector<Exception> exceptions = new Vector<Exception>();
-    final long toSend = 1000;//500000;
+    final long toSend = 500000;
 
-    @Ignore("not ready yet, exploring getting broker disk bound")
+    @Ignore("too slow, exploring getting broker disk bound")
+    // use with:
+    // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
     public void testPublishNoConsumer() throws Exception {
 
         startBroker(true);
@@ -116,8 +118,10 @@ public class KahaDBFastEnqueueTest {
 
     @After
     public void stopBroker() throws Exception {
-        broker.stop();
-        broker.waitUntilStopped();
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
     }
 
     final double sampleRate = 100000;
@@ -174,4 +178,12 @@ public class KahaDBFastEnqueueTest {
         String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
         connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()
+ options);
     }
+
+    public void testRollover() throws Exception {
+        byte flip = 0x1;
+        for (long i=0; i<Short.MAX_VALUE; i++) {
+            assertEquals("0 @:" + i, 0, flip ^= 1);
+            assertEquals("1 @:" + i, 1, flip ^= 1);
+        }
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java?rev=1222635&r1=1222634&r2=1222635&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
Fri Dec 23 10:48:38 2011
@@ -57,10 +57,10 @@ class CallerBufferingDataFileAppender im
             new DataByteArrayOutputStream(maxWriteBatchSize),
             new DataByteArrayOutputStream(maxWriteBatchSize)
     };
-    AtomicInteger writeBatchInstanceCount = new AtomicInteger();
+    volatile byte flip = 0x1;
     public class WriteBatch {
 
-        DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
+        DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1];
         public final DataFile dataFile;
 
         public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();



Mime
View raw message