activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [15/20] git commit: https://issues.apache.org/jira/browse/AMQ-3725 - allow kahadb to recover from failed file system
Date Wed, 12 Mar 2014 02:21:41 GMT
https://issues.apache.org/jira/browse/AMQ-3725 - allow kahadb to recover from failed file system


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ec13f21
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ec13f21
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ec13f21

Branch: refs/heads/activemq-5.9
Commit: 7ec13f21104d46f2d4fe16e8ace522e7ac8bfa89
Parents: 50f37be
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Thu Oct 31 17:58:40 2013 +0100
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Tue Mar 11 21:19:06 2014 -0400

----------------------------------------------------------------------
 .../CallerBufferingDataFileAppender.java        |   3 +-
 .../store/kahadb/disk/journal/DataFile.java     |   7 +-
 .../kahadb/disk/journal/DataFileAccessor.java   |   3 +-
 .../kahadb/disk/journal/DataFileAppender.java   |   4 +-
 .../store/kahadb/disk/page/PageFile.java        |  19 +-
 .../util/RecoverableRandomAccessFile.java       | 407 +++++++++++++++++++
 6 files changed, 425 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
index 92245ab..ff11848 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
@@ -22,6 +22,7 @@ import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
@@ -82,7 +83,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
     @Override
     protected void processQueue() {
         DataFile dataFile = null;
-        RandomAccessFile file = null;
+        RecoverableRandomAccessFile file = null;
         WriteBatch wb = null;
         try {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index e014b8e..d5762d2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -23,6 +23,7 @@ import java.io.RandomAccessFile;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 
 /**
  * DataFile
@@ -67,11 +68,11 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
         return file.getName() + " number = " + dataFileId + " , length = " + length;
     }
 
-    public synchronized RandomAccessFile openRandomAccessFile() throws IOException {
-        return new RandomAccessFile(file.getCanonicalPath(), "rw");
+    public synchronized RecoverableRandomAccessFile openRandomAccessFile() throws IOException
{
+        return new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
     }
 
-    public synchronized void closeRandomAccessFile(RandomAccessFile file) throws IOException
{
+    public synchronized void closeRandomAccessFile(RecoverableRandomAccessFile file) throws
IOException {
         file.close();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 983d7de..7781b7e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -21,6 +21,7 @@ import java.io.RandomAccessFile;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 
 /**
  * Optimized Store reader and updater. Single threaded and synchronous. Use in
@@ -32,7 +33,7 @@ final class DataFileAccessor {
 
     private final DataFile dataFile;
     private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
-    private final RandomAccessFile file;
+    private final RecoverableRandomAccessFile file;
     private boolean disposed;
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index f3f7af3..095db52 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -28,6 +28,7 @@ import java.util.zip.Checksum;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -277,7 +278,7 @@ class DataFileAppender implements FileAppender {
      */
     protected void processQueue() {
         DataFile dataFile = null;
-        RandomAccessFile file = null;
+        RecoverableRandomAccessFile file = null;
         WriteBatch wb = null;
         try {
 
@@ -373,6 +374,7 @@ class DataFileAppender implements FileAppender {
                 signalDone(wb);
             }
         } catch (IOException e) {
+            logger.info("Journal failed while writing at: " + wb.offset);
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
                 if (wb != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index 12ba275..3f107a6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -42,12 +42,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
-import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.LFUCache;
-import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.*;
 import org.apache.activemq.store.kahadb.disk.util.Sequence;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.slf4j.Logger;
@@ -85,11 +80,11 @@ public class PageFile {
     private final String name;
 
     // File handle used for reading pages..
-    private RandomAccessFile readFile;
+    private RecoverableRandomAccessFile readFile;
     // File handle used for writing pages..
-    private RandomAccessFile writeFile;
+    private RecoverableRandomAccessFile writeFile;
     // File handle used for writing pages..
-    private RandomAccessFile recoveryFile;
+    private RecoverableRandomAccessFile recoveryFile;
 
     // The size of pages
     private int pageSize = DEFAULT_PAGE_SIZE;
@@ -377,8 +372,8 @@ public class PageFile {
 
             File file = getMainPageFile();
             IOHelper.mkdirs(file.getParentFile());
-            writeFile = new RandomAccessFile(file, "rw");
-            readFile = new RandomAccessFile(file, "r");
+            writeFile = new RecoverableRandomAccessFile(file, "rw");
+            readFile = new RecoverableRandomAccessFile(file, "r");
 
             if (readFile.length() > 0) {
                 // Load the page size setting cause that can't change once the file is created.
@@ -397,7 +392,7 @@ public class PageFile {
             }
 
             if (enableRecoveryFile) {
-                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
+                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
             }
 
             if (metaData.isCleanShutdown()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
new file mode 100644
index 0000000..fbb3212
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.*;
+
+public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput,
java.io.Closeable {
+
+    RandomAccessFile raf;
+    File file;
+    String mode;
+
+    public RecoverableRandomAccessFile(File file, String mode) throws FileNotFoundException
{
+        this.file = file;
+        this.mode = mode;
+        raf = new RandomAccessFile(file, mode);
+    }
+
+    public RecoverableRandomAccessFile(String name, String mode) throws FileNotFoundException
{
+        this.file = new File(name);
+        this.mode = mode;
+        raf = new RandomAccessFile(file, mode);
+    }
+
+    protected RandomAccessFile getRaf() throws IOException {
+        if (raf == null) {
+            raf = new RandomAccessFile(file, mode);
+        }
+        return raf;
+    }
+
+    protected void handleException() throws IOException {
+        try {
+            if (raf != null) {
+                raf.close();
+            }
+        } catch (Throwable ignore) {
+        } finally {
+            raf = null;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        raf.close();
+    }
+
+    @Override
+    public void readFully(byte[] bytes) throws IOException {
+        try {
+            getRaf().readFully(bytes);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void readFully(byte[] bytes, int i, int i2) throws IOException {
+        try {
+            getRaf().readFully(bytes, i, i2);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public int skipBytes(int i) throws IOException {
+        try {
+            return getRaf().skipBytes(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException {
+        try {
+            return getRaf().readBoolean();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return getRaf().readByte();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException {
+        try {
+            return getRaf().readUnsignedByte();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public short readShort() throws IOException {
+        try {
+            return getRaf().readShort();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException {
+        try {
+            return getRaf().readUnsignedShort();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public char readChar() throws IOException {
+        try {
+            return getRaf().readChar();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public int readInt() throws IOException {
+        try {
+            return getRaf().readInt();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public long readLong() throws IOException {
+        try {
+            return getRaf().readLong();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public float readFloat() throws IOException {
+        try {
+            return getRaf().readFloat();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public double readDouble() throws IOException {
+        try {
+            return getRaf().readDouble();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public String readLine() throws IOException {
+        try {
+            return getRaf().readLine();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public String readUTF() throws IOException {
+        try {
+            return getRaf().readUTF();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void write(int i) throws IOException {
+        try {
+            getRaf().write(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes) throws IOException {
+        try {
+            getRaf().write(bytes);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes, int i, int i2) throws IOException {
+        try {
+            getRaf().write(bytes, i, i2);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeBoolean(boolean b) throws IOException {
+        try {
+            getRaf().writeBoolean(b);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeByte(int i) throws IOException {
+        try {
+            getRaf().writeByte(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeShort(int i) throws IOException {
+        try {
+            getRaf().writeShort(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeChar(int i) throws IOException {
+        try {
+            getRaf().writeChar(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+        try {
+            getRaf().writeInt(i);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeLong(long l) throws IOException {
+        try {
+            getRaf().writeLong(l);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeFloat(float v) throws IOException {
+        try {
+            getRaf().writeFloat(v);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeDouble(double v) throws IOException {
+        try {
+            getRaf().writeDouble(v);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeBytes(String s) throws IOException {
+        try {
+            getRaf().writeBytes(s);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeChars(String s) throws IOException {
+        try {
+            getRaf().writeChars(s);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void writeUTF(String s) throws IOException {
+        try {
+            getRaf().writeUTF(s);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+
+    //RAF methods
+    public long length() throws IOException {
+        try {
+            return getRaf().length();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public void setLength(long length) throws IOException {
+        try {
+            getRaf().setLength(length);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public void seek(long pos) throws IOException {
+        try {
+            getRaf().seek(pos);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public FileDescriptor getFD() throws IOException {
+        try {
+            return getRaf().getFD();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        try {
+            return getRaf().read(b, off, len);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public int read(byte[] b) throws IOException {
+        try {
+            return getRaf().read(b);
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+}


Mime
View raw message