activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r741169 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/bugs/ kahadb/src/main/java/org/apache/kahadb/page/
Date Thu, 05 Feb 2009 16:33:23 GMT
Author: chirino
Date: Thu Feb  5 16:33:23 2009
New Revision: 741169

URL: http://svn.apache.org/viewvc?rev=741169&view=rev
Log:
Better property names.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
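
A minimal usage sketch (not part of the commit) of the renamed configuration properties, assuming a plain BrokerService setup like the one in the test change below; the directory path and values are illustrative only:

import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

public class KahaDBConfigSketch {
    public static void main(String[] args) throws Exception {
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));

        // Formerly setSyncWrites(boolean): sync journal writes to disk when a response is required.
        kaha.setEnableJournalDiskSyncs(true);

        // Both intervals now take long values (milliseconds) instead of int.
        kaha.setCheckpointInterval(5 * 1000L);
        kaha.setCleanupInterval(30 * 1000L);

        // PageFile tuning: setEnableAsyncWrites was renamed to setEnableWriteThread,
        // and setWriteBatchSize must now be set before the page file is loaded.
        kaha.getPageFile().setEnableWriteThread(false);
        kaha.getPageFile().setWriteBatchSize(100);

        BrokerService broker = new BrokerService();
        broker.setPersistenceAdapter(kaha);
        broker.start();
    }
}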

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Feb  5 16:33:23 2009
@@ -140,7 +140,7 @@
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
 
-            store(command, isSyncWrites() && message.isResponseRequired());
+            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
             
         }
         
@@ -149,7 +149,7 @@
             command.setDestination(dest);
             command.setMessageId(ack.getLastMessageId().toString());
             command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
-            store(command, isSyncWrites() && ack.isResponseRequired());
+            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
         }
 
         public void removeAllMessages(ConnectionContext context) throws IOException {
@@ -282,14 +282,14 @@
             command.setRetroactive(retroactive);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isSyncWrites() && true);
+            store(command, isEnableJournalDiskSyncs() && true);
         }
 
         public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
-            store(command, isSyncWrites() && true);
+            store(command, isEnableJournalDiskSyncs() && true);
         }
 
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Feb  5 16:33:23 2009
@@ -146,9 +146,9 @@
     protected boolean deleteAllMessages;
     protected File directory;
     protected Thread checkpointThread;
-    protected boolean syncWrites=true;
-    int checkpointInterval = 5*1000;
-    int cleanupInterval = 30*1000;
+    protected boolean enableJournalDiskSyncs=true;
+    long checkpointInterval = 5*1000;
+    long cleanupInterval = 30*1000;
     
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
@@ -1182,9 +1182,7 @@
     // /////////////////////////////////////////////////////////////////
 
     private PageFile createPageFile() {
-        PageFile pf = new PageFile(directory, "db");
-        pf.setEnableAsyncWrites(!isSyncWrites());
-        return pf;
+        return new PageFile(directory, "db");
     }
 
     private Journal createJournal() {
@@ -1211,27 +1209,27 @@
         this.deleteAllMessages = deleteAllMessages;
     }
 
-    public boolean isSyncWrites() {
-        return syncWrites;
+    public boolean isEnableJournalDiskSyncs() {
+        return enableJournalDiskSyncs;
     }
 
-    public void setSyncWrites(boolean syncWrites) {
-        this.syncWrites = syncWrites;
+    public void setEnableJournalDiskSyncs(boolean syncWrites) {
+        this.enableJournalDiskSyncs = syncWrites;
     }
 
-    public int getCheckpointInterval() {
+    public long getCheckpointInterval() {
         return checkpointInterval;
     }
 
-    public void setCheckpointInterval(int checkpointInterval) {
+    public void setCheckpointInterval(long checkpointInterval) {
         this.checkpointInterval = checkpointInterval;
     }
 
-    public int getCleanupInterval() {
+    public long getCleanupInterval() {
         return cleanupInterval;
     }
 
-    public void setCleanupInterval(int cleanupInterval) {
+    public void setCleanupInterval(long cleanupInterval) {
         this.cleanupInterval = cleanupInterval;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java Thu Feb  5 16:33:23 2009
@@ -65,6 +65,8 @@
 
     private void doTestEnqueue(final boolean transacted) throws Exception {
         final long min = 100;
+        final AtomicLong total = new AtomicLong(0);
+        final AtomicLong slaViolations = new AtomicLong(0);
         final AtomicLong max = new AtomicLong(0);
         long reportTime = 0;
 
@@ -81,16 +83,20 @@
                         long endT = System.currentTimeMillis();
                         long duration = endT - startT;
 
+                        total.incrementAndGet();
+                        
                         if (duration > max.get()) {
                             max.set(duration);
                         }
 
                         if (duration > min) {
-                            System.err.println(Thread.currentThread().getName()
+                        	slaViolations.incrementAndGet();
+                            System.err.println("SLA violation @ "+Thread.currentThread().getName()
                                     + " "
                                     + DateFormat.getTimeInstance().format(
                                             new Date(startT)) + " at message "
-                                    + i + " send time=" + duration);
+                                    + i + " send time=" + duration
+                                    + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
                         }
                     }
 
@@ -145,7 +151,13 @@
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
             kaha.deleteAllMessages();
-            kaha.getPageFile().setWriteBatchSize(10);
+            kaha.setCleanupInterval(1000 * 60 * 60 * 60);
+            // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
+            // what happens if the index is updated but a journal update is lost.
+            // Index is going to be inconsistent, but can it be repaired?
+            kaha.setEnableJournalDiskSyncs(false);
+            kaha.getPageFile().setWriteBatchSize(100);
+            kaha.getPageFile().setEnableWriteThread(true);
             broker.setPersistenceAdapter(kaha);
         }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Feb  5 16:33:23 2009
@@ -26,12 +26,9 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -39,7 +36,6 @@
 import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
@@ -48,7 +44,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -119,9 +114,9 @@
     // page write failures..
     private boolean enableRecoveryFile=true;
     // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
-    private boolean enableSyncedWrites=true;
+    private boolean enableDiskSyncs=true;
     // Will writes be done in an async thread?
-    private boolean enableAsyncWrites=false;
+    private boolean enabledWriteThread=false;
 
     // These are used if enableAsyncWrites==true 
     private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -427,7 +422,7 @@
      */
     public void flush() throws IOException {
 
-        if( enableAsyncWrites && stopWriter.get() ) {
+        if( enabledWriteThread && stopWriter.get() ) {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
         }
         
@@ -437,7 +432,7 @@
             if( writes.isEmpty()) {                
                 return;
             }
-            if( enableAsyncWrites ) {
+            if( enabledWriteThread ) {
                 if( this.checkpointLatch == null ) {
                     this.checkpointLatch = new CountDownLatch(1);
                 }
@@ -591,17 +586,17 @@
     /**
      * @return Are page writes synced to disk?
      */
-    public boolean isEnableSyncedWrites() {
-        return enableSyncedWrites;
+    public boolean isEnableDiskSyncs() {
+        return enableDiskSyncs;
     }
 
     /**
      * Allows you enable syncing writes to disk.
      * @param syncWrites
      */
-    public void setEnableSyncedWrites(boolean syncWrites) {
+    public void setEnableDiskSyncs(boolean syncWrites) {
         assertNotLoaded();
-        this.enableSyncedWrites = syncWrites;
+        this.enableDiskSyncs = syncWrites;
     }
     
     /**
@@ -662,13 +657,13 @@
         this.pageCacheSize = pageCacheSize;
     }
 
-    public boolean isEnableAsyncWrites() {
-        return enableAsyncWrites;
+    public boolean isEnabledWriteThread() {
+        return enabledWriteThread;
     }
 
-    public void setEnableAsyncWrites(boolean enableAsyncWrites) {
+    public void setEnableWriteThread(boolean enableAsyncWrites) {
         assertNotLoaded();
-        this.enableAsyncWrites = enableAsyncWrites;
+        this.enabledWriteThread = enableAsyncWrites;
     }
 
     public long getDiskSize() throws IOException {
@@ -700,7 +695,16 @@
         this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
     }
 
-    ///////////////////////////////////////////////////////////////////
+	public int getWriteBatchSize() {
+		return writeBatchSize;
+	}
+
+	public void setWriteBatchSize(int writeBatchSize) {
+        assertNotLoaded();
+		this.writeBatchSize = writeBatchSize;
+	}
+
+	///////////////////////////////////////////////////////////////////
     // Package Protected Methods exposed to Transaction
     ///////////////////////////////////////////////////////////////////
 
@@ -817,7 +821,7 @@
             
             // Once we start approaching capacity, notify the writer to start writing
             if( canStartWriteBatch() ) {
-                if( enableAsyncWrites  ) {
+                if( enabledWriteThread  ) {
                     writes.notify();
                 } else {
                     writeBatch();
@@ -828,7 +832,7 @@
     
     private boolean canStartWriteBatch() {
 		int capacityUsed = ((writes.size() * 100)/writeBatchSize);
-        if( enableAsyncWrites ) {
+        if( enabledWriteThread ) {
             // The constant 10 here controls how soon write batches start going to disk..
             // would be nice to figure out how to auto tune that value.  Make to small and
             // we reduce through put because we are locking the write mutex too often doing writes
@@ -963,7 +967,7 @@
                 recoveryFile.write(w.diskBound, 0, pageSize);
             }
             
-            if (enableSyncedWrites) {
+            if (enableDiskSyncs) {
                 // Sync to make sure recovery buffer writes land on disk..
                 recoveryFile.getFD().sync();
             }
@@ -978,7 +982,7 @@
         }
         
         // Sync again
-        if( enableSyncedWrites ) {
+        if( enableDiskSyncs ) {
             writeFile.getFD().sync();
         }
         
@@ -1077,7 +1081,7 @@
 
     private void startWriter() {
         synchronized( writes ) {
-            if( enableAsyncWrites ) {
+            if( enabledWriteThread ) {
                 stopWriter.set(false);
                 writerThread = new Thread("KahaDB Page Writer") {
                     @Override
@@ -1092,7 +1096,7 @@
     }
  
     private void stopWriter() throws InterruptedException {
-        if( enableAsyncWrites ) {
+        if( enabledWriteThread ) {
             stopWriter.set(true);
             writerThread.join();
         }
@@ -1102,12 +1106,4 @@
 		return getMainPageFile();
 	}
 
-	public int getWriteBatchSize() {
-		return writeBatchSize;
-	}
-
-	public void setWriteBatchSize(int writeBatchSize) {
-		this.writeBatchSize = writeBatchSize;
-	}
-
 }
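
The PageFile-level renames can also be exercised directly; a short sketch assuming the settings are applied before the page file is loaded (the constructor and setters below appear in this diff, the directory is hypothetical):

import java.io.File;

import org.apache.kahadb.page.PageFile;

public class PageFileTuningSketch {
    public static void main(String[] args) {
        PageFile pf = new PageFile(new File("target/test-pagefile"), "db");

        // Renamed from setEnableSyncedWrites: sync page and recovery-file writes to disk.
        pf.setEnableDiskSyncs(true);

        // Renamed from setEnableAsyncWrites: hand batched writes to the "KahaDB Page Writer" thread.
        pf.setEnableWriteThread(true);

        // Now guarded by assertNotLoaded(), so it must be configured up front.
        pf.setWriteBatchSize(50);
    }
}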


