flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1761. FileChannel can NPE when log metadata file is empty
Date Tue, 18 Dec 2012 19:18:24 GMT
Updated Branches:
  refs/heads/flume-1.4 d5dccff5e -> d7fd7432d


FLUME-1761. FileChannel can NPE when log metadata file is empty

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d7fd7432
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d7fd7432
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d7fd7432

Branch: refs/heads/flume-1.4
Commit: d7fd7432daebc0516609f20779f2aa757ff30ea3
Parents: d5dccff
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Tue Dec 18 11:17:33 2012 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Tue Dec 18 11:18:08 2012 -0800

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java    |    8 +-
 .../java/org/apache/flume/channel/file/Commit.java |    5 +-
 .../apache/flume/channel/file/LogFileFactory.java  |   10 ++
 .../org/apache/flume/channel/file/LogFileV3.java   |   13 ++-
 .../java/org/apache/flume/channel/file/Put.java    |    4 +-
 .../org/apache/flume/channel/file/Rollback.java    |    6 +-
 .../java/org/apache/flume/channel/file/Take.java   |    5 +-
 .../flume/channel/file/TransactionEventRecord.java |   10 +-
 .../org/apache/flume/channel/file/TestLog.java     |   99 +++++++++++++++
 9 files changed, 145 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 748f49a..6e64003 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -23,6 +23,8 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
