activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r741327 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/store/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test...
Date Thu, 05 Feb 2009 22:41:01 GMT
Author: chirino
Date: Thu Feb  5 22:41:01 2009
New Revision: 741327

URL: http://svn.apache.org/viewvc?rev=741327&view=rev
Log:
- Implemented the setBatch API call in KahaDBStore.
- Fixed bug: when an async thread was used for writing in KahaDBStore, you could run into a
memory leak caused by an unbounded enqueue buffer.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Feb  5 22:41:01 2009
@@ -166,7 +166,7 @@
         size++;
     }
 
-    protected void setBatch(MessageId messageId) {
+    protected void setBatch(MessageId messageId) throws Exception {
     }
 
     public final synchronized void addMessageFirst(MessageReference node) throws Exception
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Thu Feb  5 22:41:01 2009
@@ -74,7 +74,7 @@
         this.store.resetBatching();
     }
     
-    protected void setBatch(MessageId messageId) {
+    protected void setBatch(MessageId messageId) throws Exception {
         store.setBatch(messageId);
         batchResetNeeded = false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Thu Feb  5 22:41:01 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.store;
 
+import java.io.IOException;
+
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.broker.ConnectionContext;
@@ -44,6 +46,6 @@
     public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
     
-    public void setBatch(MessageId messageId) {
+    public void setBatch(MessageId messageId) throws IOException {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Thu Feb  5 22:41:01 2009
@@ -115,6 +115,6 @@
      * allow caching cursors to set the current batch offset when cache is exhausted
      * @param messageId
      */
-    void setBatch(MessageId messageId);
+    void setBatch(MessageId messageId) throws Exception;
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Thu Feb  5 22:41:01 2009
@@ -93,7 +93,7 @@
 
     }
 
-    public void setBatch(MessageId messageId) {
+    public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Thu Feb  5 22:41:01 2009
@@ -135,7 +135,7 @@
 
     }
 
-    public void setBatch(MessageId messageId) {
+    public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Thu Feb  5 22:41:01 2009
@@ -245,7 +245,24 @@
 
         
         @Override
-        public void setBatch(MessageId messageId) {
+        public void setBatch(MessageId identity) throws IOException {
+            final String key = identity.toString();
+            
+            // Hopefully one day the page file supports concurrent read operations... but for now we must
+            // externally synchronize...
+            Long location;
+            synchronized(indexMutex) {
+                location = pageFile.tx().execute(new Transaction.CallableClosure<Long,
IOException>(){
+                    public Long execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        return sd.messageIdIndex.get(tx, key);
+                    }
+                });
+            }
+            if( location!=null ) {
+                cursorPos=location;
+            }
+            
         }
 
         public void setMemoryUsage(MemoryUsage memoeyUSage) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
Thu Feb  5 22:41:01 2009
@@ -104,7 +104,7 @@
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
-                System.out.println("max = " + max);
+                System.out.println("Max Violation = " + max + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+"
("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
             }
         };
         ExecutorService executor = Executors.newCachedThreadPool();

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Feb  5 22:41:01
2009
@@ -813,7 +813,17 @@
 
     void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException
{
         synchronized( writes ) {
-            
+            if( enabledWriteThread  ) {
+                while( writes.size() >= writeBatchSize && !stopWriter.get() )
{
+                    try {
+                        writes.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new InterruptedIOException();
+                    }
+                }
+            }
+
             for (Map.Entry<Long, PageWrite> entry : updates) {
                 Long key = entry.getKey();
                 PageWrite value = entry.getValue();
@@ -889,9 +899,11 @@
         try {
             while( !stopWriter.get() ) {
                 // Wait for a notification...
-                synchronized( writes ) {   
+                synchronized( writes ) {  
+                    writes.notifyAll();
+                    
                     // If there is not enough to write, wait for a notification...
-                    while( !canStartWriteBatch() && !stopWriter.get() ) {
+                    while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get()
) {
                         writes.wait(100);
                     }
                     



Mime
View raw message