Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8AB11DA04 for ; Tue, 17 Jul 2012 21:21:30 +0000 (UTC) Received: (qmail 36783 invoked by uid 500); 17 Jul 2012 21:21:30 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 36753 invoked by uid 500); 17 Jul 2012 21:21:30 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 36744 invoked by uid 99); 17 Jul 2012 21:21:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 21:21:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 21:21:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 71B902388962 for ; Tue, 17 Jul 2012 21:21:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1362657 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/config/ src/java/test/org/apache/zookeeper/test/ Date: Tue, 17 Jul 2012 21:21:09 -0000 To: commits@zookeeper.apache.org From: phunt@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120717212109.71B902388962@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: phunt Date: Tue Jul 17 21:21:08 2012 New Revision: 1362657 URL: http://svn.apache.org/viewvc?rev=1362657&view=rev Log: ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt) Added: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/IOUtils.java zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java Modified: zookeeper/branches/branch-3.4/CHANGES.txt zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java zookeeper/branches/branch-3.4/src/java/test/config/findbugsExcludeFile.xml zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientBase.java Modified: zookeeper/branches/branch-3.4/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1362657&r1=1362656&r2=1362657&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/CHANGES.txt (original) +++ zookeeper/branches/branch-3.4/CHANGES.txt Tue Jul 17 21:21:08 2012 @@ -78,6 +78,8 @@ BUGFIXES: takes a long time with large datasets - is correlated to dataset size (fpj and Thawan Kooburat via camille) + ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt) + IMPROVEMENTS: ZOOKEEPER-1389. it would be nice if start-foreground used exec $JAVA Added: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java?rev=1362657&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java (added) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/AtomicFileOutputStream.java Tue Jul 17 21:21:08 2012 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.common; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * This code is originally from HDFS, see the similarly named files there + * in case of bug fixing, history, etc... + */ + +/** + * A FileOutputStream that has the property that it will only show up at its + * destination once it has been entirely written and flushed to disk. While + * being written, it will use a .tmp suffix. + * + * When the output stream is closed, it is flushed, fsynced, and will be moved + * into place, overwriting any file that already exists at that location. + * + * NOTE: on Windows platforms, it will not atomically replace the target + * file - instead the target file is deleted before this one is moved into + * place. + */ +public class AtomicFileOutputStream extends FilterOutputStream { + private static final String TMP_EXTENSION = ".tmp"; + + private final static Logger LOG = LoggerFactory + .getLogger(AtomicFileOutputStream.class); + + private final File origFile; + private final File tmpFile; + + public AtomicFileOutputStream(File f) throws FileNotFoundException { + // Code unfortunately must be duplicated below since we can't assign + // anything + // before calling super + super(new FileOutputStream(new File(f.getParentFile(), f.getName() + + TMP_EXTENSION))); + origFile = f.getAbsoluteFile(); + tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION) + .getAbsoluteFile(); + } + + @Override + public void close() throws IOException { + boolean triedToClose = false, success = false; + try { + flush(); + ((FileOutputStream) out).getChannel().force(true); + + triedToClose = true; + super.close(); + success = true; + } finally { + if (success) { + boolean renamed = tmpFile.renameTo(origFile); + if (!renamed) { + // On windows, renameTo does not replace. + if (!origFile.delete() || !tmpFile.renameTo(origFile)) { + throw new IOException( + "Could not rename temporary file " + tmpFile + + " to " + origFile); + } + } + } else { + if (!triedToClose) { + // If we failed when flushing, try to close it to not leak + // an FD + IOUtils.closeStream(out); + } + // close wasn't successful, try to delete the tmp file + if (!tmpFile.delete()) { + LOG.warn("Unable to delete tmp file " + tmpFile); + } + } + } + } + + /** + * Close the atomic file, but do not "commit" the temporary file on top of + * the destination. This should be used if there is a failure in writing. + */ + public void abort() { + try { + super.close(); + } catch (IOException ioe) { + LOG.warn("Unable to abort file " + tmpFile, ioe); + } + if (!tmpFile.delete()) { + LOG.warn("Unable to delete tmp file during abort " + tmpFile); + } + } +} Added: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/IOUtils.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/IOUtils.java?rev=1362657&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/IOUtils.java (added) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/common/IOUtils.java Tue Jul 17 21:21:08 2012 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.common; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; + +import org.slf4j.Logger; + +/* + * This code is originally from HDFS, see the similarly named files there + * in case of bug fixing, history, etc... + */ + +public class IOUtils { + /** + * Closes the stream ignoring {@link IOException}. Must only be called in + * cleaning up from exception handlers. + * + * @param stream + * the Stream to close + */ + public static void closeStream(Closeable stream) { + cleanup(null, stream); + } + + /** + * Close the Closeable objects and ignore any {@link IOException} or + * null pointers. Must only be used for cleanup in exception handlers. + * + * @param log + * the log to record problems to at debug level. Can be null. + * @param closeables + * the objects to close + */ + public static void cleanup(Logger log, Closeable... closeables) { + for (Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch (IOException e) { + if (log != null) { + log.warn("Exception in closing " + c, e); + } + } + } + } + } + + /** + * Copies from one stream to another. + * + * @param in + * InputStrem to read from + * @param out + * OutputStream to write to + * @param buffSize + * the size of the buffer + * @param close + * whether or not close the InputStream and OutputStream at the + * end. The streams are closed in the finally clause. + */ + public static void copyBytes(InputStream in, OutputStream out, + int buffSize, boolean close) throws IOException { + try { + copyBytes(in, out, buffSize); + if (close) { + out.close(); + out = null; + in.close(); + in = null; + } + } finally { + if (close) { + closeStream(out); + closeStream(in); + } + } + } + + /** + * Copies from one stream to another. + * + * @param in + * InputStrem to read from + * @param out + * OutputStream to write to + * @param buffSize + * the size of the buffer + */ + public static void copyBytes(InputStream in, OutputStream out, int buffSize) + throws IOException { + PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; + byte buf[] = new byte[buffSize]; + int bytesRead = in.read(buf); + while (bytesRead >= 0) { + out.write(buf, 0, bytesRead); + if ((ps != null) && ps.checkError()) { + throw new IOException("Unable to write to output stream."); + } + bytesRead = in.read(buf); + } + } + +} Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1362657&r1=1362656&r2=1362657&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Tue Jul 17 21:21:08 2012 @@ -21,7 +21,6 @@ import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.OutputStreamWriter; @@ -36,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.zookeeper.common.AtomicFileOutputStream; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; import org.apache.zookeeper.server.ServerCnxnFactory; @@ -1090,17 +1090,37 @@ public class QuorumPeer extends Thread i public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch"; + /** + * Write a long value to disk atomically. Either succeeds or an exception + * is thrown. + * @param name file name to write the long to + * @param value the long value to write to the named file + * @throws IOException if the file cannot be written atomically + */ private void writeLongToFile(String name, long value) throws IOException { - File file = new File(logFactory.getSnapDir(), name); - FileOutputStream out = new FileOutputStream(file); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out)); - try { - bw.write(Long.toString(value)); - bw.flush(); - out.getFD().sync(); - } finally { - bw.close(); - } + File file = new File(logFactory.getSnapDir(), name); + AtomicFileOutputStream out = new AtomicFileOutputStream(file); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out)); + boolean aborted = false; + try { + bw.write(Long.toString(value)); + bw.flush(); + + out.flush(); + } catch (IOException e) { + LOG.error("Failed to write new file " + file, e); + // worst case here the tmp file/resources(fd) are not cleaned up + // and the caller will be notified (IOException) + aborted = true; + out.abort(); + throw e; + } finally { + if (!aborted) { + // if the close operation (rename) fails we'll get notified. + // worst case the tmp file may still exist + out.close(); + } + } } public long getCurrentEpoch() throws IOException { Modified: zookeeper/branches/branch-3.4/src/java/test/config/findbugsExcludeFile.xml URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/config/findbugsExcludeFile.xml?rev=1362657&r1=1362656&r2=1362657&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/config/findbugsExcludeFile.xml (original) +++ zookeeper/branches/branch-3.4/src/java/test/config/findbugsExcludeFile.xml Tue Jul 17 21:21:08 2012 @@ -125,4 +125,10 @@ + + + + + + Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java?rev=1362657&view=auto ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java (added) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/AtomicFileOutputStreamTest.java Tue Jul 17 21:21:08 2012 @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.common.AtomicFileOutputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AtomicFileOutputStreamTest extends ZKTestCase { + private static final String TEST_STRING = "hello world"; + private static final String TEST_STRING_2 = "goodbye world"; + + private File testDir; + private File dstFile; + + @Before + public void setupTestDir() throws IOException { + testDir = ClientBase.createTmpDir(); + dstFile = new File(testDir, "test.txt"); + } + @After + public void cleanupTestDir() throws IOException { + ClientBase.recursiveDelete(testDir); + } + + /** + * Test case where there is no existing file + */ + @Test + public void testWriteNewFile() throws IOException { + OutputStream fos = new AtomicFileOutputStream(dstFile); + assertFalse(dstFile.exists()); + fos.write(TEST_STRING.getBytes()); + fos.flush(); + assertFalse(dstFile.exists()); + fos.close(); + assertTrue(dstFile.exists()); + + String readBackData = ClientBase.readFile(dstFile); + assertEquals(TEST_STRING, readBackData); + } + + /** + * Test case where there is no existing file + */ + @Test + public void testOverwriteFile() throws IOException { + assertTrue("Creating empty dst file", dstFile.createNewFile()); + + OutputStream fos = new AtomicFileOutputStream(dstFile); + + assertTrue("Empty file still exists", dstFile.exists()); + fos.write(TEST_STRING.getBytes()); + fos.flush(); + + // Original contents still in place + assertEquals("", ClientBase.readFile(dstFile)); + + fos.close(); + + // New contents replace original file + String readBackData = ClientBase.readFile(dstFile); + assertEquals(TEST_STRING, readBackData); + } + + /** + * Test case where the flush() fails at close time - make sure that we clean + * up after ourselves and don't touch any existing file at the destination + */ + @Test + public void testFailToFlush() throws IOException { + // Create a file at destination + FileOutputStream fos = new FileOutputStream(dstFile); + fos.write(TEST_STRING_2.getBytes()); + fos.close(); + + OutputStream failingStream = createFailingStream(); + failingStream.write(TEST_STRING.getBytes()); + try { + failingStream.close(); + fail("Close didn't throw exception"); + } catch (IOException ioe) { + // expected + } + + // Should not have touched original file + assertEquals(TEST_STRING_2, ClientBase.readFile(dstFile)); + + assertEquals("Temporary file should have been cleaned up", + dstFile.getName(), ClientBase.join(",", testDir.list())); + } + + /** + * Create a stream that fails to flush at close time + */ + private OutputStream createFailingStream() throws FileNotFoundException { + return new AtomicFileOutputStream(dstFile) { + @Override + public void flush() throws IOException { + throw new IOException("injected failure"); + } + }; + } + + /** + * Ensure the tmp file is cleaned up and dstFile is not created when + * aborting a new file. + */ + @Test + public void testAbortNewFile() throws IOException { + AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile); + + fos.abort(); + + assertEquals(0, testDir.list().length); + } + + /** + * Ensure the tmp file is cleaned up and dstFile is not created when + * aborting a new file. + */ + @Test + public void testAbortNewFileAfterFlush() throws IOException { + AtomicFileOutputStream fos = new AtomicFileOutputStream(dstFile); + fos.write(TEST_STRING.getBytes()); + fos.flush(); + + fos.abort(); + + assertEquals(0, testDir.list().length); + } + + /** + * Ensure the tmp file is cleaned up and dstFile is untouched when + * aborting an existing file overwrite. + */ + @Test + public void testAbortExistingFile() throws IOException { + FileOutputStream fos1 = new FileOutputStream(dstFile); + fos1.write(TEST_STRING.getBytes()); + fos1.close(); + + AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile); + + fos2.abort(); + + // Should not have touched original file + assertEquals(TEST_STRING, ClientBase.readFile(dstFile)); + assertEquals(1, testDir.list().length); + } + + /** + * Ensure the tmp file is cleaned up and dstFile is untouched when + * aborting an existing file overwrite. + */ + @Test + public void testAbortExistingFileAfterFlush() throws IOException { + FileOutputStream fos1 = new FileOutputStream(dstFile); + fos1.write(TEST_STRING.getBytes()); + fos1.close(); + + AtomicFileOutputStream fos2 = new AtomicFileOutputStream(dstFile); + fos2.write(TEST_STRING_2.getBytes()); + fos2.flush(); + + fos2.abort(); + + // Should not have touched original file + assertEquals(TEST_STRING, ClientBase.readFile(dstFile)); + assertEquals(1, testDir.list().length); + } +} Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1362657&r1=1362656&r2=1362657&view=diff ============================================================================== --- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Jul 17 21:21:08 2012 @@ -20,7 +20,10 @@ package org.apache.zookeeper.test; import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; @@ -44,13 +47,13 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.IOUtils; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactoryAccessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnLog; import org.apache.zookeeper.server.quorum.QuorumPeer; - import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -569,4 +572,28 @@ public abstract class ClientBase extends } } } + + public static String readFile(File file) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + BufferedInputStream is = new BufferedInputStream(new FileInputStream(file)); + try { + IOUtils.copyBytes(is, os, 1024, true); + } finally { + is.close(); + } + return os.toString(); + } + + public static String join(String separator, Object[] parts) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (Object part : parts) { + if (!first) { + sb.append(separator); + first = false; + } + sb.append(part); + } + return sb.toString(); + } }