Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 13F8110549 for ; Wed, 12 Mar 2014 02:21:59 +0000 (UTC) Received: (qmail 64562 invoked by uid 500); 12 Mar 2014 02:21:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 64281 invoked by uid 500); 12 Mar 2014 02:21:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 63909 invoked by uid 99); 12 Mar 2014 02:21:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Mar 2014 02:21:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E54F8941111; Wed, 12 Mar 2014 02:21:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@activemq.apache.org Date: Wed, 12 Mar 2014 02:21:41 -0000 Message-Id: <6557a2c8e0574f9d9b07431e1218e618@git.apache.org> In-Reply-To: <0dfdd7129b6349acad365bd8656f4f07@git.apache.org> References: <0dfdd7129b6349acad365bd8656f4f07@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/20] git commit: https://issues.apache.org/jira/browse/AMQ-3725 - allow kahadb to recover from failed file system https://issues.apache.org/jira/browse/AMQ-3725 - allow kahadb to recover from failed file system Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ec13f21 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ec13f21 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ec13f21 Branch: refs/heads/activemq-5.9 Commit: 7ec13f21104d46f2d4fe16e8ace522e7ac8bfa89 Parents: 50f37be Author: Dejan Bosanac Authored: Thu Oct 31 17:58:40 2013 +0100 Committer: Hadrian Zbarcea Committed: Tue Mar 11 21:19:06 2014 -0400 ---------------------------------------------------------------------- .../CallerBufferingDataFileAppender.java | 3 +- .../store/kahadb/disk/journal/DataFile.java | 7 +- .../kahadb/disk/journal/DataFileAccessor.java | 3 +- .../kahadb/disk/journal/DataFileAppender.java | 4 +- .../store/kahadb/disk/page/PageFile.java | 19 +- .../util/RecoverableRandomAccessFile.java | 407 +++++++++++++++++++ 6 files changed, 425 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java index 92245ab..ff11848 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java @@ -22,6 +22,7 @@ import java.util.zip.Adler32; import java.util.zip.Checksum; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; +import org.apache.activemq.util.RecoverableRandomAccessFile; /** * An optimized writer to do batch appends to a data file. This object is thread @@ -82,7 +83,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender { @Override protected void processQueue() { DataFile dataFile = null; - RandomAccessFile file = null; + RecoverableRandomAccessFile file = null; WriteBatch wb = null; try { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index e014b8e..d5762d2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -23,6 +23,7 @@ import java.io.RandomAccessFile; import org.apache.activemq.store.kahadb.disk.util.LinkedNode; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.RecoverableRandomAccessFile; /** * DataFile @@ -67,11 +68,11 @@ public class DataFile extends LinkedNode implements Comparable inflightWrites; - private final RandomAccessFile file; + private final RecoverableRandomAccessFile file; private boolean disposed; /** http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index f3f7af3..095db52 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -28,6 +28,7 @@ import java.util.zip.Checksum; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; +import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,7 +278,7 @@ class DataFileAppender implements FileAppender { */ protected void processQueue() { DataFile dataFile = null; - RandomAccessFile file = null; + RecoverableRandomAccessFile file = null; WriteBatch wb = null; try { @@ -373,6 +374,7 @@ class DataFileAppender implements FileAppender { signalDone(wb); } } catch (IOException e) { + logger.info("Journal failed while writing at: " + wb.offset); synchronized (enqueueMutex) { firstAsyncException = e; if (wb != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index 12ba275..3f107a6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -42,12 +42,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.apache.activemq.util.DataByteArrayOutputStream; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.LFUCache; -import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.*; import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.slf4j.Logger; @@ -85,11 +80,11 @@ public class PageFile { private final String name; // File handle used for reading pages.. - private RandomAccessFile readFile; + private RecoverableRandomAccessFile readFile; // File handle used for writing pages.. - private RandomAccessFile writeFile; + private RecoverableRandomAccessFile writeFile; // File handle used for writing pages.. - private RandomAccessFile recoveryFile; + private RecoverableRandomAccessFile recoveryFile; // The size of pages private int pageSize = DEFAULT_PAGE_SIZE; @@ -377,8 +372,8 @@ public class PageFile { File file = getMainPageFile(); IOHelper.mkdirs(file.getParentFile()); - writeFile = new RandomAccessFile(file, "rw"); - readFile = new RandomAccessFile(file, "r"); + writeFile = new RecoverableRandomAccessFile(file, "rw"); + readFile = new RecoverableRandomAccessFile(file, "r"); if (readFile.length() > 0) { // Load the page size setting cause that can't change once the file is created. @@ -397,7 +392,7 @@ public class PageFile { } if (enableRecoveryFile) { - recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw"); + recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw"); } if (metaData.isCleanShutdown()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java new file mode 100644 index 0000000..fbb3212 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.*; + +public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput, java.io.Closeable { + + RandomAccessFile raf; + File file; + String mode; + + public RecoverableRandomAccessFile(File file, String mode) throws FileNotFoundException { + this.file = file; + this.mode = mode; + raf = new RandomAccessFile(file, mode); + } + + public RecoverableRandomAccessFile(String name, String mode) throws FileNotFoundException { + this.file = new File(name); + this.mode = mode; + raf = new RandomAccessFile(file, mode); + } + + protected RandomAccessFile getRaf() throws IOException { + if (raf == null) { + raf = new RandomAccessFile(file, mode); + } + return raf; + } + + protected void handleException() throws IOException { + try { + if (raf != null) { + raf.close(); + } + } catch (Throwable ignore) { + } finally { + raf = null; + } + } + + @Override + public void close() throws IOException { + raf.close(); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + try { + getRaf().readFully(bytes); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void readFully(byte[] bytes, int i, int i2) throws IOException { + try { + getRaf().readFully(bytes, i, i2); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int skipBytes(int i) throws IOException { + try { + return getRaf().skipBytes(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public boolean readBoolean() throws IOException { + try { + return getRaf().readBoolean(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public byte readByte() throws IOException { + try { + return getRaf().readByte(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return getRaf().readUnsignedByte(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public short readShort() throws IOException { + try { + return getRaf().readShort(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return getRaf().readUnsignedShort(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public char readChar() throws IOException { + try { + return getRaf().readChar(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readInt() throws IOException { + try { + return getRaf().readInt(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public long readLong() throws IOException { + try { + return getRaf().readLong(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public float readFloat() throws IOException { + try { + return getRaf().readFloat(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public double readDouble() throws IOException { + try { + return getRaf().readDouble(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public String readLine() throws IOException { + try { + return getRaf().readLine(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public String readUTF() throws IOException { + try { + return getRaf().readUTF(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(int i) throws IOException { + try { + getRaf().write(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(byte[] bytes) throws IOException { + try { + getRaf().write(bytes); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(byte[] bytes, int i, int i2) throws IOException { + try { + getRaf().write(bytes, i, i2); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeBoolean(boolean b) throws IOException { + try { + getRaf().writeBoolean(b); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeByte(int i) throws IOException { + try { + getRaf().writeByte(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeShort(int i) throws IOException { + try { + getRaf().writeShort(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeChar(int i) throws IOException { + try { + getRaf().writeChar(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeInt(int i) throws IOException { + try { + getRaf().writeInt(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeLong(long l) throws IOException { + try { + getRaf().writeLong(l); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeFloat(float v) throws IOException { + try { + getRaf().writeFloat(v); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeDouble(double v) throws IOException { + try { + getRaf().writeDouble(v); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeBytes(String s) throws IOException { + try { + getRaf().writeBytes(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeChars(String s) throws IOException { + try { + getRaf().writeChars(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeUTF(String s) throws IOException { + try { + getRaf().writeUTF(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + + //RAF methods + public long length() throws IOException { + try { + return getRaf().length(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public void setLength(long length) throws IOException { + try { + getRaf().setLength(length); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public void seek(long pos) throws IOException { + try { + getRaf().seek(pos); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public FileDescriptor getFD() throws IOException { + try { + return getRaf().getFD(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public int read(byte[] b, int off, int len) throws IOException { + try { + return getRaf().read(b, off, len); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public int read(byte[] b) throws IOException { + try { + return getRaf().read(b); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } +}