From commits-return-12344-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Nov 20 14:40:38 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 48664 invoked from network); 20 Nov 2009 14:40:38 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 20 Nov 2009 14:40:38 -0000 Received: (qmail 58752 invoked by uid 500); 20 Nov 2009 14:40:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 58698 invoked by uid 500); 20 Nov 2009 14:40:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 58689 invoked by uid 99); 20 Nov 2009 14:40:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2009 14:40:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2009 14:40:35 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4D2BB23888D8; Fri, 20 Nov 2009 14:40:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r882579 - in /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async: DataFileAppender.java NIODataFileAppender.java Date: Fri, 20 Nov 2009 14:40:14 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091120144014.4D2BB23888D8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Fri Nov 20 14:40:13 2009 New Revision: 882579 URL: http://svn.apache.org/viewvc?rev=882579&view=rev Log: merging 882126 - https://issues.apache.org/activemq/browse/AMQ-2042 Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?rev=882579&r1=882578&r2=882579&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Nov 20 14:40:13 2009 @@ -21,6 +21,7 @@ import java.io.RandomAccessFile; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.DataByteArrayOutputStream; @@ -48,7 +49,7 @@ protected final CountDownLatch shutdownDone = new CountDownLatch(1); protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE; - private boolean running; + protected boolean running; private Thread thread; public static class WriteKey { @@ -82,6 +83,7 @@ public final WriteCommand first; public final CountDownLatch latch = new CountDownLatch(1); public int size; + public AtomicReference exception = new AtomicReference(); public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException { this.dataFile = dataFile; @@ -179,6 +181,10 @@ } catch (InterruptedException e) { throw new InterruptedIOException(); } + IOException exception = batch.exception.get(); + if (exception != null) { + throw exception; + } } return location; @@ -216,10 +222,7 @@ if (shutdown) { throw new IOException("Async Writter Thread Shutdown"); } - if (firstAsyncException != null) { - throw firstAsyncException; - } - + if (!running) { running = true; thread = new Thread() { @@ -231,6 +234,11 @@ thread.setDaemon(true); thread.setName("ActiveMQ Data File Writer"); thread.start(); + firstAsyncException = null; + } + + if (firstAsyncException != null) { + throw firstAsyncException; } if (nextWriteBatch == null) { @@ -298,6 +306,7 @@ protected void processQueue() { DataFile dataFile = null; RandomAccessFile file = null; + WriteBatch wb = null; try { DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize); @@ -321,7 +330,7 @@ enqueueMutex.notify(); } - WriteBatch wb = (WriteBatch)o; + wb = (WriteBatch)o; if (dataFile != wb.dataFile) { if (file != null) { dataFile.closeRandomAccessFile(file); @@ -406,6 +415,14 @@ } catch (IOException e) { synchronized (enqueueMutex) { firstAsyncException = e; + if (wb != null) { + wb.latch.countDown(); + wb.exception.set(e); + } + if (nextWriteBatch != null) { + nextWriteBatch.latch.countDown(); + nextWriteBatch.exception.set(e); + } } } catch (InterruptedException e) { } finally { Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?rev=882579&r1=882578&r2=882579&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Nov 20 14:40:13 2009 @@ -47,6 +47,7 @@ DataFile dataFile = null; RandomAccessFile file = null; FileChannel channel = null; + WriteBatch wb = null; try { @@ -81,7 +82,7 @@ enqueueMutex.notify(); } - WriteBatch wb = (WriteBatch)o; + wb = (WriteBatch)o; if (dataFile != wb.dataFile) { if (file != null) { dataFile.closeRandomAccessFile(file); @@ -180,16 +181,32 @@ } catch (IOException e) { synchronized (enqueueMutex) { firstAsyncException = e; + if (wb != null) { + wb.latch.countDown(); + wb.exception.set(e); + } + if (nextWriteBatch != null) { + nextWriteBatch.latch.countDown(); + nextWriteBatch.exception.set(e); + } } } catch (InterruptedException e) { } finally { try { if (file != null) { dataFile.closeRandomAccessFile(file); + dataFile = null; + file.close(); + file = null; + } + if (channel != null) { + channel.close(); + channel = null; } } catch (IOException e) { } shutdownDone.countDown(); + running = false; } }