+
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -62,7 +64,11 @@ public class CheckpointRebuilder {
     LOG.info("Attempting to fast replay the log files.");
     List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
     for (File logFile : logFiles) {
-      logReaders.add(LogFileFactory.getSequentialReader(logFile, null));
+      try {
+        logReaders.add(LogFileFactory.getSequentialReader(logFile, null));
+      } catch(EOFException e) {
+        LOG.warn("Ignoring " + logFile + " due to EOF", e);
+      }
     }
     long transactionIDSeed = 0;
     long writeOrderIDSeed = 0;

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
index 62f4451..3663244 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 
 import org.apache.flume.channel.file.proto.ProtosFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represents a Commit on disk
  */
@@ -55,7 +57,8 @@ class Commit extends TransactionEventRecord {
   }
   @Override
   void readProtos(InputStream in) throws IOException {
-    ProtosFactory.Commit commit = ProtosFactory.Commit.parseDelimitedFrom(in);
+    ProtosFactory.Commit commit = Preconditions.checkNotNull(ProtosFactory.
+        Commit.parseDelimitedFrom(in), "Commit cannot be null");
     type = (short) commit.getType();
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 4783448..1fe219a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -148,6 +149,15 @@ class LogFileFactory {
         if(tempMetadataFile.exists()) {
           tempMetadataFile.delete();
         }
+        if(metaDataFile.length() == 0L) {
+          if(file.length() != 0L) {
+            String msg = String.format("MetaData file %s is empty, but log %s" +
+                " is of size %d", metaDataFile, file, file.length());
+            throw new IllegalStateException(msg);
+          }
+          throw new EOFException(String.format("MetaData file %s is empty",
+              metaDataFile));
+        }
         return new LogFileV3.SequentialReader(file, encryptionKeyProvider);
       }
       logFile = new RandomAccessFile(file, "r");

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index b4c197e..aac7805 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -100,8 +100,9 @@ class LogFileV3 extends LogFile {
     ProtosFactory.LogFileMetaData read() throws IOException {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
-        ProtosFactory.LogFileMetaData metaData =
-            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream);
+        ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
+            ProtosFactory.LogFileMetaData.
+            parseDelimitedFrom(inputStream), "Metadata cannot be null");
         if (metaData.getLogFileID() != logFileID) {
           throw new IOException("The file id of log file: "
               + logFile + " is different from expected "
@@ -216,7 +217,8 @@ class LogFileV3 extends LogFile {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
         ProtosFactory.LogFileMetaData metaData =
-            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream);
+            Preconditions.checkNotNull(ProtosFactory.LogFileMetaData.
+                parseDelimitedFrom(inputStream), "MetaData cannot be null");
         int version = metaData.getVersion();
         if(version != getVersion()) {
           throw new IOException("Version is " + Integer.toHexString(version) +
@@ -295,8 +297,9 @@ class LogFileV3 extends LogFile {
       File metaDataFile = Serialization.getMetaDataFile(file);
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
-        ProtosFactory.LogFileMetaData metaData =
-            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream);
+        ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
+            "MetaData cannot be null");
         int version = metaData.getVersion();
         if(version != getVersion()) {
           throw new IOException("Version is " + Integer.toHexString(version) +

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index d47b1c8..4235a79 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.flume.channel.file.proto.ProtosFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 
@@ -82,7 +83,8 @@ class Put extends TransactionEventRecord {
   }
   @Override
   void readProtos(InputStream in) throws IOException {
-    ProtosFactory.Put put = ProtosFactory.Put.parseDelimitedFrom(in);
+    ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory.
+        Put.parseDelimitedFrom(in), "Put cannot be null");
     Map<String, String> headers = Maps.newHashMap();
     ProtosFactory.FlumeEvent protosEvent = put.getEvent();
     for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
index cc9ce86..335ad0b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 
 import org.apache.flume.channel.file.proto.ProtosFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represents a Rollback on disk
  */
@@ -51,8 +53,8 @@ class Rollback extends TransactionEventRecord {
   @Override
   void readProtos(InputStream in) throws IOException {
     @SuppressWarnings("unused")
-    ProtosFactory.Rollback rollback =
-      ProtosFactory.Rollback.parseDelimitedFrom(in);
+    ProtosFactory.Rollback rollback = Preconditions.checkNotNull(ProtosFactory.
+        Rollback.parseDelimitedFrom(in), "Rollback cannot be null");
   }
   @Override
   short getRecordType() {

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
index e61bf7e..143143a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 
 import org.apache.flume.channel.file.proto.ProtosFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represents a Take on disk
  */
@@ -70,7 +72,8 @@ class Take extends TransactionEventRecord {
   }
   @Override
   void readProtos(InputStream in) throws IOException {
-    ProtosFactory.Take take = ProtosFactory.Take.parseDelimitedFrom(in);
+    ProtosFactory.Take take = Preconditions.checkNotNull(ProtosFactory.
+        Take.parseDelimitedFrom(in), "Take cannot be null");
     fileID = take.getFileID();
     offset = take.getOffset();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 70098a0..073042f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -190,8 +190,9 @@ abstract class TransactionEventRecord implements Writable {
       throws IOException {
     ByteArrayInputStream in = new ByteArrayInputStream(buffer);
     try {
-      ProtosFactory.TransactionEventHeader header =
-          ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in);
+      ProtosFactory.TransactionEventHeader header = Preconditions.
+          checkNotNull(ProtosFactory.TransactionEventHeader.
+              parseDelimitedFrom(in), "Header cannot be null");
       short type = (short)header.getType();
       long transactionID = header.getTransactionID();
       long writeOrderID = header.getWriteOrderID();
@@ -199,8 +200,9 @@ abstract class TransactionEventRecord implements Writable {
           newRecordForType(type, transactionID, writeOrderID);
       transactionEvent.readProtos(in);
       @SuppressWarnings("unused")
-      ProtosFactory.TransactionEventFooter footer =
-          ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in);
+      ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull(
+          ProtosFactory.TransactionEventFooter.
+          parseDelimitedFrom(in), "Footer cannot be null");
       return transactionEvent;
     } finally {
       try {

http://git-wip-us.apache.org/repos/asf/flume/blob/d7fd7432/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index bc7b3cf..f9dbba5 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -337,6 +337,105 @@ public class TestLog {
     LogUtils.sort(expected);
     Assert.assertEquals(expected, actual);
   }
+  @Test
+  public void testReplayFailsWithAllEmptyLogMetaDataNormalReplay()
+      throws IOException, InterruptedException {
+    doTestReplayFailsWithAllEmptyLogMetaData(false);
+  }
+  @Test
+  public void testReplayFailsWithAllEmptyLogMetaDataFastReplay()
+      throws IOException, InterruptedException {
+    doTestReplayFailsWithAllEmptyLogMetaData(true);
+  }
+  public void doTestReplayFailsWithAllEmptyLogMetaData(boolean useFastReplay)
+      throws IOException, InterruptedException {
+    // setup log with correct fast replay parameter
+    log.close();
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs)
+            .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+    log.replay();
+    FlumeEvent eventIn = TestUtils.newPersistableEvent();
+    long transactionID = ++this.transactionID;
+    log.put(transactionID, eventIn);
+    log.commitPut(transactionID);
+    log.close();
+    if(useFastReplay) {
+      FileUtils.deleteQuietly(checkpointDir);
+      Assert.assertTrue(checkpointDir.mkdir());
+    }
+    List<File> logFiles = Lists.newArrayList();
+    for (int i = 0; i < dataDirs.length; i++) {
+      logFiles.addAll(LogUtils.getLogs(dataDirs[i]));
+    }
+    Assert.assertTrue(logFiles.size() > 0);
+    for(File logFile : logFiles) {
+      File logFileMeta = Serialization.getMetaDataFile(logFile);
+      Assert.assertTrue(logFileMeta.delete());
+      Assert.assertTrue(logFileMeta.createNewFile());
+    }
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs)
+            .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+    try {
+      log.replay();
+      Assert.fail();
+    } catch(IllegalStateException expected) {
+      String msg = expected.getMessage();
+      Assert.assertNotNull(msg);
+      Assert.assertTrue(msg, msg.contains(".meta is empty, but log"));
+    }
+  }
+  @Test
+  public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay()
+      throws IOException, InterruptedException {
+    FlumeEvent eventIn = TestUtils.newPersistableEvent();
+    long transactionID = ++this.transactionID;
+    FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
+    log.commitPut(transactionID); // this is not required since
+    log.close();
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs)
+            .setChannelName("testlog").build();
+    doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
+  }
+  @Test
+  public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay()
+      throws IOException, InterruptedException {
+    FlumeEvent eventIn = TestUtils.newPersistableEvent();
+    long transactionID = ++this.transactionID;
+    FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
+    log.commitPut(transactionID); // this is not required since
+    log.close();
+    FileUtils.deleteDirectory(checkpointDir);
+    Assert.assertTrue(checkpointDir.mkdir());
+    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
+        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
+            checkpointDir).setLogDirs(dataDirs)
+            .setChannelName("testlog").setUseFastReplay(true).build();
+    doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
+  }
+  public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn,
+      FlumeEventPointer eventPointer) throws IOException,
+      InterruptedException {
+    for (int i = 0; i < dataDirs.length; i++) {
+      for(File logFile : LogUtils.getLogs(dataDirs[i])) {
+        if(logFile.length() == 0L) {
+          File logFileMeta = Serialization.getMetaDataFile(logFile);
+          Assert.assertTrue(logFileMeta.delete());
+          Assert.assertTrue(logFileMeta.createNewFile());
+        }
+      }
+    }
+    log.replay();
+    FlumeEvent eventOut = log.get(eventPointer);
+    Assert.assertNotNull(eventOut);
+    Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
+    Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
+  }
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn) throws IOException, InterruptedException {
     FlumeEventQueue queue = log.getFlumeEventQueue();


Mime
View raw message