activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [2/2] activemq git commit: [AMQ-6606] avoid partial writes to the end of the journal - revert offset increment on ioexception, fix and test
Date Fri, 24 Feb 2017 19:24:28 GMT
[AMQ-6606] avoid partial writes to the end of the journal - revert offset increment on ioexception,
fix and test

(cherry picked from commit d53b8f8d424e3cf51646b215007fc017717edf44)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/22d5b51a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/22d5b51a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/22d5b51a

Branch: refs/heads/activemq-5.14.x
Commit: 22d5b51a0c69b48665f32ea19bad5046d7237426
Parents: 1a67318
Author: gtully <gary.tully@gmail.com>
Authored: Tue Feb 21 17:03:46 2017 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Feb 24 14:20:54 2017 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  2 +-
 .../store/kahadb/disk/journal/DataFile.java     |  4 +
 .../kahadb/disk/journal/DataFileAppender.java   | 41 +++++-----
 .../DataFileAppenderNoSpaceNoBatchTest.java     | 80 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 2db07f1..5ee6c4c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1132,7 +1132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware
             }
             return location;
         } catch (IOException ioe) {
-            LOG.error("KahaDB failed to store to Journal", ioe);
+            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
             brokerService.handleIOException(ioe);
             throw ioe;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index 5b96adf..1532f08 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -72,6 +72,10 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
         length += size;
     }
 
+    public synchronized void decrementLength(int size) {
+        length -= size;
+    }
+
     @Override
     public synchronized String toString() {
         return file.getName() + " number = " + dataFileId + " , length = " + length;

http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 1e87331..25c4e28 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -28,6 +28,7 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStra
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,11 +178,6 @@ class DataFileAppender implements FileAppender {
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
-                firstAsyncException = null;
-            }
-
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
             }
 
             while ( true ) {
@@ -249,7 +245,6 @@ class DataFileAppender implements FileAppender {
 
     int statIdx = 0;
     int[] stats = new int[maxStat];
-    final byte[] end = new byte[]{0};
     /**
      * The async processing loop that writes to the data files and does the
      * force calls. Since the file sync() call is the slowest of all the
@@ -286,7 +281,7 @@ class DataFileAppender implements FileAppender {
                     if (file != null) {
                         if (periodicSync) {
                             if (logger.isTraceEnabled()) {
-                                logger.trace("Syning file {} on rotate", dataFile.getFile().getName());
+                                logger.trace("Syncing file {} on rotate", dataFile.getFile().getName());
                             }
                             file.sync();
                         }
@@ -355,20 +350,13 @@ class DataFileAppender implements FileAppender {
 
                 signalDone(wb);
             }
-        } catch (IOException e) {
-            logger.info("Journal failed while writing at: " + wb.offset);
+        } catch (Throwable error) {
+            logger.warn("Journal failed while writing at: " + wb.dataFile.getDataFileId() + ":" + wb.offset, error);
             synchronized (enqueueMutex) {
-                firstAsyncException = e;
-                if (wb != null) {
-                    wb.exception.set(e);
-                    wb.latch.countDown();
-                }
-                if (nextWriteBatch != null) {
-                    nextWriteBatch.exception.set(e);
-                    nextWriteBatch.latch.countDown();
-                }
+                running = false;
+                signalError(wb, error);
+                signalError(nextWriteBatch, error);
             }
-        } catch (InterruptedException e) {
         } finally {
             try {
                 if (file != null) {
@@ -396,7 +384,7 @@ class DataFileAppender implements FileAppender {
             if (!write.sync) {
                 inflightWrites.remove(new Journal.WriteKey(write.location));
             }
-            if (write.onComplete != null) {
+            if (write.onComplete != null && wb.exception.get() == null) {
                 try {
                     write.onComplete.run();
                 } catch (Throwable e) {
@@ -409,4 +397,17 @@ class DataFileAppender implements FileAppender {
         // Signal any waiting threads that the write is on disk.
         wb.latch.countDown();
     }
+
+    protected void signalError(WriteBatch wb, Throwable t) {
+        if (wb != null) {
+            if (t instanceof IOException) {
+                wb.exception.set((IOException) t);
+                // revert batch increment such that next write is contiguous
+                wb.dataFile.decrementLength(wb.size);
+            } else {
+                wb.exception.set(IOExceptionSupport.create(t));
+            }
+            signalDone(wb);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
new file mode 100644
index 0000000..aa6df3f
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class DataFileAppenderNoSpaceNoBatchTest {
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    private DataFileAppender underTest;
+
+    @Test
+    public void testNoSpaceNextWriteSameBatch() throws Exception {
+        final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>());
+
+        final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) {
+            public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
+
+                return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") {
+
+                    public void seek(long pos) throws IOException {
+                        seekPositions.add(pos);
+                    }
+
+                    public void write(byte[] bytes, int offset, int len) throws IOException {
+                        throw new IOException("No space on device");
+                    }
+                };
+            };
+        };
+
+        underTest = new DataFileAppender(new Journal() {
+            @Override
+            public DataFile getCurrentDataFile(int capacity) throws IOException {
+                return currentDataFile;
+            };
+        });
+
+        final ByteSequence byteSequence = new ByteSequence(new byte[4*1024]);
+        for (int i=0; i<2; i++) {
+            try {
+                underTest.storeItem(byteSequence, (byte) 1, true);
+                fail("expect no space");
+            } catch (IOException expected) {
+            }
+        }
+
+        assertEquals("got 2 seeks: " + seekPositions, 2, seekPositions.size());
+        assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1));
+
+    }
+}


Mime
View raw message