flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [11/11] git commit: FLUME-1699: Make the rename of the meta file platform neutral
Date Thu, 20 Dec 2012 08:15:20 GMT
FLUME-1699: Make the rename of the meta file platform neutral

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/019358d9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/019358d9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/019358d9

Branch: refs/heads/flume-1.3.1
Commit: 019358d926b9d6c470dfb0b449c4b9a4c31b7588
Parents: e3de092
Author: Brock Noland <brock@apache.org>
Authored: Mon Nov 19 14:24:51 2012 -0600
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 20 00:11:53 2012 -0800

----------------------------------------------------------------------
 .../apache/flume/channel/file/LogFileFactory.java  |   40 ++++++++-
 .../org/apache/flume/channel/file/LogFileV3.java   |   14 +++-
 .../apache/flume/channel/file/Serialization.java   |    7 ++
 .../org/apache/flume/channel/file/TestLogFile.java |   73 +++++++++++++++
 4 files changed, 132 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 82e14b0..4783448 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -109,7 +109,45 @@ class LogFileFactory {
     RandomAccessFile logFile = null;
     try {
       File metaDataFile = Serialization.getMetaDataFile(file);
-      if(metaDataFile.exists()) {
+      File oldMetadataFile = Serialization.getOldMetaDataFile(file);
+      File tempMetadataFile = Serialization.getMetaDataTempFile(file);
+      boolean hasMeta = false;
+      // FLUME-1699:
+      // If the platform does not support atomic rename, then we
+      // renamed log.meta -> log.meta.old followed by log.meta.tmp -> log.meta
+      // I am not sure if all platforms maintain file integrity during
+      // file metadata update operations. So:
+      // 1. check if meta file exists
+      // 2. If 1 returns false, check if temp exists
+      // 3. if 2 is also false (maybe the machine died during temp->meta,
+      //    then check if old exists.
+      // In the above, we assume that if a file exists, it's integrity is ok.
+      if (metaDataFile.exists()) {
+        hasMeta = true;
+      } else if (tempMetadataFile.exists()) {
+        if (tempMetadataFile.renameTo(metaDataFile)) {
+          hasMeta = true;
+        } else {
+          throw new IOException("Renaming of " + tempMetadataFile.getName()
+                  + " to " + metaDataFile.getName() + " failed");
+        }
+      } else if (oldMetadataFile.exists()) {
+        if (oldMetadataFile.renameTo(metaDataFile)) {
+          hasMeta = true;
+        } else {
+          throw new IOException("Renaming of " + oldMetadataFile.getName()
+                  + " to " + metaDataFile.getName() + " failed");
+        }
+      }
+      if (hasMeta) {
+        // Now the metadata file has been found, delete old or temp files
+        // so it does not interfere with normal operation.
+        if(oldMetadataFile.exists()) {
+          oldMetadataFile.delete();
+        }
+        if(tempMetadataFile.exists()) {
+          tempMetadataFile.delete();
+        }
         return new LogFileV3.SequentialReader(file, encryptionKeyProvider);
       }
       logFile = new RandomAccessFile(file, "r");

http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index f768d23..b4c197e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -138,7 +138,19 @@ class LogFileV3 extends LogFile {
       outputStream.close();
       closed = true;
       if(!tmp.renameTo(file)) {
-        throw new IOException("Unable to move " + tmp + " over " + file);
+        //Some platforms don't support moving over an existing file.
+        //So:
+        //log.meta -> log.meta.old
+        //log.meta.tmp -> log.meta
+        //delete log.meta.old
+        File oldFile = Serialization.getOldMetaDataFile(file);
+        if(!file.renameTo(oldFile)){
+          throw new IOException("Unable to rename " + file + " to " + oldFile);
+        }
+        if(!tmp.renameTo(file)) {
+          throw new IOException("Unable to rename " + tmp + " over " + file);
+        }
+        oldFile.delete();
       }
     } finally {
       if(!closed) {

http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index 6b0eeb3..ef8cf72 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -32,6 +32,7 @@ class Serialization {
 
   static final String METADATA_FILENAME = ".meta";
   static final String METADATA_TMP_FILENAME = ".tmp";
+  static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
 
   static File getMetaDataTempFile(File metaDataFile) {
     String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
@@ -43,4 +44,10 @@ class Serialization {
     return new File(file.getParentFile(), metaDataFileName);
 
   }
+
+  // Support platforms that cannot do atomic renames - FLUME-1699
+  static File getOldMetaDataFile(File file) {
+    String oldMetaDataFileName = file.getName() + OLD_METADATA_FILENAME;
+    return new File(file.getParentFile(), oldMetaDataFileName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/019358d9/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 4b69698..9e28599 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -167,6 +167,79 @@ public class TestLogFile {
       Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
     }
   }
+
+  @Test
+  public void testReaderOldMetaFile() throws InterruptedException, IOException {
+    Map<Integer, Put> puts = Maps.newHashMap();
+    for (int i = 0; i < 1000; i++) {
+      FlumeEvent eventIn = TestUtils.newPersistableEvent();
+      Put put = new Put(++transactionID, WriteOrderOracle.next(),
+              eventIn);
+      ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+      FlumeEventPointer ptr = logFileWriter.put(bytes);
+      puts.put(ptr.getOffset(), put);
+    }
+    //rename the meta file to meta.old
+    File metadataFile = Serialization.getMetaDataFile(dataFile);
+    File oldMetadataFile = Serialization.getOldMetaDataFile(dataFile);
+    if (!metadataFile.renameTo(oldMetadataFile)) {
+      Assert.fail("Renaming to meta.old failed");
+    }
+    LogFile.SequentialReader reader =
+            LogFileFactory.getSequentialReader(dataFile, null);
+    Assert.assertTrue(metadataFile.exists());
+    Assert.assertFalse(oldMetadataFile.exists());
+    LogRecord entry;
+    while ((entry = reader.next()) != null) {
+      Integer offset = entry.getOffset();
+      TransactionEventRecord record = entry.getEvent();
+      Put put = puts.get(offset);
+      FlumeEvent eventIn = put.getEvent();
+      Assert.assertEquals(put.getTransactionID(), record.getTransactionID());
+      Assert.assertTrue(record instanceof Put);
+      FlumeEvent eventOut = ((Put) record).getEvent();
+      Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
+      Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
+    }
+  }
+
+    @Test
+  public void testReaderTempMetaFile() throws InterruptedException, IOException {
+    Map<Integer, Put> puts = Maps.newHashMap();
+    for (int i = 0; i < 1000; i++) {
+      FlumeEvent eventIn = TestUtils.newPersistableEvent();
+      Put put = new Put(++transactionID, WriteOrderOracle.next(),
+              eventIn);
+      ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
+      FlumeEventPointer ptr = logFileWriter.put(bytes);
+      puts.put(ptr.getOffset(), put);
+    }
+    //rename the meta file to meta.old
+    File metadataFile = Serialization.getMetaDataFile(dataFile);
+    File tempMetadataFile = Serialization.getMetaDataTempFile(dataFile);
+    File oldMetadataFile = Serialization.getOldMetaDataFile(dataFile);
+    oldMetadataFile.createNewFile(); //Make sure temp file is picked up.
+    if (!metadataFile.renameTo(tempMetadataFile)) {
+      Assert.fail("Renaming to meta.temp failed");
+    }
+    LogFile.SequentialReader reader =
+            LogFileFactory.getSequentialReader(dataFile, null);
+    Assert.assertTrue(metadataFile.exists());
+    Assert.assertFalse(tempMetadataFile.exists());
+    Assert.assertFalse(oldMetadataFile.exists());
+    LogRecord entry;
+    while ((entry = reader.next()) != null) {
+      Integer offset = entry.getOffset();
+      TransactionEventRecord record = entry.getEvent();
+      Put put = puts.get(offset);
+      FlumeEvent eventIn = put.getEvent();
+      Assert.assertEquals(put.getTransactionID(), record.getTransactionID());
+      Assert.assertTrue(record instanceof Put);
+      FlumeEvent eventOut = ((Put) record).getEvent();
+      Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
+      Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
+    }
+  }
   @Test
   public void testWriteDelimitedTo() throws IOException {
     if(dataFile.isFile()) {


Mime
View raw message