flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1583. FileChannel fast full replay will always be used if enabled
Date Fri, 14 Sep 2012 20:16:11 GMT
Updated Branches:
  refs/heads/trunk cc499166b -> f54148a3f


FLUME-1583. FileChannel fast full replay will always be used if enabled

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f54148a3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f54148a3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f54148a3

Branch: refs/heads/trunk
Commit: f54148a3f53e5c53bc3932b208f25373d3c6ccbb
Parents: cc49916
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 14 13:14:40 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 14 13:15:56 2012 -0700

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java    |   36 +++----
 .../java/org/apache/flume/channel/file/Log.java    |   32 +++++--
 .../apache/flume/channel/file/ReplayHandler.java   |   24 +----
 .../channel/file/TestCheckpointRebuilder.java      |   82 +++++++++++++++
 .../flume/channel/file/TestFileChannelRestart.java |   10 ++
 .../org/apache/flume/channel/file/TestUtils.java   |   10 ++
 6 files changed, 146 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 6e1d2fc..748f49a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 public class CheckpointRebuilder {
 
-  private final File checkpointDir;
   private final List<File> logFiles;
   private final FlumeEventQueue queue;
   private final Set<ComparableFlumeEventPointer> committedPuts =
@@ -53,19 +52,13 @@ public class CheckpointRebuilder {
   private static Logger LOG =
           LoggerFactory.getLogger(CheckpointRebuilder.class);
 
-  public CheckpointRebuilder(File checkpointDir, List<File> logFiles,
+  public CheckpointRebuilder(List<File> logFiles,
           FlumeEventQueue queue) throws IOException {
-    this.checkpointDir = checkpointDir;
     this.logFiles = logFiles;
     this.queue = queue;
   }
 
   public boolean rebuild() throws IOException, Exception {
-    File checkpointFile = new File(checkpointDir, "checkpoint");
-    if (checkpointFile.exists()) {
-      LOG.info("Checkpoint file found, will not replay with fast logic.");
-      return false;
-    }
     LOG.info("Attempting to fast replay the log files.");
     List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
     for (File logFile : logFiles) {
@@ -243,18 +236,23 @@ public class CheckpointRebuilder {
       logFiles.addAll(Arrays.asList(files));
     }
     int capacity = Integer.parseInt(cli.getOptionValue("t"));
-    EventQueueBackingStore backingStore =
-        EventQueueBackingStoreFactory.get(new File(checkpointDir, "checkpoint"),
-            capacity, "channel");
-    FlumeEventQueue queue = new FlumeEventQueue(backingStore,
-            new File(checkpointDir, "inflighttakes"),
-            new File(checkpointDir, "inflightputs"));
-    CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir,
-            logFiles, queue);
-    if(rebuilder.rebuild()) {
-      rebuilder.writeCheckpoint();
+    File checkpointFile = new File(checkpointDir, "checkpoint");
+    if(checkpointFile.exists()) {
+      LOG.error("Cannot execute fast replay",
+          new IllegalStateException("Checkpoint exists" + checkpointFile));
     } else {
-      LOG.error("Could not rebuild the checkpoint due to errors.");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpointFile,
+              capacity, "channel");
+      FlumeEventQueue queue = new FlumeEventQueue(backingStore,
+              new File(checkpointDir, "inflighttakes"),
+              new File(checkpointDir, "inflightputs"));
+      CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
+      if(rebuilder.rebuild()) {
+        rebuilder.writeCheckpoint();
+      } else {
+        LOG.error("Could not rebuild the checkpoint due to errors.");
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1d91460..e36eafb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -309,11 +309,22 @@ class Log {
        */
       LogUtils.sort(dataFiles);
 
+      boolean useFastReplay = this.useFastReplay;
       /*
        * Read the checkpoint (in memory queue) from one of two alternating
        * locations. We will read the last one written to disk.
        */
       File checkpointFile = new File(checkpointDir, "checkpoint");
+      if(useFastReplay) {
+        if(checkpointFile.exists()) {
+          LOGGER.debug("Disabling fast full replay because checkpoint " +
+              "exists: " + checkpointFile);
+          useFastReplay = false;
+        } else {
+          LOGGER.debug("Not disabling fast full replay because checkpoint " +
+              " does not exist: " + checkpointFile);
+        }
+      }
       File inflightTakesFile = new File(checkpointDir, "inflighttakes");
       File inflightPutsFile = new File(checkpointDir, "inflightputs");
       EventQueueBackingStore backingStore =
@@ -329,16 +340,23 @@ class Log {
        * the queue, the timestamp the queue was written to disk, and
        * the list of data files.
        */
-      ReplayHandler replayHandler = new ReplayHandler(queue,
-          encryptionKeyProvider, useFastReplay, checkpointFile);
-      if(useLogReplayV1) {
-        LOGGER.info("Replaying logs with v1 replay logic");
-        replayHandler.replayLogv1(dataFiles);
+      CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
+          queue);
+      if(useFastReplay && rebuilder.rebuild()) {
+        LOGGER.info("Fast replay successful.");
       } else {
-        LOGGER.info("Replaying logs with v2 replay logic");
-        replayHandler.replayLog(dataFiles);
+        ReplayHandler replayHandler = new ReplayHandler(queue,
+            encryptionKeyProvider);
+        if(useLogReplayV1) {
+          LOGGER.info("Replaying logs with v1 replay logic");
+          replayHandler.replayLogv1(dataFiles);
+        } else {
+          LOGGER.info("Replaying logs with v2 replay logic");
+          replayHandler.replayLog(dataFiles);
+        }
       }
 
+
       for (int index = 0; index < logDirs.length; index++) {
         LOGGER.info("Rolling " + logDirs[index]);
         roll(index);

http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index 7c32526..81f6172 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -69,15 +69,11 @@ class ReplayHandler {
    * finding the put and commit in logdir2.
    */
   private final List<Long> pendingTakes;
-  private final boolean useFastReplay;
-  private final File cpDir;
 
-  ReplayHandler(FlumeEventQueue queue, @Nullable KeyProvider encryptionKeyProvider,
-      boolean useFastReplay, File cpDir) {
+  ReplayHandler(FlumeEventQueue queue,
+      @Nullable KeyProvider encryptionKeyProvider) {
     this.queue = queue;
-    this.useFastReplay = useFastReplay;
     this.lastCheckpoint = queue.getLogWriteOrderID();
-    this.cpDir = cpDir;
     pendingTakes = Lists.newArrayList();
     readers = Maps.newHashMap();
     logRecordBuffer = new PriorityQueue<LogRecord>();
@@ -89,14 +85,6 @@ class ReplayHandler {
    */
   @Deprecated
   void replayLogv1(List<File> logs) throws Exception {
-    if(useFastReplay) {
-      CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
-              queue);
-      if(rebuilder.rebuild()){
-        LOG.info("Fast replay successful.");
-        return;
-      }
-    }
     int total = 0;
     int count = 0;
     MultiMap transactionMap = new MultiValueMap();
@@ -228,14 +216,6 @@ class ReplayHandler {
    * @throws IOException
    */
   void replayLog(List<File> logs) throws Exception {
-    if (useFastReplay) {
-      CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
-              queue);
-      if (rebuilder.rebuild()) {
-        LOG.info("Fast replay successful.");
-        return;
-      }
-    }
     int count = 0;
     MultiMap transactionMap = new MultiValueMap();
     // seed both with the highest known sequence of either the tnxid or woid

http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
new file mode 100644
index 0000000..ffc4623
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.apache.flume.channel.file.TestUtils.*;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class TestCheckpointRebuilder extends TestFileChannelBase {
+
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(TestCheckpointRebuilder.class);
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @After
+  public void teardown() {
+    super.teardown();
+  }
+  @Test
+  public void testFastReplay() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY,
+        String.valueOf(50));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = fillChannel(channel, "checkpointBulder");
+    channel.stop();
+    File checkpointFile = new File(checkpointDir, "checkpoint");
+    File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
+    File inflightTakesFile = new File(checkpointDir, "inflighttakes");
+    File inflightPutsFile = new File(checkpointDir, "inflightputs");
+    Assert.assertTrue(checkpointFile.delete());
+    Assert.assertTrue(metaDataFile.delete());
+    Assert.assertTrue(inflightTakesFile.delete());
+    Assert.assertTrue(inflightPutsFile.delete());
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(checkpointFile, 50,
+            "test");
+    FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
+          inflightPutsFile);
+    CheckpointRebuilder checkpointRebuilder =
+        new CheckpointRebuilder(getAllLogs(dataDirs), queue);
+    Assert.assertTrue(checkpointRebuilder.rebuild());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 4133573..68285cc 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -66,6 +66,16 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
+  public void testFastReplayNegativeTestV1() throws Exception {
+    doTestRestart(true, true, false, true);
+  }
+
+  @Test
+  public void testFastReplayNegativeTestV2() throws Exception {
+    doTestRestart(false, true, false, true);
+  }
+
+  @Test
   public void testNormalReplayV1() throws Exception {
     doTestRestart(true, true, true, false);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/f54148a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 2b88b96..8807201 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
@@ -105,6 +107,14 @@ public class TestUtils {
     return events;
   }
 
+  public static List<File> getAllLogs(File[] dataDirs) {
+    List<File> result = Lists.newArrayList();
+    for(File dataDir : dataDirs) {
+      result.addAll(LogUtils.getLogs(dataDir));
+    }
+    return result;
+  }
+
   public static void forceCheckpoint(FileChannel channel) {
     Log log = field("log")
             .ofType(Log.class)


Mime
View raw message