Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 80272 invoked from network); 5 Feb 2009 22:41:26 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Feb 2009 22:41:26 -0000 Received: (qmail 33717 invoked by uid 500); 5 Feb 2009 22:41:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33661 invoked by uid 500); 5 Feb 2009 22:41:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 33652 invoked by uid 99); 5 Feb 2009 22:41:26 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2009 14:41:26 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2009 22:41:23 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 63F1B23889CF; Thu, 5 Feb 2009 22:41:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r741327 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/store/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test... 
Date: Thu, 05 Feb 2009 22:41:01 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090205224102.63F1B23889CF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Feb 5 22:41:01 2009 New Revision: 741327 URL: http://svn.apache.org/viewvc?rev=741327&view=rev Log: - Implemented the setBatch API call in KahaDBStore. - fixed bug: When an async thread was used for writing in KahaDBStore you could run into a memory leak due to an unbounded enqueue buffer. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Feb 5 22:41:01 2009 @@ -166,7 +166,7 @@ size++; } - protected void setBatch(MessageId messageId) { + protected void setBatch(MessageId messageId) throws Exception { } public final synchronized void addMessageFirst(MessageReference node) throws Exception { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Thu Feb 5 22:41:01 2009 @@ -74,7 +74,7 @@ this.store.resetBatching(); } - protected void setBatch(MessageId messageId) { + protected void setBatch(MessageId messageId) throws Exception { store.setBatch(messageId); batchResetNeeded = false; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Thu Feb 5 22:41:01 2009 @@ -16,6 +16,8 @@ */ package org.apache.activemq.store; +import java.io.IOException; + import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.MessageId; import org.apache.activemq.broker.ConnectionContext; @@ -44,6 +46,6 @@ public void setMemoryUsage(MemoryUsage memoryUsage) { } - public void setBatch(MessageId messageId) { + public void setBatch(MessageId messageId) throws IOException { } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Thu Feb 5 22:41:01 2009 @@ -115,6 +115,6 @@ * allow caching cursors to set the current batch offset when cache is exhausted * @param messageId */ - void setBatch(MessageId messageId); + void setBatch(MessageId messageId) throws Exception; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Thu Feb 5 22:41:01 2009 @@ -93,7 +93,7 @@ } - public void setBatch(MessageId messageId) { + public void setBatch(MessageId messageId) throws Exception { delegate.setBatch(messageId); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Thu Feb 5 22:41:01 2009 @@ -135,7 +135,7 @@ } - public void setBatch(MessageId messageId) { + public void setBatch(MessageId messageId) throws Exception { delegate.setBatch(messageId); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Feb 5 22:41:01 2009 @@ -245,7 +245,24 @@ @Override - public void setBatch(MessageId messageId) { + public void setBatch(MessageId identity) throws IOException { + final String key = identity.toString(); + + // Hopefully one day the page file supports concurrent read operations... but for now we must + // externally synchronize... 
+ Long location; + synchronized(indexMutex) { + location = pageFile.tx().execute(new Transaction.CallableClosure(){ + public Long execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + return sd.messageIdIndex.get(tx, key); + } + }); + } + if( location!=null ) { + cursorPos=location; + } + } public void setMemoryUsage(MemoryUsage memoeyUSage) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java Thu Feb 5 22:41:01 2009 @@ -104,7 +104,7 @@ // TODO Auto-generated catch block e.printStackTrace(); } - System.out.println("max = " + max); + System.out.println("Max Violation = " + max + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)"); } }; ExecutorService executor = Executors.newCachedThreadPool(); Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741327&r1=741326&r2=741327&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Feb 5 22:41:01 2009 @@ -813,7 +813,17 @@ void write(Collection> updates) throws IOException { synchronized( writes ) { - + if( enabledWriteThread ) { + while( 
writes.size() >= writeBatchSize && !stopWriter.get() ) { + try { + writes.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + } + } + for (Map.Entry entry : updates) { Long key = entry.getKey(); PageWrite value = entry.getValue(); @@ -889,9 +899,11 @@ try { while( !stopWriter.get() ) { // Wait for a notification... - synchronized( writes ) { + synchronized( writes ) { + writes.notifyAll(); + // If there is not enough to write, wait for a notification... - while( !canStartWriteBatch() && !stopWriter.get() ) { + while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) { writes.wait(100); }